侧边栏壁纸
博主头像
Elltor

用心发现生活,积极面对工作、事,坚持每天让自己进步一点。

  • 累计撰写 169 篇文章
  • 累计创建 1 个标签
  • 累计收到 12 条评论
标签搜索

目 录CONTENT

文章目录

Java线程池总结

Elltor
2021-05-27 / 0 评论 / 0 点赞 / 107 阅读 / 15,522 字 / 正在检测是否收录...

一、常用接口及实现类

1. Executor、ExecutorService

Executor里定义了一个execute方法,这个方法接受一个实现Runnable的对象。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService是继承Excutor的一个接口,它定义了Executor框架常用的方法,如提交任务、线程池关闭、判断线程池的状态等方法。

image.png

2. Runnable、Callable

Runnable定义了一个没有返回值的可执行方法,Callable定义了一个有返回值的方法。

实现这两个接口的类都可以做为线程Thread执行的对象,这里演示一个Callable。

    public void test() throws Exception {
        FutureTask<String> task = new FutureTask<>(new Callable() {
            @Override
            public Object call() throws Exception {
                String res = "hello world";
                Thread.sleep(3000);
                return res;
            }
        });
        
        new Thread(task).start();
        
        task.get();//call return前阻塞
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Callable<String> c = new Callable() {
            @Override
            public String call() throws Exception {
                return "Hello Callable";
            }
        };

        ExecutorService service = Executors.newCachedThreadPool();
        Future<String> future = service.submit(c); //异步

        System.out.println(future.get());//获取到结果前阻塞

        service.shutdown();
    }

3. Executors 线程池工具

Executors是线程池的工具类,类似于集合框架的Collections工具类。

已经预定义了一些线程池,设置参数后可以直接使用。

image.png

4. 线程池的关闭

线程不是立即关闭(终结)的,关闭和终结是两个不同的状态。

线程池的关闭即遍历每个线程然后触发线程的中断(interrupt)。

    public static void main(String[] args) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(5); //execute submit
        for (int i = 0; i < 6; i++) {
            service.execute(() -> {
                try {
                    TimeUnit.MILLISECONDS.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName());
            });
        }
        System.out.println(service);

        // 关闭线程
        service.shutdown();
        // isTerminated() 为false说明线程run方法还未执行完毕
        System.out.println(service.isTerminated());
        //此时线程已经处在关闭状态,但是还未终结(终结是线程执行完毕从run方法退出)
        System.out.println(service.isShutdown());
        System.out.println(service);

        // 在休眠会查看
        TimeUnit.SECONDS.sleep(5);
        // true 所有线程已终结
        System.out.println(service.isTerminated());
        // true 所有线程已关闭
        System.out.println(service.isShutdown());
        System.out.println(service);
    }

