前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >JDK源码分析-ThreadPoolExecutor

JDK源码分析-ThreadPoolExecutor

作者头像
WriteOnRead
发布2019-08-30 21:03:27
3580
发布2019-08-30 21:03:27
举报
文章被收录于专栏:WriteOnReadWriteOnRead

概述

ThreadPoolExecutor 是 JDK 中线程池的实现类,它的继承结构如下:

本文主要分析 ThreadPoolExecutor 类的主要方法和实现原理(部分代码暂未涉及,后面有机会再行分析),以后再分析 Executor 和 ExecutorService 接口的相关内容。

代码分析

成员变量

该类中的成员变量较多,下面分析一些主要的。

代码语言:javascript
复制
// 该变量是一个原子整型变量,保存了线程池的状态和线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3=29// 线程的最大容量(即池内允许的最大线程数)// 00011111 11111111 11111111 11111111,即 29 个 1,超过 5 亿private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 2^29-1
// runState is stored in the high-order bits// 线程池的运行状态,保存在 ctl 的高位private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

这里用了一个原子整型(AtomicInteger,可以理解为线程安全的 Integer 类,占用 4 个字节,32 位)变量 ctl 来表示线程池的运行状态和线程池内部的线程数量。其中高 3 位表示线程池的运行状态,低 29 位表示线程池中线程的数量。

线程池的状态有以下 5 种:

1. RUNNING: 接受新的任务,并且处理任务队列中的任务;

2. SHUTDOWN: 不接受新的任务,但处理任务队列中的任务;

3. STOP: 不接受新的任务,不处理任务队列中的任务,并且中断正在进行的任务;

4. TIDYING: 所有的任务都已终结,工作线程的数量为 0;

5. TERMINATED: 执行 terminated() 方法后进入该状态,terminated() 方法默认实现为空。

这些状态之间的转换流程及触发条件如图所示:

接下来看其他成员变量:

代码语言:javascript
复制
// 任务队列(阻塞队列)private final BlockingQueue<Runnable> workQueue;// 互斥锁private final ReentrantLock mainLock = new ReentrantLock();// 工作线程集合private final HashSet<Worker> workers = new HashSet<Worker>();// 锁对应的条件private final Condition termination = mainLock.newCondition();// 线程池创建过的最大线程数量private int largestPoolSize;// 已完成任务的数量private long completedTaskCount;// 线程工厂类,用于创建线程private volatile ThreadFactory threadFactory;// 拒绝策略private volatile RejectedExecutionHandler handler;// 空闲线程的存活时间private volatile long keepAliveTime;/* * 核心线程是否允许超时 * 默认为 false,表示核心线程即使处于空闲状态也继续存活; *   若为 true,核心线程同样受到 keepAliveTime 的超时约束 */private volatile boolean allowCoreThreadTimeOut;// 核心池大小private volatile int corePoolSize;// 最大池大小private volatile int maximumPoolSize;// 默认拒绝策略private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

这里有几个重要的成员变量:

corePoolSize: 核心池大小;

maximumPoolSize: 最大池大小,线程池中能同时存在的最大线程数,大于等于 corePoolSize;

workQueue: 工作/任务队列,是一个阻塞队列,可参考前文「JDK源码分析-BlockingQueue」的分析。

为便于理解,这里先大概描述下向线程池提交任务的流程,后面再分析其代码实现:

① 初始化一个容量为 corePoolSize 的池子;

② 刚开始,每来一个任务就在池中创建一个线程去执行该任务,直到池中的容量到达 corePoolSize;

③ 此时若再来任务,则把这些任务放到 workQueue 中;

④ 若 workQueue 也满了,则继续创建线程执行任务,直到线程数量达到 maximumPoolSize;

⑤ 若 workQueue 已满,且线程数量达到 maximumPoolSize,此时若还有任务到来,则执行拒绝策略(handler)。

keepAliveTime & allowCoreThreadTimeOut

