前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【源码阅读计划】浅析 Java 线程池工作原理及核心源码

【源码阅读计划】浅析 Java 线程池工作原理及核心源码

作者头像
yhlin
发布2023-02-27 17:01:10
3530
发布2023-02-27 17:01:10
举报
文章被收录于专栏:yhlin's blogyhlin's blog

为什么要用线程池?

  1. 降低资源消耗:通过重复利用现有的线程来执行任务,避免多次创建和销毁线程。
  2. 提高相应速度:因为省去了创建线程这个步骤,所以在拿到任务时,可以立刻开始执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供附加功能:线程池的可拓展性使得我们可以自己加入新的功能,比如说定时、延时来执行某些线程。

线程池的设计

【源码阅读计划】浅析 Java 线程池工作原理及核心源码
【源码阅读计划】浅析 Java 线程池工作原理及核心源码

如上图所示,本文试图回答几个问题:

  1. 线程池如何维护自身状态(表示、获取、转移)?
  2. 线程池如何管理任务(任务获取,分配)?
  3. 线程池如何管理线程(表示、创建、执行任务、回收)?

线程池如何维护自身状态?

在 JDK 的 ThreadPoolExecutor 线程池中用一个原子整型来维护线程池的两个状态参数:

代码语言:javascript
复制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) {return rs | wc;} // rs: runState, wc: workerCount

ctl 的高 3 位被用来表示线程池运行状态 runState, 其余 29 位用来表示线程池中的线程数量 workerCount

代码语言:javascript
复制
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
  1. RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;
  2. SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用 shutdown()方法进入该状态);
  3. STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
  4. TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为 0,线程池进入该状态后会调用 terminated() 方法进入 TERMINATED 状态。
  5. TERMINATED:在 terminated() 方法执行完后进入该状态,默认 terminated()方法中什么也没有做。 进入 TERMINATED 的条件如下:
    • 线程池不是 RUNNING 状态;
    • 线程池状态不是 TIDYING 状态或 TERMINATED 状态;
    • 如果线程池状态是 SHUTDOWN 并且 workerQueue 为空;
    • workerCount 为 0;
    • 设置 TIDYING 状态成功

五大状态的轮转过程:

【源码阅读计划】浅析 Java 线程池工作原理及核心源码
【源码阅读计划】浅析 Java 线程池工作原理及核心源码

二者分别通过下面两个函数获取:

代码语言:javascript
复制
    private static int runStateOf(int c)     {return c & ~CAPACITY;}
    private static int workerCountOf(int c)  {return c & CAPACITY;}

线程池如何管理任务?

如图 1 所示,当用户提交一个任务时,线程池应该根据其状态做出不同的响应,对应的函数为 execute() 函数:

代码语言:javascript
复制
    public void execute(Runnable command) {if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();                                  // 获取状态表示
        if (workerCountOf(c) < corePoolSize) {               // 1. 如果当前线程数小于核心线程数,直接新建线程执行任务
            if (addWorker(command, true))
                return;
            c = ctl.get();}
        if (isRunning(c) && workQueue.offer(command)) {     // 2. 如果核心线程数已满,且是运行状态并且队列未满,添加任务至队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))    // 再次检查运行状态,如果不是运行状态就从队列中删除任务,删除成功后执行拒绝策略,因为此时线程池状态不是 RUNNING
                reject(command);
            else if (workerCountOf(recheck) == 0)           // 如果当前线程数为 0,而我们又刚刚添加了一个任务,就新建一个空任务的线程,它会去轮询任务队列执行刚刚新增的任务
                addWorker(null, false);
        }
        else if (!addWorker(command, false))                // 添加失败,执行拒绝策略
            reject(command);
    }
execute 函数执行过程(分配)
  1. 首先检测线程池运行状态,如果不是 RUNNING,则直接拒绝,线程池要保证在 RUNNING 的状态下执行任务。
  2. 如果 workerCount < corePoolSize,则 创建 并启动一个线程来执行新提交的任务。
  3. 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务 添加 到该阻塞队列中。
  4. 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则 创建 并启动一个线程来执行新提交的任务。
  5. 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据 拒绝 策略来处理该任务, 默认的处理方式是直接抛异常。

