专栏首页WriteOnReadJDK源码分析-ThreadPoolExecutor

JDK源码分析-ThreadPoolExecutor

概述

ThreadPoolExecutor 是 JDK 中线程池的实现类,它的继承结构如下:

本文主要分析 ThreadPoolExecutor 类的主要方法和实现原理(部分代码暂未涉及,后面有机会再行分析),以后再分析 Executor 和 ExecutorService 接口的相关内容。

代码分析

成员变量

该类中的成员变量较多,下面分析一些主要的。

// 该变量是一个原子整型变量,保存了线程池的状态和线程数量private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3=29// 线程的最大容量(即池内允许的最大线程数)// 00011111 11111111 11111111 11111111,即 29 个 1,超过 5 亿private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 2^29-1
// runState is stored in the high-order bits// 线程池的运行状态,保存在 ctl 的高位private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

这里用了一个原子整型(AtomicInteger,可以理解为线程安全的 Integer 类,占用 4 个字节,32 位)变量 ctl 来表示线程池的运行状态和线程池内部的线程数量。其中高 3 位表示线程池的运行状态,低 29 位表示线程池中线程的数量。

线程池的状态有以下 5 种:

1. RUNNING: 接受新的任务,并且处理任务队列中的任务;

2. SHUTDOWN: 不接受新的任务,但处理任务队列中的任务;

3. STOP: 不接受新的任务,不处理任务队列中的任务,并且中断正在进行的任务;

4. TIDYING: 所有的任务都已终结,工作线程的数量为 0;

5. TERMINATED: 执行 terminated() 方法后进入该状态,terminated() 方法默认实现为空。

这些状态之间的转换流程及触发条件如图所示:

接下来看其他成员变量:

// 任务队列(阻塞队列)private final BlockingQueue<Runnable> workQueue;// 互斥锁private final ReentrantLock mainLock = new ReentrantLock();// 工作线程集合private final HashSet<Worker> workers = new HashSet<Worker>();// 锁对应的条件private final Condition termination = mainLock.newCondition();// 线程池创建过的最大线程数量private int largestPoolSize;// 已完成任务的数量private long completedTaskCount;// 线程工厂类,用于创建线程private volatile ThreadFactory threadFactory;// 拒绝策略private volatile RejectedExecutionHandler handler;// 空闲线程的存活时间private volatile long keepAliveTime;/* * 核心线程是否允许超时 * 默认为 false,表示核心线程即使处于空闲状态也继续存活; *   若为 true,核心线程同样受到 keepAliveTime 的超时约束 */private volatile boolean allowCoreThreadTimeOut;// 核心池大小private volatile int corePoolSize;// 最大池大小private volatile int maximumPoolSize;// 默认拒绝策略private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

这里有几个重要的成员变量:

corePoolSize: 核心池大小;

maximumPoolSize: 最大池大小,线程池中能同时存在的最大线程数,大于等于 corePoolSize;

workQueue: 工作/任务队列,是一个阻塞队列,可参考前文「JDK源码分析-BlockingQueue」的分析。

为便于理解,这里先大概描述下向线程池提交任务的流程,后面再分析其代码实现:

① 初始化一个容量为 corePoolSize 的池子;

② 刚开始,每来一个任务就在池中创建一个线程去执行该任务,直到池中的容量到达 corePoolSize;

③ 此时若再来任务,则把这些任务放到 workQueue 中;

④ 若 workQueue 也满了,则继续创建线程执行任务,直到线程数量达到 maximumPoolSize;

⑤ 若 workQueue 已满,且线程数量达到 maximumPoolSize,此时若还有任务到来,则执行拒绝策略(handler)。

keepAliveTime & allowCoreThreadTimeOut

其中 keepAliveTime 表示空闲线程的存活时间,这两个值有一定关联:

若 allowCoreThreadTimeOut 为 false (默认),且线程数量超出 corePoolSize,则空闲时间超过 keepAliveTime 的线程会被关闭(最多保留 corePoolSize 个线程存活);

若将 allowCoreThreadTimeOut 设为 true,核心池的线程也会受该超时的影响而关闭。

构造器

ThreadPoolExecutor 内部有多个构造器,但最终都是调用下面这个:

public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.acc = System.getSecurityManager() == null ?            null : AccessController.getContext();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}