其中 keepAliveTime 表示空闲线程的存活时间,这两个值有一定关联:

若 allowCoreThreadTimeOut 为 false (默认),且线程数量超出 corePoolSize,则空闲时间超过 keepAliveTime 的线程会被关闭(最多保留 corePoolSize 个线程存活);

若将 allowCoreThreadTimeOut 设为 true,核心池的线程也会受该超时的影响而关闭。

构造器

ThreadPoolExecutor 内部有多个构造器,但最终都是调用下面这个:

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.acc = System.getSecurityManager() == null ?            null : AccessController.getContext();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}

构造器参数虽然比较多,但基本都是简单的赋值,前面已经分析过这些成员变量的含义,这里不再赘述。下面分析它的核心方法 execute。

在此之前,先看几个常用方法:

代码语言:javascript
复制
// Packing and unpacking ctl// 根据 ctl 和 CAPACITY 得到线程池的运行状态private static int runStateOf(int c)     { return c & ~CAPACITY; }// 根据 ctl 和 CAPACITY 得到线程池中的线程数量private static int workerCountOf(int c)  { return c & CAPACITY; }// 将线程池运行状态和线程数量合并为 ctlprivate static int ctlOf(int rs, int wc) { return rs | wc; }

execute 方法代码如下:

代码语言:javascript
复制
// command 是一个 Runnable 对象,也就是用户提交执行的任务public void execute(Runnable command) {    // 提交的任务为空时抛出异常    if (command == null)        throw new NullPointerException();    /*     * Proceed in 3 steps:     *     * 1. If fewer than corePoolSize threads are running, try to     * start a new thread with the given command as its first     * task.  The call to addWorker atomically checks runState and     * workerCount, and so prevents false alarms that would add     * threads when it shouldn't, by returning false.     *     * 2. If a task can be successfully queued, then we still need     * to double-check whether we should have added a thread     * (because existing ones died since last checking) or that     * the pool shut down since entry into this method. So we     * recheck state and if necessary roll back the enqueuing if     * stopped, or start a new thread if there are none.     *     * 3. If we cannot queue task, then we try to add a new     * thread.  If it fails, we know we are shut down or saturated     * and so reject the task.     */    // 获取当前 ctl (存有线程池状态和线程数量)    int c = ctl.get();    // 若当前工作线程数量小于核心池大小(coolPoolSize)    // 则在核心池中新增一个工作线程,并将该任务交给这个线程执行    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        // 重新获取(存在并发可能)        c = ctl.get();    }    // 若执行到这里,表示池中线程数量 >= corePoolSize,或者上面 addWorker 失败    // 若线程池处于 RUNNING 状态,并且该任务(command)成功添加到任务队列    if (isRunning(c) && workQueue.offer(command)) {        // 再次获取 ctl 值        int recheck = ctl.get();        // 若线程池不是运行状态,则要把上面添加的任务从队列中移除并执行拒绝策略        //(可理解为“回滚”操作)        if (! isRunning(recheck) && remove(command))            // 执行拒绝策略                       reject(command);        // 若此时池中没有线程,则新建一个        // PS: 这里是防止任务提交后,池中没有存活的线程了        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    // 根据上述代码分析,若执行到这里,可分为以下两种情况:    // ① 线程池不是 RUNNING 状态;    // ② 线程池处于 RUNNING 状态,且实际线程数量 workCount >= corePoolSize,    //   并且,添加到 workQueue 失败(已满)    // 此时,则需要和 maximumPoolSize 进行比较,    //   若 workCount <= maximumPoolSize, 则新建一个线程去执行该任务;    //   否则,即 workCount > maximumPoolSize (饱和),则执行拒绝策略    else if (!addWorker(command, false))        // 执行拒绝策略        reject(command);}

该方法描述的就是一个任务提交到线程池的流程,主要执行逻辑如下:

1. 若正在运行的线程数少于 corePoolSize,则创建一个新的线程,并将传入的任务(command)作为它的第一个任务执行。

2. 若运行的线程数不小于 corePoolSize,则将新来的任务添加到任务队列(workQueue)。若入队成功,仍需再次检查是否需要增加一个线程(上次检查之后现有的线程可能死了,或者进入该方法时线程池 SHUTDOWN 了,此时需要执行回滚);若池中没有线程则新建一个(确保 SHUTDOWN 状态也能执行队列中的任务)。

3. 若任务不能入队(队列已满),则创建新的线程并执行任务,若失败(超过 maximumPoolSize),则表示线程池关闭或者已经饱和,因此拒绝该任务。

为了便于理解,可参考下面的流程图:

下面分析 Worker 类及 addWorker 方法。

内部嵌套类 Worker

代码语言:javascript
复制
// 继承自 AQS,且实现了 Runnable 接口private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{    /** Thread this worker is running in.  Null if factory fails. */    final Thread thread;    /** Initial task to run.  Possibly null. */    // 运行的第一个任务,可能为空    Runnable firstTask;    /** Per-thread task counter */    volatile long completedTasks;    /**     * Creates with given first task and thread from ThreadFactory.     * @param firstTask the first task (null if none)     */    Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;        // 初始化 thread        this.thread = getThreadFactory().newThread(this);    }    /** Delegates main run loop to outer runWorker  */    public void run() {        runWorker(this);    }    // 其他一些 AQS 相关的方法不再一一列举}

可以看到 Worker 类继承自 AQS,它的实现与 ReentrantLock 有一些类似,可对比前文「JDK源码分析-ReentrantLock」分析。而且,Worker 类实现了 Runnable 接口,它的 run 方法是将自身作为参数传递给了外部类的 runWorker 方法,下面分析这两个方法。

addWorker 方法

代码语言:javascript
复制
// firstTask: 第一个任务,可为空// core: 是否为核心池,true 是,false 为最大池private boolean addWorker(Runnable firstTask, boolean core) {    // 该循环的主要作用就是增加 workCount 计数,增加成功后再新增 Worker 对象    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        /*         * rs >= SHUTDOWN 表示线程池不再接受新的任务         * 该判断条件分为以下三种:         *   ① 线程池处于 STOP, TYDING 或 TERMINATED 状态;         *   ② 线程池处于 SHUTDOWN 状态,且 firstTask 不为空;         *   ③ 线程池处于 SHUTDOWN 状态,且 workQueue 为空         * 满足任一条件即返回 false.         */        // Check if queue empty only if necessary.        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            return false;                for (;;) {            int wc = workerCountOf(c);            // 超出最大容量 CAPACITY,或者超出初始设置的核心池/最大池数量,则返回 false            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            // CAS 方式增加 ctl 的 workerCount 数量(该循环的主要目的)            if (compareAndIncrementWorkerCount(c))                break retry; // 若增加失败则退出循环            c = ctl.get();  // Re-read ctl            // 运行状态改变            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    // 标记 Worker 是否启动、是否添加成功    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        // 将 firstTask 封装成 Worker 对象        w = new Worker(firstTask);        // 获取 thread 对象 t        final Thread t = w.thread;        if (t != null) {            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {                // Recheck while holding lock.                // Back out on ThreadFactory failure or if                // shut down before lock acquired.                int rs = runStateOf(ctl.get());                // 若线程池状态小于 SHUTDOWN,即为 RUNNING 状态;                // 或者为 SHUTDOWN 状态,且 firstTask 为空,                //   表示不再接受新的任务,但会继续执行队列中的任务                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    // 添加到工作线程集合(HashSet)                    workers.add(w);                    // 更新最大计数                    int s = workers.size();                    if (s > largestPoolSize)                        largestPoolSize = s;                    // 标记 Worker 添加成功                    workerAdded = true;                }            } finally {                mainLock.unlock();            }            // 若成功添加到工作线程集合,则启动线程执行任务            if (workerAdded) {                // 启动线程                t.start();                workerStarted = true;            }        }    } finally {        // Worker 启动失败,执行回滚操作        if (! workerStarted)            addWorkerFailed(w);    }    return workerStarted;}