这里有一点要注意,就是在将任务添加到队列中后,做了一个 recheck,这是因为在往阻塞队列中添加任务地时候,有可能阻塞队列已满,需要等待其他的任务移出队列,在这个过程中,线程池的状态可能会发生变化,所以需要double check

getTask 函数(获取)
代码语言:javascript
复制
/**
 * Performs blocking or timed wait for a task, depending on
 * current configuration settings, or returns null if this worker
 * must exit because of any of:
 * 1. There are more than maximumPoolSize workers (due to
 *    a call to setMaximumPoolSize).
 * 2. The pool is stopped.
 * 3. The pool is shutdown and the queue is empty.
 * 4. This worker timed out waiting for a task, and timed-out
 *    workers are subject to termination (that is,
 *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
 *    both before and after the timed wait, and if the queue is
 *    non-empty, this worker is not the last thread in the pool.
 *
 * @return task, or null if the worker must exit, in which case
 *         workerCount is decremented
 */
private Runnable getTask() {
    boolean timedOut = false; // 最近一次从阻塞队列中获取任务是否超时?for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 为 true 的情况:// 1. 线程池为非 RUNNING 状态 且线程池正在停止
        // 2. 线程池状态为非 RUNNING 状态 且阻塞队列为空
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();     // 将 workCount 减 1
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // timed 变量用于判断是否需要进行超时控制。// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;// wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;// 对于超过核心线程数量的这些线程,需要进行超时控制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /*
         * wc > maximumPoolSize 的情况是因为可能在此方法执行阶段同时执行了 setMaximumPoolSize 方法;* timed && timedOut 如果为 true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
         * 超时说明队列中获取不到任务,即不需要这么多线程,因此可以适当减少非核心线程
         * 接下来判断,如果有效线程数量大于 1,或者阻塞队列是空的,那么尝试将 workerCount 减 1;* 如果减 1 失败,则返回重试。* 如果 wc == 1 时,也就说明当前线程是线程池中唯一的一个线程了。*/
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c)) // 减少非核心线程数量
                return null;
            continue;   // 重试
        }

        try {
            // 从阻塞队列获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // poll:等待 keepAliveTime, 若队列为空,返回 null
                workQueue.take(); // take: 若队列为空,直接阻塞 
            if (r != null)
                return r;
            timedOut = true;  // r 为空,表示超时了,返回循环重试
        } catch (InterruptedException retry) {timedOut = false; // 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试}
    }
}

这里重要的地方是 第二个 if 判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行 execute 方法时,如果当前线程池的线程数量超过了 corePoolSize 且小于 maximumPoolSize,并且 workQueue 已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是 timedOut 为 true 的情况,说明 workQueue 已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多余的非核心线程销毁掉,保持线程数量在 corePoolSize 即可。

线程池如何管理线程?

Worker 类(表示)

Worker 类继承了 AbstractQueuedSynchronizer 类并且实现了 Runnable 接口。之所以继承 AbstractQueuedSynchronizer 类是因为线程池有一个需求是要获取线程的运行状态(工作中,空闲中)。Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的。