构造器参数虽然比较多,但基本都是简单的赋值,前面已经分析过这些成员变量的含义,这里不再赘述。下面分析它的核心方法 execute。

在此之前,先看几个常用方法:

// Packing and unpacking ctl// 根据 ctl 和 CAPACITY 得到线程池的运行状态private static int runStateOf(int c)     { return c & ~CAPACITY; }// 根据 ctl 和 CAPACITY 得到线程池中的线程数量private static int workerCountOf(int c)  { return c & CAPACITY; }// 将线程池运行状态和线程数量合并为 ctlprivate static int ctlOf(int rs, int wc) { return rs | wc; }

execute 方法代码如下:

// command 是一个 Runnable 对象,也就是用户提交执行的任务public void execute(Runnable command) {    // 提交的任务为空时抛出异常    if (command == null)        throw new NullPointerException();    /*     * Proceed in 3 steps:     *     * 1. If fewer than corePoolSize threads are running, try to     * start a new thread with the given command as its first     * task.  The call to addWorker atomically checks runState and     * workerCount, and so prevents false alarms that would add     * threads when it shouldn't, by returning false.     *     * 2. If a task can be successfully queued, then we still need     * to double-check whether we should have added a thread     * (because existing ones died since last checking) or that     * the pool shut down since entry into this method. So we     * recheck state and if necessary roll back the enqueuing if     * stopped, or start a new thread if there are none.     *     * 3. If we cannot queue task, then we try to add a new     * thread.  If it fails, we know we are shut down or saturated     * and so reject the task.     */    // 获取当前 ctl (存有线程池状态和线程数量)    int c = ctl.get();    // 若当前工作线程数量小于核心池大小(coolPoolSize)    // 则在核心池中新增一个工作线程,并将该任务交给这个线程执行    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        // 重新获取(存在并发可能)        c = ctl.get();    }    // 若执行到这里,表示池中线程数量 >= corePoolSize,或者上面 addWorker 失败    // 若线程池处于 RUNNING 状态,并且该任务(command)成功添加到任务队列    if (isRunning(c) && workQueue.offer(command)) {        // 再次获取 ctl 值        int recheck = ctl.get();        // 若线程池不是运行状态,则要把上面添加的任务从队列中移除并执行拒绝策略        //(可理解为“回滚”操作)        if (! isRunning(recheck) && remove(command))            // 执行拒绝策略                       reject(command);        // 若此时池中没有线程,则新建一个        // PS: 这里是防止任务提交后,池中没有存活的线程了        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    // 根据上述代码分析,若执行到这里,可分为以下两种情况:    // ① 线程池不是 RUNNING 状态;    // ② 线程池处于 RUNNING 状态,且实际线程数量 workCount >= corePoolSize,    //   并且,添加到 workQueue 失败(已满)    // 此时,则需要和 maximumPoolSize 进行比较,    //   若 workCount <= maximumPoolSize, 则新建一个线程去执行该任务;    //   否则,即 workCount > maximumPoolSize (饱和),则执行拒绝策略    else if (!addWorker(command, false))        // 执行拒绝策略        reject(command);}

该方法描述的就是一个任务提交到线程池的流程,主要执行逻辑如下:

1. 若正在运行的线程数少于 corePoolSize,则创建一个新的线程,并将传入的任务(command)作为它的第一个任务执行。

2. 若运行的线程数不小于 corePoolSize,则将新来的任务添加到任务队列(workQueue)。若入队成功,仍需再次检查是否需要增加一个线程(上次检查之后现有的线程可能死了,或者进入该方法时线程池 SHUTDOWN 了,此时需要执行回滚);若池中没有线程则新建一个(确保 SHUTDOWN 状态也能执行队列中的任务)。

3. 若任务不能入队(队列已满),则创建新的线程并执行任务,若失败(超过 maximumPoolSize),则表示线程池关闭或者已经饱和,因此拒绝该任务。

为了便于理解,可参考下面的流程图:

下面分析 Worker 类及 addWorker 方法。

内部嵌套类 Worker

