前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ThreadPoolExecutor线程池设计思路

ThreadPoolExecutor线程池设计思路

作者头像
大忽悠爱学习
发布2022-10-04 18:09:02
4290
发布2022-10-04 18:09:02
举报
文章被收录于专栏:c++与qt学习

ThreadPoolExecutor线程池设计思路


引言

本篇文章我将会尝试用自己的语言,用我个人的想法来阐述一下ThreadPoolExecutor的设计思路。

首先,我们来看一下ThreadPoolExecutor的继承体系:

在这里插入图片描述
在这里插入图片描述
代码语言:javascript
复制
public interface Executor {
    void execute(Runnable command);
}

Executor意为执行器,作为顶层抽象接口,提供了一个execute方法来执行传入的任务,而该任务如何被执行,则是由不同实现子类来决定的。

  • 按任务执行方式的不同,我们就能构思出下面一副架构图
在这里插入图片描述
在这里插入图片描述

其次,我们还需要考虑到对任务管理接口的统一,例如: 停止任务,提交任务等… 。 因为,每个不同的子类实现,都有对任务管理的需求,因此我们需要统一任务管理相关接口,防止混乱。

任务管理相关接口由ExecutorService为我们规定好了:

代码语言:javascript
复制
public interface ExecutorService extends Executor {

    //启动有序关闭,其中执行先前提交的任务,但不会接受新任务。如果已经关闭,调用没有额外的效果。
    //此方法不等待先前提交的任务完成执行。使用awaitTermination来做到这一点。
    void shutdown();

    //尝试停止所有正在执行的任务,停止等待任务的处理,并返回等待执行的任务列表。
    //此方法不等待主动执行的任务终止。使用awaitTermination来做到这一点。
    //除了尽最大努力停止处理正在执行的任务之外,没有任何保证.
    List<Runnable> shutdownNow();

    //如果此执行程序已关闭,则返回true。
    boolean isShutdown();

    //调用shutdown或shutdownNow后,如果所有任务都已完成,则返回true。
    boolean isTerminated();

    //在shutdown请求后阻塞,直到所有任务都完成执行,或者发生超时,或者当前线程被中断,以先发生者为准。
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    //提交一个有返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 Future 的get方法将在成功完成后返回任务的结果。
    <T> Future<T> submit(Callable<T> task);

    //提交Runnable任务以执行并返回代表该任务的Future。Future的get方法将在成功完成后返回给定的result。
    <T> Future<T> submit(Runnable task, T result);

    //提交Runnable任务以执行并返回代表该任务的Future。Future 的get方法将在成功完成后返回null。
    Future<?> submit(Runnable task);

    //执行给定的任务,返回一个 Futures 列表,在所有完成时保存它们的状态和结果。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    //执行给定的任务,当全部完成或超时到期时,返回保存其状态和结果的 Futures 列表。返回时,未完成的任务将被取消。
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    //执行给定的任务,返回已成功完成的任务的结果(即不抛出异常)。正常或异常返回时,取消未完成的任务。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    //执行给定任务,超时前返回已成功完成的任务的结果(即不抛出异常)。正常或异常返回时,取消未完成的任务。
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

如果我们想要获取任务的返回值,可以通过Executors体系下提供的Future来获取。

对于任务执行器而言,还有一个非常重要的就是任务队列,毕竟一下子来了很多任务,又无法及时处理,那就需要先找个地方存一下嘛。

相关具体实现,如下图所示:

在这里插入图片描述
在这里插入图片描述

这里先不对Future具体实现进行讲解,大家先了解即可


ThreadPoolExecutor

关于线程池一些常见概念,我这里就不多解释了,不清楚的可以看下面这篇文章:

为什么需要线程池?

ThreadPoolExecutor的核心工作流程其实就如下面这幅图所示一般:

在这里插入图片描述
在这里插入图片描述

看起来似乎很简单,但是如果要实现,又该从哪里写起来呢?


线程池相关属性

线程池的设计离不开一堆参数来记录线程池当前的状态,那么具体应该记录哪些状态呢?

代码语言:javascript
复制
public class ThreadPoolExecutor extends AbstractExecutorService {

    // 控制变量-存放线程池状态和线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    // 任务队列,必须是阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
    private final HashSet<Worker> workers = new HashSet<>();
    
    // 全局锁
    private final ReentrantLock mainLock = new ReentrantLock();

    // awaitTermination方法使用的等待条件变量
    private final Condition termination = mainLock.newCondition();

    // 记录峰值线程数
    private int largestPoolSize;
    
    // 记录已经成功执行完毕的任务数
    private long completedTaskCount;
    
    // 线程工厂,用于创建新的线程实例
    private volatile ThreadFactory threadFactory;

    // 拒绝执行处理器,对应不同的拒绝策略
    private volatile RejectedExecutionHandler handler;
    
    // 空闲线程等待任务的时间周期,单位是纳秒
    private volatile long keepAliveTime;
    
    // 是否允许核心线程超时,如果为true则keepAliveTime对核心线程也生效
    private volatile boolean allowCoreThreadTimeOut;
    
    // 核心线程数
    private volatile int corePoolSize;

    // 线程池容量
    private volatile int maximumPoolSize;