// output
java.util.concurrent.ThreadPoolExecutor@76ccd017[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
false
true
java.util.concurrent.ThreadPoolExecutor@76ccd017[Shutting down, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
pool-1-thread-1
pool-1-thread-4
pool-1-thread-3
pool-1-thread-2
pool-1-thread-5
pool-1-thread-1
true
true
java.util.concurrent.ThreadPoolExecutor@76ccd017[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 6]

5. Futrue 异步任务

Future是异步作业,它表示提交作业未来的一个计算结果,在使用get方法获取结果时如果call()未返回结果则会阻塞当前线程。

FutrueTask是Futrue的实现类,可以接受Runnable、Callable实现类为一个执行作业。

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        
        FutureTask<Integer> task = new FutureTask<>(()->{
            TimeUnit.MILLISECONDS.sleep(500);
            return 1000;
        }); //new Callable () { Integer call();}
        
        new Thread(task).start();
        
        System.out.println(task.get()); //阻塞,知道call返回结果
    }
5.1 CompletableFuture

CompletableFuture是一个处理异步任务的线程工具,使用它可以对异步任务进行控制或者继续进行之后的操作(这点类似Stream)。

下面是个使用示例。

/**
 * 假设能够提供一个服务
 * 这个服务查询各大电商网站同一类产品的价格并汇总展示
 * 可以使用单线程一个一个查,也可以使用异步并行方式查
 */

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class TestCompletableFuture {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        long start, end;
        // 方案1,单线程计算
        /*start = System.currentTimeMillis();

        priceOfTM();
        priceOfTB();
        priceOfJD();

        end = System.currentTimeMillis();
        System.out.println("use serial method call! " + (end - start));*/

        // ----------------------------------------------------------------------------------

        // 方案2,异步并行计算
        start = System.currentTimeMillis();

        // 提交异步任务
        CompletableFuture<Double> futureTM = CompletableFuture.supplyAsync(()->priceOfTM());
        CompletableFuture<Double> futureTB = CompletableFuture.supplyAsync(()->priceOfTB());
        CompletableFuture<Double> futureJD = CompletableFuture.supplyAsync(()->priceOfJD());

        // 等待作业完成
        CompletableFuture.allOf(futureTM, futureTB, futureJD).join();

        // CompletableFuture 还可以链式进行其他操作
        /*CompletableFuture.supplyAsync(()->priceOfTM())
                .thenApply(String::valueOf)
                .thenApply(str-> "price " + str)
                .thenAccept(System.out::println);*/

        end = System.currentTimeMillis();
        System.out.println("use completable future! " + (end - start));

        // 阻塞等待结果
        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static double priceOfTM() {
        delay();
        return 1.00;
    }

    private static double priceOfTB() {
        delay();
        return 2.00;
    }

    private static double priceOfJD() {
        delay();
        return 3.00;
    }

    // 延迟工具类
    private static void delay() {
//        int time = new Random().nextInt(500);
        int time = 150;
        try {
            TimeUnit.MILLISECONDS.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.printf("After %s sleep!\n", time);
    }
}

二、Executors

Executors是线程工具类,提供了一套开箱即用的线程池,只需要简单设置参数即可使用。

1. SingleThreadExecutor

SingleThreadExecutor 单线程线程池, 只会有一个线程,少于一个线程会创建一个线程。

使用

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        for(int i=0; i<5; i++) {
            final int j = i;
            service.execute(()->{
                System.out.println(j + " " + Thread.currentThread().getName());
            });
        }
    }

源码

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
	    // 核心线程数1,最大线程数1
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

2. 缓存线程池

使用

	public static void main(String[] args) throws InterruptedException {
		ExecutorService service = Executors.newCachedThreadPool();
		// 刚初始化,线程数0
		System.out.println(service);
		
		for (int i = 0; i < 2; i++) {
			service.execute(() -> {
				try {
					TimeUnit.MILLISECONDS.sleep(500);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(Thread.currentThread().getName());
			});
		}
		//size=2 线程数2,active threads=2,活跃线程数2
		System.out.println(service);
		
		//60秒内无作业处理线程就会被关闭
		TimeUnit.SECONDS.sleep(80);
		
		//size=0 线程数0,active threads=0 活跃线程0
		System.out.println(service);
		
	}

源码

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      //活跃时间60秒
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

2. FixedThreadPool

固定线程池一直保持固定个线程活跃。

使用

public class T09_FixedThreadPool {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 单线程处理判断20W个数是否为质数
	    // 测试1,单线程
	    long start = System.currentTimeMillis();
		getPrime(1, 200000); 
		long end = System.currentTimeMillis();
		System.out.println("单线程耗时:"+(end - start));

		// 测试2,多线程,线程池,4个线程
		final int cpuCoreNum = 4;

		ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);

		// 20W个数分四个作业
		MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20
		MyTask t2 = new MyTask(80001, 130000);
		MyTask t3 = new MyTask(130001, 170000);
		MyTask t4 = new MyTask(170001, 200000);

		// 计时
		start = System.currentTimeMillis();

		// 提交作业
		Future<List<Integer>> f1 = service.submit(t1);
		Future<List<Integer>> f2 = service.submit(t2);
		Future<List<Integer>> f3 = service.submit(t3);
		Future<List<Integer>> f4 = service.submit(t4);

		// 阻塞执行完成
		f1.get();
		f2.get();
		f3.get();
		f4.get();
		end = System.currentTimeMillis();
		System.out.println("线程池耗时:"+(end - start));
	}
	
	static class MyTask implements Callable<List<Integer>> {
		int startPos, endPos;
		
		MyTask(int s, int e) {
			this.startPos = s;
			this.endPos = e;
		}
		
		@Override
		public List<Integer> call() throws Exception {
			List<Integer> r = getPrime(startPos, endPos);
			return r;
		}
		
	}
	
	static boolean isPrime(int num) {
		for(int i=2; i<=num/2; i++) {
			if(num % i == 0) return false;
		}
		return true;
	}
	
	static List<Integer> getPrime(int start, int end) {
		List<Integer> results = new ArrayList<>();
		for(int i=start; i<=end; i++) {
			if(isPrime(i)) results.add(i);
		}
		
		return results;
	}
}