// 继承自 AQS,且实现了 Runnable 接口private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable{    /** Thread this worker is running in.  Null if factory fails. */    final Thread thread;    /** Initial task to run.  Possibly null. */    // 运行的第一个任务,可能为空    Runnable firstTask;    /** Per-thread task counter */    volatile long completedTasks;    /**     * Creates with given first task and thread from ThreadFactory.     * @param firstTask the first task (null if none)     */    Worker(Runnable firstTask) {        setState(-1); // inhibit interrupts until runWorker        this.firstTask = firstTask;        // 初始化 thread        this.thread = getThreadFactory().newThread(this);    }    /** Delegates main run loop to outer runWorker  */    public void run() {        runWorker(this);    }    // 其他一些 AQS 相关的方法不再一一列举}

可以看到 Worker 类继承自 AQS,它的实现与 ReentrantLock 有一些类似,可对比前文「JDK源码分析-ReentrantLock」分析。而且,Worker 类实现了 Runnable 接口,它的 run 方法是将自身作为参数传递给了外部类的 runWorker 方法,下面分析这两个方法。

addWorker 方法

// firstTask: 第一个任务,可为空// core: 是否为核心池,true 是,false 为最大池private boolean addWorker(Runnable firstTask, boolean core) {    // 该循环的主要作用就是增加 workCount 计数,增加成功后再新增 Worker 对象    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        /*         * rs >= SHUTDOWN 表示线程池不再接受新的任务         * 该判断条件分为以下三种:         *   ① 线程池处于 STOP, TYDING 或 TERMINATED 状态;         *   ② 线程池处于 SHUTDOWN 状态,且 firstTask 不为空;         *   ③ 线程池处于 SHUTDOWN 状态,且 workQueue 为空         * 满足任一条件即返回 false.         */        // Check if queue empty only if necessary.        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            return false;                for (;;) {            int wc = workerCountOf(c);            // 超出最大容量 CAPACITY,或者超出初始设置的核心池/最大池数量,则返回 false            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            // CAS 方式增加 ctl 的 workerCount 数量(该循环的主要目的)            if (compareAndIncrementWorkerCount(c))                break retry; // 若增加失败则退出循环            c = ctl.get();  // Re-read ctl            // 运行状态改变            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    // 标记 Worker 是否启动、是否添加成功    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        // 将 firstTask 封装成 Worker 对象        w = new Worker(firstTask);        // 获取 thread 对象 t        final Thread t = w.thread;        if (t != null) {            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {                // Recheck while holding lock.                // Back out on ThreadFactory failure or if                // shut down before lock acquired.                int rs = runStateOf(ctl.get());                // 若线程池状态小于 SHUTDOWN,即为 RUNNING 状态;                // 或者为 SHUTDOWN 状态,且 firstTask 为空,                //   表示不再接受新的任务,但会继续执行队列中的任务                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    // 添加到工作线程集合(HashSet)                    workers.add(w);                    // 更新最大计数                    int s = workers.size();                    if (s > largestPoolSize)                        largestPoolSize = s;                    // 标记 Worker 添加成功                    workerAdded = true;                }            } finally {                mainLock.unlock();            }            // 若成功添加到工作线程集合,则启动线程执行任务            if (workerAdded) {                // 启动线程                t.start();                workerStarted = true;            }        }    } finally {        // Worker 启动失败,执行回滚操作        if (! workerStarted)            addWorkerFailed(w);    }    return workerStarted;}

runWorker 方法:

final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // task 不为空时才执行,循环执行        // getTask 是从 workQueue 中获取任务        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            // 若线程池状态 >= STOP,则需要中断该线程            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                // 中断线程                wt.interrupt();            try {                // 任务执行前调用该方法                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 执行任务                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    // 任务执行后调用该方法                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // getTask 返回空,说明任务队列没有任务了        processWorkerExit(w, completedAbruptly);    }}

可以看到这里有 beforeExecute 和 afterExecute 方法,分别表示提交的任务执行前后做的事情,在 ThreadPoolExecutor 类中这两个都是空方法。我们可以通过继承 ThreadPoolExecutor 类并重写这两个方法来定制自己的需求。

getTask 方法:

// 从任务队列(阻塞队列)中取任务private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {        // 获取线程池运行状态        int c = ctl.get();        int rs = runStateOf(c);
        // Check if queue empty only if necessary.        /*         * 线程池运行状态 rs >= SHUTDOWN,表示非 RUNNING 状态         * 该判断条件有两个:         * 1. rs >= STOP;         * 2. rs == SHUTDOWN,且工作队列为空         * 若满足上述条件中的一个,则将线程数量(workerCount)减少 1,返回 null         */        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            decrementWorkerCount(); // 减少工作现场数量            return null; // 返回 null 表示会从池中移除一个 Worker        }                int wc = workerCountOf(c);                // Are workers subject to culling?        // 是否要移除 Worker        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        // 线程数大于 maximumPoolSize,或者需要移除 Worker        if ((wc > maximumPoolSize || (timed && timedOut))            && (wc > 1 || workQueue.isEmpty())) {            if (compareAndDecrementWorkerCount(c))                return null; // 返回空意味着会减少移除一个 Worker            continue;        }                try {            // 从 workQueue 中获取任务(Runnable 对象)            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false;        }    }}

该方法主要是从任务队列 workQueue 中获取任务,并且控制池内线的程数量。

拒绝策略

拒绝策略 RejectedExecutionHandler 是一个接口,它只有一个 rejectedExecution 方法,代码如下:

public interface RejectedExecutionHandler {    // 执行拒绝策略    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);}

它在 ThreadPoolExecutor 中的几个实现类如下:

ThreadPoolExecutor 默认的拒绝策略为 AbortPolicy,代码如下:

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {        // 抛出异常        throw new RejectedExecutionException("Task " + r.toString() +                                             " rejected from " +                                             e.toString());    }}

可以看到,该策略就是直接抛出 RejectedExecutionException 异常。其他拒绝策略代码也都相对简单,不再一一列举。值得一提的是,如果我们对这几种策略都不满意,可以自定义拒绝策略(实现 RejectedExecutionHandler 接口)。

小结

本文主要分析了线程池 ThreadPoolExecutor 类的主要成员变量和核心方法实现,主要包括一个任务(Runnable)的提交流程。

该类稍微有些复杂,分析时首先要搞清楚任务提交的流程以及主要成员变量(workQueue、corePoolSize、maximumPoolSize、keepAliveTime、allowCoreThreadTimeOut 等)的含义,接下来再分析会更清晰。

本文分享自微信公众号 - WriteOnRead(WriteOnRead),作者:jaxer

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • JDK源码分析-Hashtable

    与 HashMap 类似,Hashtable 也是散列表的实现。它的内部结构可以理解为「数组 + 链表」的形式,结构示意图如下:

    WriteOnRead
  • JDK源码分析-Map

    Map 是一个接口,它表示一种“键-值(key-value)”映射的对象(Entry),其中键是不重复的(值可以重复),且最多映射到一个值(可以理解为“映射”或...

    WriteOnRead
  • JDK源码分析-HashMap(1)

    HashMap 是 Java 开发中最常用的容器类之一,也是面试的常客。它其实就是前文「数据结构与算法笔记(二)」中「散列表」的实现,处理散列冲突用的是“链表法...

    WriteOnRead
  • Semaphore 源码分析

    maphore 源码分析 1. 在阅读源码时做了大量的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限,并且代码阅读起来没有 IDE 方便,所以在...

    lwen
  • 漫画:什么是分布式锁?

    利用Memcached的add命令。此命令是原子性操作,只有在key不存在的情况下,才能add成功,也就意味着线程得到了锁。

    后端技术探索
  • Semaphore 源码分析

    Semaphore 源码分析 1. 在阅读源码时做了大量的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限,并且代码阅读起来没有 IDE 方便,所...

    lwen
  • 面试总结

    海仔
  • C# dotnet 自己实现一个线程同步上下文

    昨天鹏飞哥问了我一个问题,为什么在控制台程序的主线程等待某个线程执行完成之后回来,是在其他线程执行的。而 WPF 在等待某个线程执行完成之后,可以回到主线程执行...

    林德熙
  • 厕读:每日一题,面试无忧

    8. GC线程是否为守护线程?() 答案:是 解析:线程分为守护线程和非守护线程(即用户线程)。 只要当前JVM实例中尚存在任何一个非守护线程没有结束,守护线程...

    ImportSource
  • java多线程并发控制countDownLatch和cyclicBarrier的使用

    java主线程等待所有子线程执行完毕在执行,这个需求其实我们在工作中经常会用到,比如用户下单一个产品,后台会做一系列的处理,为了提高效率,每个处理都可以用一个线...

    大道七哥

扫码关注云+社区

领取腾讯云代金券