    // 省略其他代码
}    

这其中部分状态是随着线程池运行起来后,动态改变的,还有一部分是需要用户提前设置好的,相当于一个指标,例如: 核心线程数,最大线程数等等…

用户可以在ThreadPoolExecutor的构造参数中进行设置:

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以自定义核心线程数、线程池容量(最大线程数)、空闲线程等待任务周期、任务队列、线程工厂、拒绝策略。下面简单分析一下每个参数的含义和作用:

  • corePoolSize:int类型,核心线程数量。
  • maximumPoolSize:int类型,最大线程数量,也就是线程池的容量。
  • keepAliveTime:long类型,线程空闲等待时间,也和工作线程的生命周期有关,下文会分析。
  • unit:TimeUnit类型,keepAliveTime参数的时间单位,实际上keepAliveTime最终会转化为纳秒。
  • workQueue:BlockingQueue< Runnable >类型,等待队列或者叫任务队列。
  • threadFactory:ThreadFactory类型,线程工厂,用于创建工作线程(包括核心线程和非核心线程),默认使用Executors.defaultThreadFactory()作为内建线程工厂实例,一般自定义线程工厂才能更好地跟踪工作线程。
  • handler:RejectedExecutionHandler类型,线程池的拒绝执行处理器,更多时候称为拒绝策略,拒绝策略执行的时机是当阻塞队列已满、没有空闲的线程(包括核心线程和非核心线程)并且继续提交任务。提供了4种内建的拒绝策略实现:
    • AbortPolicy:直接拒绝策略,也就是不会执行任务,直接抛出RejectedExecutionException,这是默认的拒绝策略。
    • DiscardPolicy:抛弃策略,也就是直接忽略提交的任务(通俗来说就是空实现)。
    • DiscardOldestPolicy:抛弃最老任务策略,也就是通过poll()方法取出任务队列队头的任务抛弃,然后执行当前提交的任务。
    • CallerRunsPolicy:调用者执行策略,也就是当前调用Executor#execute()的线程直接调用任务Runnable#run(),一般不希望任务丢失会选用这种策略,但从实际角度来看,原来的异步调用意图会退化为同步调用。

线程池状态记录

如同Java线程有五种状态一般,线程池也应该具有不同的状态,并且每种状态之前的切换和行为表现都不同,ThreadPoolExecutor通过ctl属性的前三位来存放线程池当前状态,后29位则存放当前线程池中工作线程数量。

代码语言:javascript
复制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//COUNT_BITS =29
private static final int COUNT_BITS = Integer.SIZE - 3;
//000 111....111 三个0,29个1
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;

//线程池的五种状态,每个值只有前三位不同
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// 通过ctl值获取运行状态
private static int runStateOf(int c)     { return c & ~COUNT_MASK; }
// 通过ctl值获取工作线程数
private static int workerCountOf(int c)  { return c & COUNT_MASK; }

// 通过运行状态和工作线程数计算ctl的值,或运算
private static int ctlOf(int rs, int wc) { return rs | wc; }

private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

// CAS操作线程数增加1
private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

// CAS操作线程数减少1
private boolean compareAndDecrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect - 1);
}

// 线程数直接减少1
private void decrementWorkerCount() {
    ctl.addAndGet(-1);
}

接下来分析一下线程池的状态变量,工作线程上限数量位的长度是COUNT_BITS,它的值是Integer.SIZE - 3,也就是正整数29:

我们知道,整型包装类型Integer实例的大小是4 byte,一共32 bit,也就是一共有32个位用于存放0或者1。 在ThreadPoolExecutor实现中,使用32位的整型包装类型存放工作线程数和线程池状态。 其中,低29位用于存放工作线程数,而高3位用于存放线程池状态,所以线程池的状态最多只能有2^3种。 工作线程上限数量为2^29 - 1,超过5亿,这个数量在短时间内不用考虑会超限。

接着看工作线程上限数量掩码COUNT_MASK,它的值是(1 < COUNT_BITS) - l,也就是1左移29位,再减去1,如果补全32位,它的位视图如下:

在这里插入图片描述
在这里插入图片描述

然后就是线程池的状态常量,这里只详细分析其中一个,其他类同,这里看RUNNING状态:

代码语言:javascript
复制
// -1的补码为:111-11111111111111111111111111111
// 左移29位后:111-00000000000000000000000000000
// 10进制值为:-536870912 
// 高3位111的值就是表示线程池正在处于运行状态
private static final int RUNNING = -1 << COUNT_BITS;

控制变量ctl的组成就是通过线程池运行状态rs和工作线程数wc通过或运算得到的:

代码语言:javascript
复制
// rs=RUNNING值为:111-00000000000000000000000000000
// wc的值为0:000-00000000000000000000000000000
// rs | wc的结果为:111-00000000000000000000000000000
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { 
    return rs | wc; 
}

那么我们怎么从ctl中取出高3位?上面源码中提供的runStateOf()方法就是提取运行状态:

代码语言:javascript
复制
// 先把COUNT_MASK取反(~COUNT_MASK),得到:111-00000000000000000000000000000
// ctl位图特点是:xxx-yyyyyyyyyyyyyyyyyyyyyyyyyyyyyy
// 两者做一次与运算即可得到高3位xxx
private static int runStateOf(int c){ 
    return c & ~COUNT_MASK; 
}

同理,取出低29位只需要把ctl和COUNT_MASK(000-11111111111111111111111111111)做一次与运算即可。

小结一下线程池的运行状态常量:

在这里插入图片描述
在这里插入图片描述

