线程池深入原理
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自维基百科
我们在Android或者Java开发中,日常所使用的就是ThreadPoolExecutor
了,我们先来看下如何使用一个线程池来代替多线程开发。
使用线程池
1```kotlin
2// 创建一个核心线程数为5,最大线程数为10,空闲线程存活时间为60s的线程池对象
3val threadPoolExecutor = ThreadPoolExecutor(
4 5, 10, 60,
5 TimeUnit.MINUTES,
6 ArrayBlockingQueue<Runnable>(100),
7 RejectedExecutionHandler { _, _ -> println("reject submit thread to thread pool") }
8)
9// 测试
10for (i in 1..10) {
11 threadPoolExecutor.execute { println("execute thread is:${Thread.currentThread().name}") }
12}
13// 结果
14// execute thread is:pool-1-thread-1
15// execute thread is:pool-1-thread-1
16// execute thread is:pool-1-thread-1
17// execute thread is:pool-1-thread-1
18// execute thread is:pool-1-thread-5
19// execute thread is:pool-1-thread-5
20// execute thread is:pool-1-thread-4
21// execute thread is:pool-1-thread-3
22// execute thread is:pool-1-thread-2
23// execute thread is:pool-1-thread-1
24```
从结果就可以看出来,执行时间操作,但是只创建了5个线程,另外5次都是复用线程的。这样就达到了复用存在的线程、减少对象的创建和销毁的额外开销;并且可以控制最大线程数,也就是控制了最大并发数。
知道如何使用一个线程池还不够,我们需要看看ThreadPoolExecutor
是如何创建、复用这些线程的。下面我们看看创建ThreadPoolExecutor
对象的几个参数:
构造方法
1```java
2/**
3 * 创建一个ThreadPoolExecutor对象
4 *
5 * @param corePoolSize 核心线程数,这些线程会一直在线程池中,除非设置了 allowCoreThreadTimeOut
6 * @param maximumPoolSize 最大线程数,运行线程创建的最大值
7 * @param keepAliveTime 当线程数>核心线程数的时候,这个值就是空闲且非核心线程存活的时间
8 * @param unit keepAliveTime的单位
9 * @param workQueue 保存task的队列,直到执行execute()方法执行
10 * @param threadFactory ThreadFactory是一个接口,里面只有Thread newThread(Runnable r)方法,用来创建线程,
11 * 默认采用Executors.defaultThreadFactory()
12 * @param handler 拒绝处理任务时的策略,如果线程池满了且所有线程都不处于空闲状态,
13 * 通过RejectedExecutionHandler接口的rejectedExecution(Runnable r, ThreadPoolExecutor executor)来处理传进来的Runnable
14 * 系统提供了四种:CallerRunsPolicy(), AbortPolicy(), DiscardPolicy(), DiscardOldestPolicy()
15 * 默认采用new AbortPolicy()
16 */
17 public ThreadPoolExecutor(int corePoolSize,
18 int maximumPoolSize,
19 long keepAliveTime,
20 TimeUnit unit,
21 BlockingQueue<Runnable> workQueue,
22 ThreadFactory threadFactory,
23 RejectedExecutionHandler handler){
24 if (corePoolSize < 0 ||
25 maximumPoolSize <= 0 ||
26 maximumPoolSize < corePoolSize ||
27 keepAliveTime < 0)
28 throw new IllegalArgumentException();
29 if (workQueue == null || threadFactory == null || handler == null)
30 throw new NullPointerException();
31 this.acc = System.getSecurityManager() == null ?
32 null :
33 AccessController.getContext();
34 this.corePoolSize = corePoolSize;
35 this.maximumPoolSize = maximumPoolSize;
36 this.workQueue = workQueue;
37 this.keepAliveTime = unit.toNanos(keepAliveTime);
38 this.threadFactory = threadFactory;
39 this.handler = handler;
40 }
41```
我在方法头注释中我都一一解释了几个参数的作用,还有几点需要注意的就是:
-
核心线程数不能小于0;
-
最大线程数不能小于0;
-
最大线程数不能小于核心线程数;
-
空闲线程的存活时间不能小于0;
通过上面的解释我们很明白的知道前面几个参数的作用,但是最后两个参数我们并不能通过表面的解释通晓它,既然不能通过表象看懂他俩,那就看看默认的实现是如何做的,这样在接下来的源码分析中很有帮助。
ThreadFactory:线程工厂
ThreadFactory
是一个接口,里面只由唯一的 Thread newThread(Runnable r);
方法,此方法是用来创建线程的,从接口中我们得到的就只有这么多,下面我们看看 Executors
默认的 DefaultThreadFactory
类:
1```java
2// 静态内部类
3static class DefaultThreadFactory implements ThreadFactory {
4 // 线程池的标识,从1开始没创建一个线程池+1
5 private static final AtomicInteger poolNumber = new AtomicInteger(1);
6 // 线程组
7 private final ThreadGroup group;
8 // 线程名中的结尾标识,从1开始每创建一个线程+1
9 private final AtomicInteger threadNumber = new AtomicInteger(1);
10 // 线程名
11 private final String namePrefix;
12
13 DefaultThreadFactory() {
14 SecurityManager s = System.getSecurityManager();
15 group = (s != null) ? s.getThreadGroup() :
16 Thread.currentThread().getThreadGroup();
17 namePrefix = "pool-" +
18 poolNumber.getAndIncrement() +
19 "-thread-";
20 }
21
22 public Thread newThread(Runnable r) {
23 Thread t = new Thread(group, r,
24 namePrefix + threadNumber.getAndIncrement(),
25 0);
26 if (t.isDaemon())
27 t.setDaemon(false);
28 if (t.getPriority() != Thread.NORM_PRIORITY)
29 t.setPriority(Thread.NORM_PRIORITY);
30 return t;
31 }
32}
33```
RejectedExecutionHandler:拒绝处理任务的策略
RejectedExecutionHandler
也是一个接口,并且也只提供了唯一的 void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
方法。我们可以自定义策略,也可以用上面提到的封装好的四种策略,先看一下四种策略分别怎么拒绝任务的:
-
CallerRunsPolicy
1public static class CallerRunsPolicy implements RejectedExecutionHandler {
2 /**
3 * Creates a {@code CallerRunsPolicy}.
4 */
5 public CallerRunsPolicy() {
6 }
7 /**
8 * 如果线程池还没关闭,那么就再次执行这个Runnable
9 */
10 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
11 if (!e.isShutdown()) {
12 r.run();
13 }
14 }
15}
-
AbortPolicy
1public static class AbortPolicy implements RejectedExecutionHandler {
2 /**
3 * Creates an {@code AbortPolicy}.
4 */
5 public AbortPolicy() {
6 }
7 /**
8 * 这个策略就是抛出异常,不做其他处理
9 */
10 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
11 throw new RejectedExecutionException("Task " + r.toString() +
12 " rejected from " +
13 e.toString());
14 }
15}
-
DiscardPolicy
1public static class DiscardPolicy implements RejectedExecutionHandler {
2 /**
3 * Creates a {@code DiscardPolicy}.
4 */
5 public DiscardPolicy() {
6 }
7 /**
8 * 什么也不做,也就是抛弃了这个Runnable
9 */
10 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
11 }
12}
-
DiscardOldestPolicy
:
1public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2 /**
3 * Creates a {@code DiscardOldestPolicy} for the given executor.
4 */
5 public DiscardOldestPolicy() {
6 }
7 /**
8 * 1. 线程池未关闭
9 * 2. 获取队列中的下一个Runnable
10 * 3. 获取到了,但是不对它进行处理,也就是抛弃它
11 * 4. 执行我们传过来的这个Runnable
12 */
13 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
14 if (!e.isShutdown()) {
15 e.getQueue().poll();
16 e.execute(r);
17 }
18 }
19}
重要的参数
除了上述构造方法中的几个参数外,线程池还有几个比较核心的参数,如下:
1```java
2public class ThreadPoolExecutor extends AbstractExecutorService {
3
4 // ctl 的低29位表示线程池中的线程数,高3位表示当前线程状态
5 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
6 // 29
7 private static final int COUNT_BITS = Integer.SIZE - 3;
8 // (2^29) -1
9 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
10
11 // 运行状态:接受新任务并处理排队的任务
12 private static final int RUNNING = -1 << COUNT_BITS;
13 // 关闭状态:不接受新任务,但处理排队的任务
14 private static final int SHUTDOWN = 0 << COUNT_BITS;
15 // 停止状态:不接受新任务,不处理排队的任务,中断正在进行的任务
16 private static final int STOP = 1 << COUNT_BITS;
17 // 整理状态:整理状态,所有任务已终止,workerCount为零,线程将运行terminate()方法
18 private static final int TIDYING = 2 << COUNT_BITS;
19 // 终止状态:terminate()方法执行完成
20 private static final int TERMINATED = 3 << COUNT_BITS;
21
22 // 表示线程是否允许或停止
23 private static int runStateOf(int c) { return c & ~CAPACITY; }
24 // 线程的有效数量
25 private static int workerCountOf(int c) { return c & CAPACITY; }
26 private static int ctlOf(int rs, int wc) { return rs | wc; }
27
28 ......后面的源码暂时省略
29}
30```
execute:执行
1```java
2public void execute(Runnable command) {
3 if (command == null)
4 throw new NullPointerException();
5 int c = ctl.get();
6 // 如果运行中的线程数小于核心线程数,执行addWorker(command, true)创建新的核心Thread执行任务
7 if (workerCountOf(c) < corePoolSize) {
8 if (addWorker(command, true))
9 return;
10 c = ctl.get();
11 }
12 // 1. 已经满足:运行中的线程数大于核心线程数,但是小于最大线程数
13 // 2. 需要满足:线程池在运行状态
14 // 3. 需要满足:添加到工作队列中成功
15 if (isRunning(c) && workQueue.offer(command)) {
16 int recheck = ctl.get();
17 // 如果线程不在运行状态,就从工作队列中移除command
18 // 并且执行拒绝策略
19 if (!isRunning(recheck) && remove(command))
20 reject(command);
21 // 线程池处于运行状态,但是没有线程,则addWorker(null, false)
22 // 至于这里为什么要传入一个null,因为在最外层的if条件中我们已经将Runnable添加到工作队列中了
23 // 而且在runWorker()源码中也可以得到答案,如果传入的Runnable为空,就会去工作队列中取task。
24 else if (workerCountOf(recheck) == 0)
25 addWorker(null, false);
26 }
27 // 执行addWorker()创建新的非核心线程Thread执行任务
28 // addWorker() 失败,执行拒绝策略
29 else if (!addWorker(command, false))
30 reject(command);
31}
32```
从上面源码中可以看出,execute()
一个新的任务,主要有以下这几种情况:
-
核心线程未满,直接新建核心线程并执行任务;
-
核心线程满了,工作队列未满,将任务添加到工作队列中;
-
核心线程和工作队列都满,但是最大线程数未达到,新建线程并执行任务;
-
上面条件都不满足,那么就执行拒绝策略。
更形象的可以看下方流程图:

addWorker(Runnable , boolean):添加Worker
1```java
2private boolean addWorker(Runnable firstTask, boolean core) {
3 // 标记外循环,比如在内循环中break retry就直接跳出外循环
4 retry:
5 for (; ; ) {
6 int c = ctl.get();
7 int rs = runStateOf(c);
8
9 // 直接返回false有以下3种情况:
10 // 1. 线程池状态为STOP、TIDYING、TERMINATED
11 // 2. 线程池状态不是running状态,并且firstTask不为空
12 // 3. 线程池状态不是running状态,并且工作队列为空
13 if (rs >= SHUTDOWN &&
14 !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
15 return false;
16
17 for (; ; ) {
18 int wc = workerCountOf(c);
19 // 如果添加的是核心线程,但是运行的线程数大于等于核心线程数,那么就不添加了,直接返回
20 // 如果添加的是非核心线程,但是运行的线程数大于等于最大线程数,那么也不添加,直接返回
21 if (wc >= CAPACITY ||
22 wc >= (core ? corePoolSize : maximumPoolSize))
23 return false;
24 // 增加workerCount的值 +1
25 if (compareAndIncrementWorkerCount(c))
26 // 跳出外循环
27 break retry;
28 c = ctl.get(); // 重新检查线程池状态
29 if (runStateOf(c) != rs)
30 continue retry;
31 // 重新检查的状态和之前不合,再次从外循环进入
32 }
33 }
34
35 boolean workerStarted = false;
36 boolean workerAdded = false;
37 Worker w = null;
38 try {
39 w = new Worker(firstTask);
40 final Thread t = w.thread;
41 if (t != null) {
42 // 线程池重入锁
43 final ReentrantLock mainLock = this.mainLock;
44 // 获得锁
45 mainLock.lock();
46 try {
47 // Recheck while holding lock.
48 // Back out on ThreadFactory failure or if
49 // shut down before lock acquired.
50 int rs = runStateOf(ctl.get());
51 // 线程池在运行状态或者是线程池关闭同时Runnable也为空
52 if (rs < SHUTDOWN ||
53 (rs == SHUTDOWN && firstTask == null)) {
54 if (t.isAlive()) // precheck that t is startable
55 throw new IllegalThreadStateException();
56 // 想Worker中添加新的Worker
57 workers.add(w);
58 int s = workers.size();
59 if (s > largestPoolSize)
60 largestPoolSize = s;
61 workerAdded = true;
62 }
63 } finally {
64 // 释放锁
65 mainLock.unlock();
66 }
67 // 如果添加成功,启动线程
68 if (workerAdded) {
69 t.start();
70 workerStarted = true;
71 }
72 }
73 } finally {
74 if (!workerStarted)
75 addWorkerFailed(w);
76 }
77 return workerStarted;
78}
79```
addWorker()
主要就是在满足种种条件(上述源码中解释了)后,新建一个Worker
对象,并添加到HashSet<Worker> workers
中去,最后调用新建Worker
对象的Thread
变量的start()
方法。
Worker类
Worker
是一个继承了AQS
并实现了Runnable
的内部类,我们重点看看它的run()
方法,因为上面addWorker()
中,t.start()
触发的就是它的run()
方法:
1```java
2private final class Worker
3 extends AbstractQueuedSynchronizer
4 implements Runnable {
5 /**
6 * This class will never be serialized, but we provide a
7 * serialVersionUID to suppress a javac warning.
8 */
9 private static final long serialVersionUID = 6138294804551838833L;
10
11 /**
12 * Thread this worker is running in. Null if factory fails.
13 */
14 final Thread thread;
15 /**
16 * Initial task to run. Possibly null.
17 */
18 Runnable firstTask;
19 /**
20 * Per-thread task counter
21 */
22 volatile long completedTasks;
23
24 /**
25 * Creates with given first task and thread from ThreadFactory.
26 *
27 * @param firstTask the first task (null if none)
28 */
29 Worker(Runnable firstTask) {
30 setState(-1); // inhibit interrupts until runWorker
31 this.firstTask = firstTask;
32 // 这边是把Runnable传给了Thread,也就是说Thread.run()就是执行了下面的run()方法
33 this.thread = getThreadFactory().newThread(this);
34 }
35
36 /**
37 * Delegates main run loop to outer runWorker
38 */
39 public void run() {
40 runWorker(this);
41 }
42}
43```
run()
方法实际调用了runWorker(Worker)
方法
runWorker(Worker)方法:
1```java
2 final void runWorker(Worker w) {
3 Thread wt = Thread.currentThread();
4 Runnable task = w.firstTask;
5 w.firstTask = null;
6 w.unlock(); // 释放锁,允许中断
7 boolean completedAbruptly = true;
8 try {
9 // 1. worker中的task不为空
10 // 2. 如果worker的task为空,那么取WorkerQueue的task
11 while (task != null || (task = getTask()) != null) {
12 w.lock();
13 // If pool is stopping, ensure thread is interrupted;
14 // if not, ensure thread is not interrupted. This
15 // requires a recheck in second case to deal with
16 // shutdownNow race while clearing interrupt
17 if ((runStateAtLeast(ctl.get(), STOP) ||
18 (Thread.interrupted() &&
19 runStateAtLeast(ctl.get(), STOP))) &&
20 !wt.isInterrupted())
21 wt.interrupt();
22 try {
23 // 这是一个空方法,可由子类实现
24 beforeExecute(wt, task);
25 Throwable thrown = null;
26 try {
27 // 执行task
28 task.run();
29 }
30 .... 省略
31 // 这是一个空方法,可由子类实现
32 finally {
33 afterExecute(task, thrown);
34 }
35 } finally {
36 task = null;
37 w.completedTasks++;
38 w.unlock();
39 }
40 }
41 completedAbruptly = false;
42 } finally {
43 processWorkerExit(w, completedAbruptly);
44 }
45 }
46```
getTask():
1```java
2private Runnable getTask() {
3 // 进入死循环
4 for (; ; ) {
5 try {
6 // 为true的条件:
7 // allowCoreThreadTimeOut=true: 核心线程需根据keepAliveTime超时等待
8 // 核心线程数已满
9 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
10 // 如果timed为true,执行BlockQueue.poll(),这个操作在取不到task的时候会等待keepAliveTime,然后返回null
11 // 如果timed为false,执行BlockQueue.take(),这个操作在队列为空的时候一直阻塞
12 Runnable r = timed ?
13 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
14 workQueue.take();
15 if (r != null)
16 return r;
17 }
18 }
19}
20```
线程池的源码按照上述的几个方法(execute(runnable)
-> addWorker(runnable,core)
-> Worker
-> runWorker(worker)
-> getTask()
)的顺序来分析,你就可以很清晰的将运作过程了解清楚,同事构造方法和几个重要的参数一定要懂,不然对于后面的源码分析很受阻碍,相信大家通过这篇文章可以加深对线程池的理解。
源码分析的文章还在不断的更新,如果本文章你发现有不正确或者不足之处,欢迎你在下方留言或者扫描下方的二维码留言也可!
长按识别图中二维码
原文始发于微信公众号(Taonce):线程池深入原理