代码语言:javascript
复制
/**
 * Class Worker mainly maintains interrupt control state for
 * threads running tasks, along with other minor bookkeeping.
 * This class opportunistically extends AbstractQueuedSynchronizer
 * to simplify acquiring and releasing a lock surrounding each
 * task execution.  This protects against interrupts that are
 * intended to wake up a worker thread waiting for a task from
 * instead interrupting a task being run.  We implement a simple
 * non-reentrant mutual exclusion lock rather than use
 * ReentrantLock because we do not want worker tasks to be able to
 * reacquire the lock when they invoke pool control methods like
 * setCorePoolSize.  Additionally, to suppress interrupts until
 * the thread actually starts running tasks, we initialize lock
 * state to a negative value, and clear it upon start (in
 * runWorker).
 */
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;                                                // 持有的线程
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;                                                 // 第一次执行的任务
    /** Per-thread task counter */
    volatile long completedTasks;                                       // 完成的任务数量

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {setState(-1); // state 默认值设为 -1,控制未执行的新建线程不该被中断
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // 值为 0 表示未加锁状态(线程空闲)
    // The value 1 represents the locked state.
    // 值为 1 表示锁定状态(线程忙)protected boolean isHeldExclusively() {     // 判断是否被锁定(线程正在执行任务), 返回 true 表示加锁(排他的)return getState() != 0;}

    protected boolean tryAcquire(int unused) {  // 尝试获取独占锁锁
        if (compareAndSetState(0, 1)) {         // state 为 0 才会成功,不允许重入
            setExclusiveOwnerThread(Thread.currentThread());    // 设置当前线程占有锁
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {  // 尝试释放锁
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }                   // 获取独占锁,acuire 会调用 tryAcquire,tryAcquire 失败会中断线程
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }                   // 释放独占锁,runWorker 中用来设置允许中断(state+1=0)public boolean isLocked() { return isHeldExclusively(); }   // 检查是否被加锁

    void interruptIfStarted() { // 中断线程
        Thread t;
        // 判断是否可以中断线程:// 线程状态不是 -1(新建状态)且不为空且未被中断,就可以中断线程
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {t.interrupt();                      // 中断线程
            } catch (SecurityException ignore) {}}
    }
}

上述代码可以实现:

  1. lock 方法一旦获取了独占锁,表示当前线程正在执行任务中 (runWorker 函数在取到任务后会执行 lock() 方法后执行任务);
  2. 如果正在执行任务(state = 1),则不应该中断线程;
  3. 如果该线程现在不是独占锁的状态,也就是空闲(state = 0)的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  4. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;
  5. 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁。如果使用 ReentrantLock,它是可重入的,这样如果在任务中调用了如 setCorePoolSize 这类线程池控制的方法,会中断正在运行的线程;
addWorker 函数(创建)

addWorker 函数的作用是新建一个线程,其源码如下:

代码语言:javascript
复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * 这个 if 判断
         * 如果 rs >= SHUTDOWN,则表示此时不再接收新任务;* 接着判断以下 3 个条件,只要有 1 个不满足,则返回 false:*  1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
         *  2. firsTask 为空
         *  3. 阻塞队列不为空
         * 
         * 首先考虑 rs == SHUTDOWN 的情况;* 这种情况下不会接受新提交的任务,所以在 firstTask 不为空的时候会返回 false;* 然后,如果 firstTask 为空,并且 workQueue 也为空,则返回 false,* 因为队列中已经没有任务了,不需要再添加线程了
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;                                                   // 添加失败

        for (;;) {int wc = workerCountOf(c); 
            // 如果 wc 超过 CAPACITY,也就是 ctl 的低 29 位的最大值(二进制是 29 个 1),返回 false;// 这里的 core 是 addWorker 方法的第二个参数,如果为 true 表示根据 corePoolSize 来比较,若为 false 则根据 maximumPoolSize 来比较。if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))               // 参数 core 在此作用
                return false;
            if (compareAndIncrementWorkerCount(c))                          // CAS 尝试修改 workerCount
                break retry;                                                // 修改成功,退出 retry 代码块
            c = ctl.get();  // Re-read ctl                                  // 修改失败,重新获取 ctl
            if (runStateOf(c) != rs)                                        // 线程池运行状态发生改变,重新执行外层 for 循环
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 修改成功,执行新建线程操作
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {w = new Worker(firstTask);                                      // 新建 Worker 对象
        final Thread t = w.thread;                                      // 每个 Worker 对象都持有一个线程, 由线程工厂创建
        if (t != null) {                                                // 线程不为空, 互斥添加 Worker 对象
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();                                            // 加锁
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // rs < SHUTDOWN 表示是 RUNNING 状态;// 如果 rs 是 RUNNING 状态或者 rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程
                // 因为在 SHUTDOWN 时不会在添加新的任务,但还是会执行 workQueue 中的任务,所以新增一个无任务的线程可以让其从队列中获取任务
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);                                     // workers 是一个 HashSet 负责管理 Worker 对象
                    int s = workers.size();
                    if (s > largestPoolSize)                         // 记录线程池中出现的最大的线程数量             
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {mainLock.unlock();                                      // 解锁
            }
            if (workerAdded) {                                          // Worker 对象添加成功,立即执行线程
                t.start();                                              // 启动时会调用 Worker 类中的 run 方法,Worker 本身实现了 Runnable 接口,所以一个 Worker 类型的对象也是一个线程。workerStarted = true;
            }
        }
    } finally {if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
runWorker 函数(执行与回收)
代码语言:javascript
复制
/**
 * Main worker run loop.  Repeatedly gets tasks from queue and
 * executes them, while coping with a number of issues:
 *
 * 1. We may start out with an initial task, in which case we
 * don't need to get the first one. Otherwise, as long as pool is
 * running, we get tasks from getTask. If it returns null then the
 * worker exits due to changed pool state or configuration
 * parameters.  Other exits result from exception throws in
 * external code, in which case completedAbruptly holds, which
 * usually leads processWorkerExit to replace this thread.
 *
 * 2. Before running any task, the lock is acquired to prevent
 * other pool interrupts while the task is executing, and then we
 * ensure that unless pool is stopping, this thread does not have
 * its interrupt set.
 *
 * 3. Each task run is preceded by a call to beforeExecute, which
 * might throw an exception, in which case we cause thread to die
 * (breaking loop with completedAbruptly true) without processing
 * the task.
 *
 * 4. Assuming beforeExecute completes normally, we run the task,
 * gathering any of its thrown exceptions to send to afterExecute.
 * We separately handle RuntimeException, Error (both of which the
 * specs guarantee that we trap) and arbitrary Throwables.
 * Because we cannot rethrow Throwables within Runnable.run, we
 * wrap them within Errors on the way out (to the thread's
 * UncaughtExceptionHandler).  Any thrown exception also
 * conservatively causes thread to die.
 *
 * 5. After task.run completes, we call afterExecute, which may
 * also throw an exception, which will also cause thread to
 * die. According to JLS Sec 14.20, this exception is the one that
 * will be in effect even if task.run throws.
 *
 * The net effect of the exception mechanics is that afterExecute
 * and the thread's UncaughtExceptionHandler have as accurate
 * information as we can provide about any problems encountered by
 * user code.
 *
 * @param w the worker
 */
final void runWorker(Worker w) {Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts 设置为允许中断
    boolean completedAbruptly = true;   // 异常退出标志
    try {while (task != null || (task = getTask()) != null) {        // getTask 轮询阻塞队列
            w.lock();       // 加锁
            /*
            * 3 个判断:
                * 1、runStateAtLeast(ctl.get(), STOP)为真说明当前状态大于等于 STOP 此时需要给他一个中断信号
                * 2、wt.isInterrupted()查看当前是否设置中断状态如果为 false 则说明为设置中断状态
                * 3、Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 获取当前中断状态且清除中断状态 
                *    这个判断为真的话说明当前被设置了中断状态 (有可能是线程池执行的业务代码设置的,然后重置了) 且当前状态变成了大于等于 STOP 的状态了
                * 
             * 判断为真的两种情况:
                * 1、如果当前线程大于等于 STOP 且未设置中断状态 整个判断为 true 第一个 runStateAtLeast(ctl.get(), STOP)为 true !wt.isInterrupted()为 true
                * 2、第一次判断的时候不大于 STOP 且当前设置了中断状态 (Thread.interrupted() 把中断状态又刷新了) 且设置完了之后线程池状态大于等于 STOP 了
                *    Thread.interrupted() && runStateAtLeast(ctl.get(), STOP) 为 true !wt.isInterrupted()为 true
                *
            */
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) 
                && !wt.isInterrupted())
                wt.interrupt();         // 设置中断
            try {beforeExecute(wt, task);
                Throwable thrown = null;
                try {task.run();
                } catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);
                } finally {afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();         // 释放独占锁}
        }
        completedAbruptly = false;  // 若 while 循环中抛出异常这句就不会被执行,表示为异常退出循环
    } finally {processWorkerExit(w, completedAbruptly);
    }
}