这里有一个比较特殊的技巧,由于运行状态值存放在高3位,所以可以直接通过十进制值(甚至可以忽略低29位,直接用ctl进行比较,或者使用ctl和线程池状态常量进行比较)来比较和判断线程池的状态:

RUNNING(-536870912) < SHUTDOWN(0) < STOP(536870912) < TIDYING(1073741824) < TERMINATED(1610612736)

下面这三个方法就是使用这种技巧:

代码语言:javascript
复制
// ctl和状态常量比较,判断是否小于
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}

// ctl和状态常量比较,判断是否小于或等于
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}

// ctl和状态常量SHUTDOWN比较,判断是否处于RUNNING状态
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}

最后是线程池状态的跃迁图:

在这里插入图片描述
在这里插入图片描述

PS:线程池源码中有很多中间变量用了简单的单字母表示,例如c就是表示ctl、wc就是表示worker count、rs就是表示running status


execute执行任务

ThreadPoolExecutor需要将任务交给线程池来执行,下面我们就来看看,究竟是如何实现的:

代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //获取线程池状态: 前3位表示线程池状态,后29位表示存活线程数量
    int c = ctl.get();
    //1.在核心线程数未满之前,先创建核心线程来执行任务
    //通过位运算拿到工作线程数量,判断当前存活的工作线程数量是否小于核心线程数量
    if (workerCountOf(c) < corePoolSize) {
        //如果核心线程创建成功则直接返回
        if (addWorker(command, true))
            return;
        //这里说明创建核心线程失败,需要更新ctl的临时变量c
        c = ctl.get();
    }
    //2. 将任务先放到任务队列中去
    // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize
    // 判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务(放入任务失败返回false)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
          // 这里是向任务队列投放任务成功,对线程池的运行中状态做二次检查
        // 如果线程池二次检查状态是非运行中状态,则从任务队列移除当前的任务调用拒绝策略处理之(也就是移除前面成功入队的任务实例)
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 走到下面的else if分支,说明有以下的前提:
        // 0、待执行的任务已经成功加入任务队列
        // 1、线程池可能是RUNNING状态
        // 2、传入的任务可能从任务队列中移除失败(移除失败的唯一可能就是任务已经被执行了)
        //  或者我们设置核心线程数量为0
        
        // 如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null - 返回
        // 也就是创建的非核心线程不会马上运行,而是等待获取任务队列的任务去执行 
        // 如果工作线程数量此时不为零,那么就让存活的工作线程去任务队列取活干
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3.任务队列已满,下面尝试创建非核心线程来执行任务
    else if (!addWorker(command, false))
        reject(command);
}

这里简单分析一下整个流程:

  1. 如果当前工作线程总数小于corePoolSize,则直接创建核心线程执行任务(任务实例会传入直接用于构造工作线程实例)。
  2. 如果当前工作线程总数大于等于corePoolSize,判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务,这里会二次检查线程池运行状态,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null。
  3. 如果向任务队列投放任务失败(任务队列已经满了),则会尝试创建非核心线程传入任务实例执行。
  4. 如果创建非核心线程失败,此时需要拒绝执行任务,调用拒绝策略处理任务。

如果一个任务成功加入任务队列,我们依然需要二次检查是否需要添加一个工作线程(因为所有存活的工作线程有可能在最后一次检查之后已经终结)或者执行当前方法的时候线程池是否已经shutdown了。所以我们需要二次检查线程池的状态,必须时把任务从任务队列中移除或者在没有可用的工作线程的前提下新建一个工作线程

任务提交流程从调用者的角度来看如下:

在这里插入图片描述
在这里插入图片描述

工作线程抽象为Worker

因为我们需要对线程池中每个线程的状态进行记录,因此就无法用一个单纯的List< Thread > 集合来保存所有工作线程,还需要对工作线程做一层包装,我们来看一下ThreadPoolExecutor是如何做的:

代码语言:javascript
复制
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{

    // 保存ThreadFactory创建的线程实例,如果ThreadFactory创建线程失败则为null
    final Thread thread;
    // 保存传入的Runnable任务实例
    Runnable firstTask;
    // 记录每个线程完成的任务总数
    volatile long completedTasks;
    
    // 唯一的构造函数,传入任务实例firstTask,注意可以为null
    Worker(Runnable firstTask) {
        // 禁止线程中断,直到runWorker()方法执行
        setState(-1);
        this.firstTask = firstTask;
        // 通过ThreadFactory创建线程实例,注意一下Worker实例自身作为Runnable用于创建新的线程实例
        this.thread = getThreadFactory().newThread(this);
    }

    // 委托到外部的runWorker()方法,注意runWorker()方法是线程池的方法,而不是Worker的方法
    public void run() {
        runWorker(this);
    }

    //  是否持有独占锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    // 独占模式下尝试获取资源,这里没有判断传入的变量,直接CAS判断0更新为1是否成功,成功则设置独占线程为当前线程
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    
    // 独占模式下尝试是否资源,这里没有判断传入的变量,直接把state设置为0
    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }
    
    // 加锁
    public void lock()        { acquire(1); }

    // 尝试加锁
    public boolean tryLock()  { return tryAcquire(1); }

    // 解锁
    public void unlock()      { release(1); }

    // 是否锁定
    public boolean isLocked() { return isHeldExclusively(); }
    
    // 启动后进行线程中断,注意这里会判断线程实例的中断标志位是否为false,只有中断标志位为false才会中断
    //在当前线程持有锁,并且没产生中断的情况下,会触发中断
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