源码

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

4. ScheduledThreadPoolExecutor

定时任务线程池,指定固定个线程,延迟或重复执行作业。

使用

    // 设置一个定时任务,每2秒执行一次作业处理
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
        service.scheduleAtFixedRate(() -> {
            int s = new Random().nextInt(1000);
            try {
                TimeUnit.MILLISECONDS.sleep(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 耗时 " + s + " ms 处理了作业");
        }, 0, 2, TimeUnit.SECONDS);
    }

5. WorkStealingPool

newWorkStealingPool,这个是 JDK1.8 版本加入的一种线程池,stealing 翻译为抢断、窃取的意思。

特点:

  • 抢占工作
  • 作业无序执行

源码

    // 使用的是WorkJoinPool,与上边的几个不同
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
5.1 ForkJoinPool

https://zhuanlan.zhihu.com/p/90958193

三、ThreadPoolExecutor源码分析

1. 常用变量的解释

// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// 4. 线程池有5种状态,按大小排序如下:RUNNING < SHUTDOWN < STOP < TIDYING < TERMINATED
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }

/*
 * Bit field accessors that don't require unpacking ctl.
 * These depend on the bit layout and on workerCount being never negative.
 */
// 8. `runStateLessThan()`,线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
// 9. `runStateAtLeast()`,线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

2. 构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 基本类型参数校验
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    // 空指针校验
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    // 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

3. 提交执行task的过程

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // worker数量比核心线程数小,直接创建worker执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // worker数量超过核心线程数,任务直接进入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。
        // 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。
    // 这儿有3点需要注意:
    // 1. 线程池不是运行状态时,addWorker内部会判断线程池状态
    // 2. addWorker第2个参数表示是否创建核心线程
    // 3. addWorker返回false,则说明任务执行失败,需要执行reject操作
    else if (!addWorker(command, false))
        reject(command);
}

4. addworker源码解析

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 外层自旋
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价
        // (rs > SHUTDOWN) || 
        // (rs == SHUTDOWN && firstTask != null) || 
        // (rs == SHUTDOWN && workQueue.isEmpty())
        // 1. 线程池状态大于SHUTDOWN时,直接返回false
        // 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false
        // 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        // 内层自旋
        for (;;) {
            int wc = workerCountOf(c);
            // worker数量超过容量,直接返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 使用CAS的方式增加worker数量。
            // 若增加成功,则直接跳出外层循环进入到第二部分
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            // 线程池状态发生变化,对外层循环进行自旋
            if (runStateOf(c) != rs)
                continue retry;
            // 其他情况,直接内层循环进行自旋即可
            // else CAS failed due to workerCount change; retry inner loop
        } 
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // worker的添加必须是串行的,因此需要加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                // 这儿需要重新检查线程池状态
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // worker已经调用过了start()方法,则不再创建worker
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // worker创建并添加到workers成功
                    workers.add(w);
                    // 更新`largestPoolSize`变量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动worker线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

5. 线程池worker任务单元

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        // 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前worker
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // 省略代码...
}

6. 核心线程执行逻辑-runworker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 调用unlock()是为了让外部可以中断
    w.unlock(); // allow interrupts
    // 这个变量用于判断是否进入过自旋(while循环)
    boolean completedAbruptly = true;
    try {
        // 这儿是自旋
        // 1. 如果firstTask不为null,则执行firstTask;
        // 2. 如果firstTask为null,则调用getTask()从队列获取任务。
        // 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待
        while (task != null || (task = getTask()) != null) {
            // 这儿对worker进行加锁,是为了达到下面的目的
            // 1. 降低锁范围,提升性能
            // 2. 保证每个worker执行的任务是串行的
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果线程池正在停止,则对当前线程进行中断操作
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            // 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。
            // 这两个方法在当前类里面为空实现。
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                // 帮助gc
                task = null;
                // 已完成任务数加一 
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 自旋操作被退出,说明线程池正在结束
        processWorkerExit(w, completedAbruptly);
    }
}

(完)

0

评论区