执行流程:

  1. while 循环通过 getTask 函数不断地从阻塞队列中获取任务;
  2. if 判断:
    • 如果线程池状态大于等于 STOP(正在停止)则设置当前线程的中断状态(保证当前线程中断)
    • 如果线程池状态小于 STOP 则清除中断状态(保证当前线程不中断)
  3. 调用 task.run() 方法执行任务;
  4. 如果 task == null, 跳出 while 循环,执行回收函数销毁线程;
processWorkerExit 函数(销毁)
代码语言:javascript
复制
/**
 * Performs cleanup and bookkeeping for a dying worker. Called
 * only from worker threads. Unless completedAbruptly is set,
 * assumes that workerCount has already been adjusted to account
 * for exit.  This method removes thread from worker set, and
 * possibly terminates the pool or replaces the worker if either
 * it exited due to user task exception or if fewer than
 * corePoolSize workers are running or queue is non-empty but
 * there are no workers.
 *
 * @param w the worker
 * @param completedAbruptly if the worker died due to user exception
 */
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果 completedAbruptly 值为 true,则说明线程执行时出现了异常,需要将 workerCount 减 1;// 如果线程执行时没有出现异常,说明在 getTask()方法中已经已经对 workerCount 进行了减 1 操作,这里就不必再减了。if (completedAbruptly) 
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks; // 统计线程池完成任务数量
        workers.remove(w);                      // 从线程池中移除线程 Worker 对象引用
    } finally {mainLock.unlock();
    }

    tryTerminate(); // 根据线程池状态判断是否结束线程池

    int c = ctl.get();
    // 当线程池状态为 RUNNING 或 SHUTDOWN 时
    // 如果任务为异常结束 completedAbruptly=true, 直接 addWorker 新建线程;
    // 如果 allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个 worker;// 如果 allowCoreThreadTimeOut=false,workerCount 不少于 corePoolSize。if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min) // 判断当前有效线程是否大于 1,大于的话直接 return,否则会执行 addWorker 函数新建一个线程。return; // replacement not needed
        }
        addWorker(null, false);
    }
}

执行流程:

判断是否为异常退出,如果是说明线程执行时出现了异常,需要建 workerCount 减 1;

统计线程池完成任务数量,将 Worker 引用从 HashSet 中移除(会被 jvm 回收),相当于销毁线程;

根据线程池状态判断是否结束线程池;

当线程池状态为 RUNNING 或 SHUTDOWN 时:

如果任务为异常结束:

代码语言:javascript
复制
1. 如果允许核心线程超时,并且阻塞队列中有任务,至少保留一个线程
2. 如果不允许核心线程超时,且 workerCount 不少于 corePoolSize,直接返回。否则新建线程

一个小 Demo

代码语言:javascript
复制
package ThreadPool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author Lin YuHang
 * @date 2022/12/1 16:08
*/
public class ThreadPoolDemo {public static void main(String[] args) {
        final int taskCount = 50;
        AtomicInteger integer = new AtomicInteger(0);
        // 初始化线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,
                30,
                5,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(30));
        System.out.println(" 总任务数:" + taskCount);
        long start = System.currentTimeMillis();
        // 任务提交
        for (int i = 0; i < taskCount; i++) {Thread thread = new Thread(() -> {
                try {Thread.sleep(500);// 模拟执行耗时
                    System.out.println(" 已执行 " + integer.addAndGet(1) + " 个任务 ");
                } catch (InterruptedException e) {e.printStackTrace();
                }
            });
            try {
                // 注意这里我 try 起来了,默认拒绝策略会报错
                executor.execute(thread);
            } catch (Exception e) {System.out.println(e.getMessage());
            }
        }
        long end = 0;
        while (executor.getCompletedTaskCount() < 50) {end = System.currentTimeMillis();
        }
        System.out.println(" 任务总耗时:" + (end - start));
    }
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么要用线程池?
  • 线程池的设计
    • 线程池如何维护自身状态?
      • 线程池如何管理任务?
        • execute 函数执行过程(分配)
        • getTask 函数(获取)
      • 线程池如何管理线程?
        • Worker 类(表示)
        • addWorker 函数(创建)
        • runWorker 函数(执行与回收)
        • processWorkerExit 函数(销毁)
    • 一个小 Demo
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档