execute中添加工作线程

ThreadPoolExecutor的execute方法,会在核心线程未满和队列已满,最大线程数未满的情况下,分别调用addWorker方法添加核心线程和工作线程。

下面我们来分析一下addWorker的实现:

代码语言:javascript
复制
// 添加工作线程,如果返回false说明没有新创建工作线程,如果返回true说明创建和启动工作线程成功
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:  
    // 注意这是一个死循环 - 最外层循环
    //c的前三位是线程池状态,后面29位是工作线程数量
    for (int c = ctl.get();;) {
        // 这个是十分复杂的条件,这里先拆分多个与(&&)条件:
        // 1. 线程池状态至少为SHUTDOWN状态,也就是rs >= SHUTDOWN(0)
        // 2. 线程池状态至少为STOP状态,也就是rs >= STOP(1),或者传入的任务实例firstTask不为null,或者任务队列为空
        // 其实这个判断的边界是线程池状态为shutdown状态下,不会再接受新的任务,在此前提下如果状态已经到了STOP、或者传入任务不为空、或者任务队列为空(已经没有积压任务)都不需要添加新的线程
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
        // 注意这也是一个死循环 - 二层循环
        for (;;) {
            // 这里每一轮循环都会重新获取工作线程数wc
            // 1. 如果传入的core为true,表示将要创建核心线程,通过wc和corePoolSize判断,如果wc >= corePoolSize,则返回false表示创建核心线程失败
            // 1. 如果传入的core为false,表示将要创非建核心线程,通过wc和maximumPoolSize判断,如果wc >= maximumPoolSize,则返回false表示创建非核心线程失败
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            // 成功通过CAS更新工作线程数wc,则break到最外层的循环---满足条件,说明新增工作线程数这个CAS操作成功完成
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 走到这里说明了通过CAS更新工作线程数wc失败,这个时候需要重新判断线程池的状态是否由RUNNING已经变为SHUTDOWN
            c = ctl.get();  // Re-read ctl
            // 如果线程池状态已经由RUNNING已经变为SHUTDOWN,则重新跳出到外层循环继续执行
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // 如果线程池状态依然是RUNNING,CAS更新工作线程数wc失败说明有可能是并发更新导致的失败,则在内层循环重试即可 
            // else CAS failed due to workerCount change; retry inner loop 
        }
    }
    // 标记工作线程是否启动成功
    boolean workerStarted = false;
    // 标记工作线程是否创建成功
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 传入任务实例firstTask创建Worker实例,Worker构造里面会通过线程工厂创建新的Thread对象,所以下面可以直接操作Thread t = w.thread
        // 这一步Worker实例已经创建,但是没有加入工作线程集合或者启动它持有的线程Thread实例
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            // 这里需要全局加锁,因为会改变一些指标值和非线程安全的集合
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                // 这里主要在加锁的前提下判断ThreadFactory创建的线程是否存活或者判断获取锁成功之后线程池状态是否已经更变为SHUTDOWN
                // 1. 如果线程池状态依然为RUNNING,则只需要判断线程实例是否存活,需要添加到工作线程集合和启动新的Worker
                // 2. 如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker
                // 对于2,换言之,如果线程池处于SHUTDOWN状态下,同时传入的任务实例firstTask不为null,则不会添加到工作线程集合和启动新的Worker
                // 这一步其实有可能创建了新的Worker实例但是并不启动(临时对象,没有任何强引用),这种Worker有可能成功下一轮GC被收集的垃圾对象
                if (isRunning(c) ||
                    (runStateLessThan(c, STOP) && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把创建的工作线程实例添加到工作线程集合
                    workers.add(w);
                    int s = workers.size();
                    // 尝试更新历史峰值工作线程数,也就是线程池峰值容量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 这里更新工作线程是否启动成功标识为true,后面才会调用Thread#start()方法启动真实的线程实例
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
            if (workerAdded) {
                t.start();
                // 标记线程启动成功
                workerStarted = true;
            }
        }
    } finally {
        // 线程启动失败,需要从工作线程集合移除对应的Worker
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

// 添加Worker失败
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 从工作线程集合移除之
        if (w != null)
            workers.remove(w);
        // wc数量减1    
        decrementWorkerCount();
        // 基于状态判断尝试终结线程池
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}

addWorker的流程看上去貌似很复杂,其实不难,请看下图:

在这里插入图片描述
在这里插入图片描述

Worker是如何run的

addWorker方法再新创建一个工作线程并启动后就结束了,那么工作线程又是如何运行的呢? 又是如何从任务队列获取任务执行的呢?

线程池调用shutDown需要停止池内的线程,那么又是如何做到的呢?

这一切的奥秘都在runWorker方法内部:

代码语言:javascript
复制
final void runWorker(Worker w) {
    // 获取当前线程,实际上和Worker持有的线程实例是相同的
    Thread wt = Thread.currentThread();
    // 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中
    Runnable task = w.firstTask;
    // 设置Worker中持有的初始化时传入的任务对象为null
    w.firstTask = null;
    // 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断
    w.unlock(); 
    // 记录线程是否因为用户异常终结,默认是true
    boolean completedAbruptly = true;
    try {
        // 初始化任务对象不为null,或者从任务队列获取任务不为空(从任务队列获取到的任务会更新到临时变量task中)
        // getTask()由于使用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结
        while (task != null || (task = getTask()) != null) {
            // Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1
            w.lock();
            // 如果线程池正在停止(也就是由RUNNING或者SHUTDOWN状态向STOP状态变更),那么要确保当前工作线程是中断状态
            // 否则,要保证当前线程不是中断状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                //如果线程池状态大于等于 STOP,那么意味着该线程也要中断
                //这里即使中断了,也会继续往下面执行任务
                wt.interrupt();
            try {
                // 钩子方法,任务执行前
                beforeExecute(wt, task);
                try {
                    task.run();
                    // 钩子方法,任务执行后 - 正常情况
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    // 钩子方法,任务执行后 - 异常情况
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                // 清空task临时变量,这个很重要,否则while会死循环执行同一个task
                task = null;
                // 累加Worker完成的任务数
                w.completedTasks++;
                // Worker解锁,本质是AQS释放资源,设置state为0
                w.unlock();
            }
        }
        // 走到这里说明某一次getTask()返回为null,线程正常退出
        completedAbruptly = false;
    } finally {
        // 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出
        processWorkerExit(w, completedAbruptly);
    }
}

小结一下上面runWorker()方法的核心流程:

  • Worker先执行一次解锁操作,用于解除不可中断状态。
  • 通过while循环调用getTask()方法从任务队列中获取任务(当然,首轮循环也有可能是外部传入的firstTask任务实例)。
  • 如果线程池更变为STOP状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程必须不是中断状态。
  • 执行任务实例Runnale#run()方法,任务实例执行之前和之后(包括正常执行完毕和异常执行情况)分别会调用钩子方法beforeExecute()和afterExecute()。
  • while循环跳出意味着runWorker()方法结束和工作线程生命周期结束(Worker#run()生命周期完结),会调用processWorkerExit()处理工作线程退出的后续工作。
在这里插入图片描述
在这里插入图片描述

Worker从工作队列获取任务

代码语言:javascript
复制
private Runnable getTask() {
    // 记录上一次从队列中拉取的时候是否超时
    boolean timedOut = false; // Did the last poll() time out?
    // 注意这是死循环
    for (;;) {
        int c = ctl.get();

        // Check if queue empty only if necessary.
        // 第一个if:如果线程池状态至少为SHUTDOWN,也就是rs >= SHUTDOWN(0),则需要判断两种情况(或逻辑):
        // 1. 线程池状态至少为STOP(1),也就是线程池正在停止,一般是调用了shutdownNow()方法
        // 2. 任务队列为空
        // 如果在线程池至少为SHUTDOWN状态并且满足上面两个条件之一,则工作线程数wc减去1,然后直接返回null
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
        // 跑到这里说明线程池还处于RUNNING状态,重新获取一次工作线程数
        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed临时变量勇于线程超时控制,决定是否需要通过poll()此带超时的非阻塞方法进行任务队列的任务拉取
        // 1.allowCoreThreadTimeOut默认值为false,如果设置为true,则允许核心线程也能通过poll()方法从任务队列中拉取任务
        // 2.工作线程数大于核心线程数的时候,说明线程池中创建了额外的非核心线程,这些非核心线程一定是通过poll()方法从任务队列中拉取任务
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // 第二个if:
        // 1.wc > maximumPoolSize说明当前的工作线程总数大于maximumPoolSize,说明了通过setMaximumPoolSize()方法减少了线程池容量
        // 或者 2.timed && timedOut说明了线程命中了超时控制并且上一轮循环通过poll()方法从任务队列中拉取任务为null
        // 并且 3. 工作线程总数大于1或者任务队列为空,则通过CAS把线程数减去1,同时返回null,
        // CAS把线程数减去1失败会进入下一轮循环做重试
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果timed为true,通过poll()方法做超时拉取,keepAliveTime时间内没有等待到有效的任务,则返回null
            // 如果timed为false,通过take()做阻塞拉取,会阻塞到有下一个有效的任务时候再返回(一般不会是null)
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            // 这里很重要,只有非null时候才返回,null的情况下会进入下一轮循环
            if (r != null)
                return r;
            // 跑到这里说明上一次从任务队列中获取到的任务为null,一般是workQueue.poll()方法超时返回null
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

getTask的流程图展示如下:

在这里插入图片描述
在这里插入图片描述

对于第二步条件检查,这段逻辑大多数情况下是针对非核心线程。在execute()方法中,当线程池总数已经超过了corePoolSize并且还小于maximumPoolSize时,当任务队列已经满了的时候,会通过addWorker(task,false)添加非核心线程。而这里的逻辑恰好类似于addWorker(task,false)的反向操作,用于减少非核心线程,使得工作线程总数趋向于corePoolSize。如果对于非核心线程,上一轮循环获取任务对象为null,这一轮循环很容易满足timed && timedOut为true,这个时候getTask()返回null会导致Worker#runWorker()方法跳出死循环,之后执行processWorkerExit()方法处理后续工作,而该非核心线程对应的Worker则变成“游离对象”,等待被JVM回收。当allowCoreThreadTimeOut设置为true的时候,这里分析的非核心线程的生命周期终结逻辑同时会适用于核心线程。那么可以总结出keepAliveTime的意义:

  • 当允许核心线程超时,也就是allowCoreThreadTimeOut设置为true的时候,此时keepAliveTime表示空闲的工作线程的存活周期。
  • 默认情况下不允许核心线程超时,此时keepAliveTime表示空闲的非核心线程的存活周期。

在一些特定的场景下,配置合理的keepAliveTime能够更好地利用线程池的工作线程资源。

我们还需要注意一点: 如果阻塞获取任务的过程中,线程被中断了,那么获取任务也会立刻停止。


Worker收尾工作

processWorkerExit()方法是为将要终结的Worker做一次清理和数据记录工作(因为processWorkerExit()方法也包裹在runWorker()方法finally代码块中,其实工作线程在执行完processWorkerExit()方法才算真正的终结)。

代码语言:javascript
复制
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 因为抛出用户异常导致线程终结,直接使工作线程数减1即可
    // 如果没有任何异常抛出的情况下是通过getTask()返回null引导线程正常跳出runWorker()方法的while死循环从而正常终结,这种情况下,在getTask()中已经把线程数减1
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 全局的已完成任务记录数加上此将要终结的Worker中的已完成任务数
        completedTaskCount += w.completedTasks;
        // 工作线程集合中移除此将要终结的Worker
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
     
    // 见下一小节分析,用于根据当前线程池的状态判断是否需要进行线程池terminate处理
    tryTerminate();

    int c = ctl.get();
    // 如果线程池的状态小于STOP,也就是处于RUNNING或者SHUTDOWN状态的前提下:
    // 1.如果线程不是由于抛出用户异常终结,如果允许核心线程超时,则保持线程池中至少存在一个工作线程
    // 2.如果线程由于抛出用户异常终结,或者当前工作线程数小于min,那么直接添加一个新的非核心线程
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果允许核心线程超时,最小值为0,否则为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            // 如果最小值为0,同时任务队列不空,则更新最小值为1
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 工作线程数大于等于最小值,直接返回不新增非核心线程
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

从上面的源码,我们可以得出下面一个关键结论:

在线程池状态处于Running或者Shutdown的前提下。

  1. 如果某个工作线程因为抛出用户异常而结束,那么会立马起一个新的非核心线程。

代码的后面部分区域,会判断线程池的状态,如果线程池是RUNNING或者SHUTDOWN状态的前提下,如果当前的工作线程由于抛出用户异常被终结,那么会新创建一个非核心线程。如果当前的工作线程并不是抛出用户异常被终结(正常情况下的终结),那么会这样处理:

  • allowCoreThreadTimeOut为true,也就是允许核心线程超时的前提下,如果任务队列空,则会通过创建一个非核心线程保持线程池中至少有一个工作线程。
  • allowCoreThreadTimeOut为false,如果工作线程总数大于corePoolSize则直接返回,否则创建一个非核心线程,也就是会趋向于保持线程池中的工作线程数量趋向于corePoolSize。

processWorkerExit()执行完毕之后,意味着该工作线程的生命周期已经完结。


尝试终结当前线程池

每个工作线程终结的时候都会调用tryTerminate()方法,该方法判断是否应该设置当前线程池状态为终结态。

代码语言:javascript
复制
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 是否不满足终结当前线程池的条件
        // 1.线程池处于RUNNING状态
        // 2.线程池至少为TIDYING状态,也就是TIDYING或者TERMINATED状态,意味着已经走到了下面的步骤,线程池即将终结
        // 3.线程池至少为STOP状态并且任务队列不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateLessThan(c, STOP) && ! workQueue.isEmpty()))
            return;
        // 工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // CAS设置线程池状态为TIDYING,如果设置成功则执行钩子方法terminated()
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
                    // 最后更新线程池状态为TERMINATED
                    ctl.set(ctlOf(TERMINATED, 0));
                    // 唤醒阻塞在termination条件的所有线程,这个变量的await()方法在awaitTermination()中调用
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

// 中断空闲的工作线程,onlyOne为true的时候,只会中断工作线程集合中的某一个线程
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 这里判断线程不是中断状态并且尝试获取锁成功的时候才进行线程中断
            //获取锁成功,意味着当前线程正在阻塞获取任务
            //线程执行任务的过程中,会持有锁,因此不会被判定处于空闲状态
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    //此时打断该线程,会再次在getTask方法内部执行循环体
                    //如果获取任务条件不再满足就返回null,然后被打断的线程也会进入结束状态
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            // 这里跳出循环,也就是只中断集合中第一个工作线程
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

这里有疑惑的地方是tryTerminate()方法的第二个if代码逻辑:工作线程数不为0,则中断工作线程集合中的第一个空闲的工作线程。方法API注释中有这样一段话:

代码语言:javascript
复制
If otherwise eligible to terminate but workerCount is nonzero, 
interrupts an idle worker to ensure that shutdown signals propagate.
当满足终结线程池的条件但是工作线程数不为0,
这个时候需要中断一个空闲的工作线程去确保线程池关闭的信号得以传播。

下面将会分析的shutdown()方法中会通过interruptIdleWorkers()中断所有的空闲线程,这个时候有可能有非空闲的线程在执行某个任务,执行任务完毕之后,如果它刚好是核心线程,就会在下一轮循环阻塞在任务队列的take()方法,如果不做额外的干预,它甚至会在线程池关闭之后永久阻塞在任务队列的take()方法中。为了避免这种情况,每个工作线程退出的时候都会尝试中断工作线程集合中的某一个空闲的线程,确保所有空闲的线程都能够正常退出。

interruptIdleWorkers()方法中会对每一个工作线程先进行tryLock()判断,只有返回true才有可能进行线程中断。我们知道runWorker()方法中,工作线程在每次从任务队列中获取到非null的任务之后,会先进行加锁Worker#lock()操作,这样就能避免线程在执行任务的过程中被中断,保证被中断的一定是空闲的工作线程。


手动调用关闭线程池

线程池关闭操作有几个相关的变体方法,先看shutdown():

代码语言:javascript
复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限校验,安全策略相关判断
        checkShutdownAccess();
        // 设置SHUTDOWN状态
        advanceRunState(SHUTDOWN);
        // 中断所有的空闲的工作线程
        interruptIdleWorkers();
        // 钩子方法
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    // 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
    tryTerminate();
}

// 升提状态
private void advanceRunState(int targetState) {
    // assert targetState == SHUTDOWN || targetState == STOP;
    for (;;) {
        int c = ctl.get();
        // 线程池状态至少为targetState或者CAS设置状态为targetState则跳出循环
        if (runStateAtLeast(c, targetState) ||
            ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
            break;
    }
}

// 中断所有的空闲的工作线程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

接着看shutdownNow()方法:

代码语言:javascript
复制
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 权限校验,安全策略相关判断
        checkShutdownAccess();
        // 设置STOP状态
        advanceRunState(STOP);
        // 中断所有的工作线程
        interruptWorkers();
        // 清空工作队列并且取出所有的未执行的任务
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
     // 调用上面分析果敢的尝试terminate方法,使状态更变为TIDYING,执行钩子方法terminated()后,最终状态更新为TERMINATED
    tryTerminate();
    return tasks;
}

// 遍历所有的工作线程,如果state > 0(启动状态)则进行中断
private void interruptWorkers() {
    // assert mainLock.isHeldByCurrentThread();
    for (Worker w : workers)
        w.interruptIfStarted();
}

shutdownNow()方法会把线程池状态先更变为STOP,中断所有的工作线程(AbstractQueuedSynchronizer的state值大于0的Worker实例,也就是包括正在执行任务的Worker和空闲的Worker),然后遍历任务队列,取出(移除)所有任务存放在一个列表中返回。


最后看awaitTermination()方法:

代码语言:javascript
复制
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    // 转换timeout的单位为纳秒
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 循环等待直到线程池状态更变为TERMINATED,每轮循环等待nanos纳秒
        while (runStateLessThan(ctl.get(), TERMINATED)) {
            if (nanos <= 0L)
                return false;
            nanos = termination.awaitNanos(nanos);
        }
        return true;
    } finally {
        mainLock.unlock();
    }
}

awaitTermination()虽然不是shutdown()方法体系,但是它的处理逻辑就是确保调用此方法的线程会阻塞到tryTerminate()方法成功把线程池状态更新为TERMINATED后再返回,可以使用在某些需要感知线程池终结时刻的场景。

有一点值得关注的是:shutdown()方法只会中断空闲的工作线程,如果工作线程正在执行任务对象Runnable#run(),这种情况下的工作线程不会中断,而是等待下一轮执行getTask()方法的时候通过线程池状态判断正常终结该工作线程。


可重入锁mainLock成员变量

代码语言:javascript
复制
private final ReentrantLock mainLock = new ReentrantLock();
private final Condition termination = mainLock.newCondition();

先看了ThreadPoolExecutor内部成员属性mainLock的引用情况:

在这里插入图片描述
在这里插入图片描述

归结一下mainLock的使用场景:

在这里插入图片描述
在这里插入图片描述

这里分析一下线程池如何通过可重入锁和条件变量实现相对优雅地关闭。先看shutdown()方法:

代码语言:javascript
复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

这里shutdown()中除了tryTerminate(),其他它方法都是包裹在锁里面执行,确保工作线程集合稳定性以及关闭权限、确保状态变更串行化,中断所有工作线程并且避免工作线程”中断风暴”(多次并发调用shutdown()如果不加锁,会反复中断工作线程)。

代码语言:javascript
复制
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();  # <---  多了这一步
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

shutdownNow()方法其实加锁的目的和shutdown()差不多,不过多了一步:导出任务队列中的剩余的任务实例列表。awaitTermination()方法中使用到前面提到过的条件变量termination:

代码语言:javascript
复制
// 条件变量必须在锁代码块中执行,和synchronized关键字用法差不多
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 死循环确保等待执行和状态变更为TERMINATED
        while (runStateLessThan(ctl.get(), TERMINATED)) {
            if (nanos <= 0L)
                return false;
            nanos = termination.awaitNanos(nanos);   # <-- 确保当前调用线程阻塞等待对应的时间或者线程池状态变更为TERMINATED,再退出等待
        }
        return true;
    } finally {
        mainLock.unlock();
    }
}