runWorker 方法:

代码语言:javascript
复制
final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // task 不为空时才执行,循环执行        // getTask 是从 workQueue 中获取任务        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            // 若线程池状态 >= STOP,则需要中断该线程            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                // 中断线程                wt.interrupt();            try {                // 任务执行前调用该方法                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 执行任务                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    // 任务执行后调用该方法                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // getTask 返回空,说明任务队列没有任务了        processWorkerExit(w, completedAbruptly);    }}

可以看到这里有 beforeExecute 和 afterExecute 方法,分别表示提交的任务执行前后做的事情,在 ThreadPoolExecutor 类中这两个都是空方法。我们可以通过继承 ThreadPoolExecutor 类并重写这两个方法来定制自己的需求。

getTask 方法:

代码语言:javascript
复制
// 从任务队列(阻塞队列)中取任务private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {        // 获取线程池运行状态        int c = ctl.get();        int rs = runStateOf(c);
        // Check if queue empty only if necessary.        /*         * 线程池运行状态 rs >= SHUTDOWN,表示非 RUNNING 状态         * 该判断条件有两个:         * 1. rs >= STOP;         * 2. rs == SHUTDOWN,且工作队列为空         * 若满足上述条件中的一个,则将线程数量(workerCount)减少 1,返回 null         */        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            decrementWorkerCount(); // 减少工作现场数量            return null; // 返回 null 表示会从池中移除一个 Worker        }                int wc = workerCountOf(c);                // Are workers subject to culling?        // 是否要移除 Worker        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        // 线程数大于 maximumPoolSize,或者需要移除 Worker        if ((wc > maximumPoolSize || (timed && timedOut))            && (wc > 1 || workQueue.isEmpty())) {            if (compareAndDecrementWorkerCount(c))                return null; // 返回空意味着会减少移除一个 Worker            continue;        }                try {            // 从 workQueue 中获取任务(Runnable 对象)            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false;        }    }}

该方法主要是从任务队列 workQueue 中获取任务,并且控制池内线的程数量。

拒绝策略

拒绝策略 RejectedExecutionHandler 是一个接口,它只有一个 rejectedExecution 方法,代码如下:

代码语言:javascript
复制
public interface RejectedExecutionHandler {    // 执行拒绝策略    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}

它在 ThreadPoolExecutor 中的几个实现类如下:

ThreadPoolExecutor 默认的拒绝策略为 AbortPolicy,代码如下:

代码语言:javascript
复制
public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        // 抛出异常        throw new RejectedExecutionException("Task " + r.toString() +                                             " rejected from " +                                             e.toString());    }}

可以看到,该策略就是直接抛出 RejectedExecutionException 异常。其他拒绝策略代码也都相对简单,不再一一列举。值得一提的是,如果我们对这几种策略都不满意,可以自定义拒绝策略(实现 RejectedExecutionHandler 接口)。

小结

本文主要分析了线程池 ThreadPoolExecutor 类的主要成员变量和核心方法实现,主要包括一个任务(Runnable)的提交流程。

该类稍微有些复杂,分析时首先要搞清楚任务提交的流程以及主要成员变量(workQueue、corePoolSize、maximumPoolSize、keepAliveTime、allowCoreThreadTimeOut 等)的含义,接下来再分析会更清晰。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 WriteOnRead 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
腾讯云代码分析
腾讯云代码分析(内部代号CodeDog)是集众多代码分析工具的云原生、分布式、高性能的代码综合分析跟踪管理平台,其主要功能是持续跟踪分析代码,观测项目代码质量,支撑团队传承代码文化。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档