概述
ThreadPoolExecutor 是 JDK 中线程池的实现类,它的继承结构如下:
本文主要分析 ThreadPoolExecutor 类的主要方法和实现原理(部分代码暂未涉及,后面有机会再行分析),以后再分析 Executor 和 ExecutorService 接口的相关内容。
代码分析
成员变量
该类中的成员变量较多,下面分析一些主要的。
// 该变量是一个原子整型变量,保存了线程池的状态和线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3=29// 线程的最大容量(即池内允许的最大线程数)// 00011111 11111111 11111111 11111111,即 29 个 1,超过 5 亿private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 2^29-1
// runState is stored in the high-order bits// 线程池的运行状态,保存在 ctl 的高位private static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;
这里用了一个原子整型(AtomicInteger,可以理解为线程安全的 Integer 类,占用 4 个字节,32 位)变量 ctl 来表示线程池的运行状态和线程池内部的线程数量。其中高 3 位表示线程池的运行状态,低 29 位表示线程池中线程的数量。
线程池的状态有以下 5 种:
1. RUNNING: 接受新的任务,并且处理任务队列中的任务;
2. SHUTDOWN: 不接受新的任务,但处理任务队列中的任务;
3. STOP: 不接受新的任务,不处理任务队列中的任务,并且中断正在进行的任务;
4. TIDYING: 所有的任务都已终结,工作线程的数量为 0;
5. TERMINATED: 执行 terminated() 方法后进入该状态,terminated() 方法默认实现为空。
这些状态之间的转换流程及触发条件如图所示:
接下来看其他成员变量:
// 任务队列(阻塞队列)private final BlockingQueue<Runnable> workQueue;// 互斥锁private final ReentrantLock mainLock = new ReentrantLock();// 工作线程集合private final HashSet<Worker> workers = new HashSet<Worker>();// 锁对应的条件private final Condition termination = mainLock.newCondition();// 线程池创建过的最大线程数量private int largestPoolSize;// 已完成任务的数量private long completedTaskCount;// 线程工厂类,用于创建线程private volatile ThreadFactory threadFactory;// 拒绝策略private volatile RejectedExecutionHandler handler;// 空闲线程的存活时间private volatile long keepAliveTime;/* * 核心线程是否允许超时 * 默认为 false,表示核心线程即使处于空闲状态也继续存活; * 若为 true,核心线程同样受到 keepAliveTime 的超时约束 */private volatile boolean allowCoreThreadTimeOut;// 核心池大小private volatile int corePoolSize;// 最大池大小private volatile int maximumPoolSize;// 默认拒绝策略private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
这里有几个重要的成员变量:
corePoolSize: 核心池大小;
maximumPoolSize: 最大池大小,线程池中能同时存在的最大线程数,大于等于 corePoolSize;
workQueue: 工作/任务队列,是一个阻塞队列,可参考前文「JDK源码分析-BlockingQueue」的分析。
为便于理解,这里先大概描述下向线程池提交任务的流程,后面再分析其代码实现:
① 初始化一个容量为 corePoolSize 的池子;
② 刚开始,每来一个任务就在池中创建一个线程去执行该任务,直到池中的容量到达 corePoolSize;
③ 此时若再来任务,则把这些任务放到 workQueue 中;
④ 若 workQueue 也满了,则继续创建线程执行任务,直到线程数量达到 maximumPoolSize;
⑤ 若 workQueue 已满,且线程数量达到 maximumPoolSize,此时若还有任务到来,则执行拒绝策略(handler)。
keepAliveTime & allowCoreThreadTimeOut
其中 keepAliveTime 表示空闲线程的存活时间,这两个值有一定关联:
若 allowCoreThreadTimeOut 为 false (默认),且线程数量超出 corePoolSize,则空闲时间超过 keepAliveTime 的线程会被关闭(最多保留 corePoolSize 个线程存活);
若将 allowCoreThreadTimeOut 设为 true,核心池的线程也会受该超时的影响而关闭。
构造器
ThreadPoolExecutor 内部有多个构造器,但最终都是调用下面这个:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
构造器参数虽然比较多,但基本都是简单的赋值,前面已经分析过这些成员变量的含义,这里不再赘述。下面分析它的核心方法 execute。
在此之前,先看几个常用方法:
// Packing and unpacking ctl// 根据 ctl 和 CAPACITY 得到线程池的运行状态private static int runStateOf(int c) { return c & ~CAPACITY; }// 根据 ctl 和 CAPACITY 得到线程池中的线程数量private static int workerCountOf(int c) { return c & CAPACITY; }// 将线程池运行状态和线程数量合并为 ctlprivate static int ctlOf(int rs, int wc) { return rs | wc; }
execute 方法代码如下:
// command 是一个 Runnable 对象,也就是用户提交执行的任务public void execute(Runnable command) { // 提交的任务为空时抛出异常 if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ // 获取当前 ctl (存有线程池状态和线程数量) int c = ctl.get(); // 若当前工作线程数量小于核心池大小(coolPoolSize) // 则在核心池中新增一个工作线程,并将该任务交给这个线程执行 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 重新获取(存在并发可能) c = ctl.get(); } // 若执行到这里,表示池中线程数量 >= corePoolSize,或者上面 addWorker 失败 // 若线程池处于 RUNNING 状态,并且该任务(command)成功添加到任务队列 if (isRunning(c) && workQueue.offer(command)) { // 再次获取 ctl 值 int recheck = ctl.get(); // 若线程池不是运行状态,则要把上面添加的任务从队列中移除并执行拒绝策略 //(可理解为“回滚”操作) if (! isRunning(recheck) && remove(command)) // 执行拒绝策略 reject(command); // 若此时池中没有线程,则新建一个 // PS: 这里是防止任务提交后,池中没有存活的线程了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 根据上述代码分析,若执行到这里,可分为以下两种情况: // ① 线程池不是 RUNNING 状态; // ② 线程池处于 RUNNING 状态,且实际线程数量 workCount >= corePoolSize, // 并且,添加到 workQueue 失败(已满) // 此时,则需要和 maximumPoolSize 进行比较, // 若 workCount <= maximumPoolSize, 则新建一个线程去执行该任务; // 否则,即 workCount > maximumPoolSize (饱和),则执行拒绝策略 else if (!addWorker(command, false)) // 执行拒绝策略 reject(command);}
该方法描述的就是一个任务提交到线程池的流程,主要执行逻辑如下:
1. 若正在运行的线程数少于 corePoolSize,则创建一个新的线程,并将传入的任务(command)作为它的第一个任务执行。
2. 若运行的线程数不小于 corePoolSize,则将新来的任务添加到任务队列(workQueue)。若入队成功,仍需再次检查是否需要增加一个线程(上次检查之后现有的线程可能死了,或者进入该方法时线程池 SHUTDOWN 了,此时需要执行回滚);若池中没有线程则新建一个(确保 SHUTDOWN 状态也能执行队列中的任务)。
3. 若任务不能入队(队列已满),则创建新的线程并执行任务,若失败(超过 maximumPoolSize),则表示线程池关闭或者已经饱和,因此拒绝该任务。
为了便于理解,可参考下面的流程图:
下面分析 Worker 类及 addWorker 方法。
内部嵌套类 Worker
// 继承自 AQS,且实现了 Runnable 接口private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ // 运行的第一个任务,可能为空 Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 初始化 thread this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // 其他一些 AQS 相关的方法不再一一列举}
可以看到 Worker 类继承自 AQS,它的实现与 ReentrantLock 有一些类似,可对比前文「JDK源码分析-ReentrantLock」分析。而且,Worker 类实现了 Runnable 接口,它的 run 方法是将自身作为参数传递给了外部类的 runWorker 方法,下面分析这两个方法。
addWorker 方法
// firstTask: 第一个任务,可为空// core: 是否为核心池,true 是,false 为最大池private boolean addWorker(Runnable firstTask, boolean core) { // 该循环的主要作用就是增加 workCount 计数,增加成功后再新增 Worker 对象 retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); /* * rs >= SHUTDOWN 表示线程池不再接受新的任务 * 该判断条件分为以下三种: * ① 线程池处于 STOP, TYDING 或 TERMINATED 状态; * ② 线程池处于 SHUTDOWN 状态,且 firstTask 不为空; * ③ 线程池处于 SHUTDOWN 状态,且 workQueue 为空 * 满足任一条件即返回 false. */ // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 超出最大容量 CAPACITY,或者超出初始设置的核心池/最大池数量,则返回 false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // CAS 方式增加 ctl 的 workerCount 数量(该循环的主要目的) if (compareAndIncrementWorkerCount(c)) break retry; // 若增加失败则退出循环 c = ctl.get(); // Re-read ctl // 运行状态改变 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 标记 Worker 是否启动、是否添加成功 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 将 firstTask 封装成 Worker 对象 w = new Worker(firstTask); // 获取 thread 对象 t final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); // 若线程池状态小于 SHUTDOWN,即为 RUNNING 状态; // 或者为 SHUTDOWN 状态,且 firstTask 为空, // 表示不再接受新的任务,但会继续执行队列中的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 添加到工作线程集合(HashSet) workers.add(w); // 更新最大计数 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 标记 Worker 添加成功 workerAdded = true; } } finally { mainLock.unlock(); } // 若成功添加到工作线程集合,则启动线程执行任务 if (workerAdded) { // 启动线程 t.start(); workerStarted = true; } } } finally { // Worker 启动失败,执行回滚操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted;}
runWorker 方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // task 不为空时才执行,循环执行 // getTask 是从 workQueue 中获取任务 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 若线程池状态 >= STOP,则需要中断该线程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中断线程 wt.interrupt(); try { // 任务执行前调用该方法 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任务执行后调用该方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // getTask 返回空,说明任务队列没有任务了 processWorkerExit(w, completedAbruptly); }}
可以看到这里有 beforeExecute 和 afterExecute 方法,分别表示提交的任务执行前后做的事情,在 ThreadPoolExecutor 类中这两个都是空方法。我们可以通过继承 ThreadPoolExecutor 类并重写这两个方法来定制自己的需求。
getTask 方法:
// 从任务队列(阻塞队列)中取任务private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out?
for (;;) { // 获取线程池运行状态 int c = ctl.get(); int rs = runStateOf(c);
// Check if queue empty only if necessary. /* * 线程池运行状态 rs >= SHUTDOWN,表示非 RUNNING 状态 * 该判断条件有两个: * 1. rs >= STOP; * 2. rs == SHUTDOWN,且工作队列为空 * 若满足上述条件中的一个,则将线程数量(workerCount)减少 1,返回 null */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); // 减少工作现场数量 return null; // 返回 null 表示会从池中移除一个 Worker } int wc = workerCountOf(c); // Are workers subject to culling? // 是否要移除 Worker boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 线程数大于 maximumPoolSize,或者需要移除 Worker if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; // 返回空意味着会减少移除一个 Worker continue; } try { // 从 workQueue 中获取任务(Runnable 对象) Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}
该方法主要是从任务队列 workQueue 中获取任务,并且控制池内线的程数量。
拒绝策略
拒绝策略 RejectedExecutionHandler 是一个接口,它只有一个 rejectedExecution 方法,代码如下:
public interface RejectedExecutionHandler { // 执行拒绝策略 void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}
它在 ThreadPoolExecutor 中的几个实现类如下:
ThreadPoolExecutor 默认的拒绝策略为 AbortPolicy,代码如下:
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 抛出异常 throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }}
可以看到,该策略就是直接抛出 RejectedExecutionException 异常。其他拒绝策略代码也都相对简单,不再一一列举。值得一提的是,如果我们对这几种策略都不满意,可以自定义拒绝策略(实现 RejectedExecutionHandler 接口)。
小结
本文主要分析了线程池 ThreadPoolExecutor 类的主要成员变量和核心方法实现,主要包括一个任务(Runnable)的提交流程。
该类稍微有些复杂,分析时首先要搞清楚任务提交的流程以及主要成员变量(workQueue、corePoolSize、maximumPoolSize、keepAliveTime、allowCoreThreadTimeOut 等)的含义,接下来再分析会更清晰。