awaitTermination()方法的核心功能是:确保当前调用awaitTermination()方法的线程阻塞等待对应的时间或者线程池状态变更为TERMINATED,再退出等待返回结果,这样能够让使用者输入一个可以接受的等待时间进行阻塞等待,或者线程池在其他线程中被调用了shutdown()方法状态变更为TERMINATED就能正常解除阻塞。awaitTermination()方法的返回值为布尔值,true代表线程池状态变更为TERMINATED或者等待了输入时间范围内的时间周期被唤醒,意味则线程池正常退出,结果为false代表等待了超过输入时间范围内的时间周期,线程池的状态依然没有更变为TERMINATED。


reject拒绝任务方法

reject(Runnable command)方法很简单:

代码语言:javascript
复制
final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}

调用线程池持有的成员RejectedExecutionHandler实例回调任务实例和当前线程池实例。


钩子方法分析

到JDK11为止,ThreadPoolExecutor提供的钩子方法没有增加,有以下几个:

  • beforeExecute(Thread t, Runnable r):任务对象Runnable#run()执行之前触发回调。
  • afterExecute(Runnable r, Throwable t):任务对象Runnable#run()执行之后(包括异常完成情况和正常完成情况)触发回调。
  • terminated():线程池关闭的时候,状态更变为TIDYING成功之后会回调此方法,执行此方法完毕后,线程池状态会更新为TERMINATED。
  • onShutdown():shutdown()方法执行时候会回调此方法,API注释中提到此方法主要提供给ScheduledThreadPoolExecutor使用。

其中onShutdown()的方法修饰符为default,其他三个方法的修饰符为protected,必要时候可以自行扩展这些方法,可以实现监控、基于特定时机触发具体操作等等。


其他方法

线程池本身提供了大量数据统计相关的方法、扩容方法、预创建方法等等,这些方法的源码并不复杂,这里不做展开分析。

核心线程相关:

  • getCorePoolSize():获取核心线程数。
  • setCorePoolSize():重新设置线程池的核心线程数。
  • prestartCoreThread():预启动一个核心线程,当且仅当工作线程数量小于核心线程数量。
  • prestartAllCoreThreads():预启动所有核心线程。

线程池容量相关:

  • getMaximumPoolSize():获取线程池容量。
  • setMaximumPoolSize():重新设置线程池的最大容量。

线程存活周期相关:

  • setKeepAliveTime():设置空闲工作线程的存活周期。
  • getKeepAliveTime():获取空闲工作线程的存活周期。

其他监控统计相关方法:

  • getTaskCount():获取所有已经被执行的任务总数的近似值。
  • getCompletedTaskCount():获取所有已经执行完成的任务总数的近似值。
  • getLargestPoolSize():获取线程池的峰值线程数(最大池容量)。
  • getActiveCount():获取所有活跃线程总数(正在执行任务的工作线程)的近似值。
  • getPoolSize():获取工作线程集合的容量(当前线程池中的总工作线程数)。

任务队列操作相关方法:

  • purge():移除任务队列中所有是Future类型并且已经处于Cancelled状态的任务。
  • remove():从任务队列中移除指定的任务。
  • BlockingQueue< Runnable > getQueue():获取任务队列的引用。

有部分属性值的设置有可能影响到线程池中的状态或者工作线程的增减等,例如核心线程数改变,有可能会直接增减Worker,这里就以ThreadPoolExecutor#setCorePoolSize()为例:

代码语言:javascript
复制
// 设置核心线程数量
public void setCorePoolSize(int corePoolSize) {
    // 输入值不能小于0或者大于线程池的容量
    if (corePoolSize < 0 || maximumPoolSize < corePoolSize)
        throw new IllegalArgumentException();
    // delta = 传入核心线程数和现存的核心线程数的差值
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    // 如果当前线程池工作线程的总量大于传入核心线程数,则中断所有的工作线程
    if (workerCountOf(ctl.get()) > corePoolSize)
        //getTask方法内部会动态的调整线程数量向核心线程数靠近
        interruptIdleWorkers();
    else if (delta > 0) {
        // 传入核心线程数和现存的核心线程数的差值大于0,也就是核心线程扩容
        // 计算传入核心线程数和现存的核心线程数的差值和任务队列中任务个数的最小值,并且添加这个最小值个数的工作线程池
        // 任务队列为空的情况下,k === 0,此时第一个条件 k--> 0就不满足,不会进入循环,那么这delta个需要创建的工作线程应该是在提交新任务的时候懒创建
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            // 如果任务队列为空,则跳出循环
            if (workQueue.isEmpty())
                break;
        }
    }
}

这里else if (delta > 0)后面的代码块中有一段描述,翻译一下:我们并不知道真正情况下”需要”多少新的工作线程。作为一种启发式处理方式,预先启动足够多的新的工作线程(直到数量为核心线程池大小)来处理队列中当前的任务,但如果在这样做时队列变为空,则停止创建新的工作线程。


参考

硬核干货:4W字从源码上分析JUC线程池ThreadPoolExecutor的实现原理

深度解读 java 线程池设计思想及源码实现

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-10-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ThreadPoolExecutor线程池设计思路
  • 引言
  • ThreadPoolExecutor
    • 线程池相关属性
      • 线程池状态记录
        • execute执行任务
          • 工作线程抽象为Worker
            • execute中添加工作线程
              • Worker是如何run的
                • Worker从工作队列获取任务
                  • Worker收尾工作
                    • 尝试终结当前线程池
                      • 手动调用关闭线程池
                        • 可重入锁mainLock成员变量
                          • reject拒绝任务方法
                            • 钩子方法分析
                              • 其他方法
                              • 参考
                              相关产品与服务
                              NAT 网关
                              NAT 网关(NAT Gateway)提供 IP 地址转换服务,为腾讯云内资源提供高性能的 Internet 访问服务。通过 NAT 网关,在腾讯云上的资源可以更安全的访问 Internet,保护私有网络信息不直接暴露公网;您也可以通过 NAT 网关实现海量的公网访问,最大支持1000万以上的并发连接数;NAT 网关还支持 IP 级流量管控,可实时查看流量数据,帮助您快速定位异常流量,排查网络故障。
                              领券
                              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档