专栏首页xiaoheikeThreadPoolExecutor 优雅关闭线程池的原理.md

ThreadPoolExecutor 优雅关闭线程池的原理.md

经典关闭线程池代码

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.shutdown();
while (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
    System.out.println("线程池中还有任务在处理");
}

shutdown 做了什么?

先上源码

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 线程安全性检查
        checkShutdownAccess();
        // 更新线程池状态为 SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 尝试关闭空闲线程
        interruptIdleWorkers();
        // 空实现
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    // 尝试中止线程池
    tryTerminate();
}

每个方法都有特定的目的,其中 checkShutdownAccess()advanceRunState(SHUTDOWN)比较简单,所以这里不再描述了,而 interruptIdleWorkers()tryTerminate()

interruptIdleWorkers 做了什么?

关闭当前空闲线程。

onlyOne = true:至多关闭一个空闲worker,可能关闭0个。

onlyOne = false:遍历所有的worker,只要是空闲的worker就关闭。

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 尝试获得锁,如果获得锁则表明该worker是空闲状态
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

w.tryLock():该方法并不会阻塞,尝试一次如果不成功就返回false,成功则放回true。

在方法 runWorker 中一旦work获得了任务,就会调用调用了 w.lock(),从而倘若 worker 是无锁状态,就是空闲状态。

因此 Worker 之所以继承 AbstractQueuedSynchronizer 实际上是为了达到用无锁,有锁标识worker空闲态与忙碌态,从而方便控制worker的销毁操作。

tryTerminate 做了什么?

这个只是尝试将线程池的状态置为 TERMINATE 态,如果还有worker在执行,则尝试关闭一个worker。

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) || // 是运行态
            runStateAtLeast(c, TIDYING) ||  // 是 TIDYING 或者 TERMINATE 态
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // SHUTDOWN 态且队列中有任务
            return;
        if (workerCountOf(c) != 0) {
            // 还存在 worker,则尝试关闭一个worker
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 通过CAS,先置为 TIDYING 态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                // TIDYING 态
                try {
                    // 空方法
                    terminated();
                } finally {
                    // 最终更新为 TERMINATED 态
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

通过如下的方法简单做一些过滤。因为状态是从 TIDYING 态往后才到 TERMINATE 态。从这个过滤条件,可以看出如果是STOP态也是会通过的。不过如果线程池到了STOP应该就不会再使用了吧,所以也是不会有什么影响。

if (isRunning(c) || // 是运行态
runStateAtLeast(c, TIDYING) ||  // 是 TIDYING 或者 TERMINATE 态
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // SHUTDOWN 态且队列中有任务
    return;

如果该线程池中有worker,则中止1个worker。通过上面的过滤条件,到了这一步,那么肯定是 SHUTDOWN 态(忽略 STOP 态),并且任务队列已经被处理完成。

if (workerCountOf(c) != 0) {
    // 还存在 worker,则尝试关闭一个worker
    interruptIdleWorkers(ONLY_ONE);
    return;
}

如果该线程池中有多个worker,终止1个worker之后 tryTerminate() 方法就返回了,那么剩下的worker在哪里被处理的呢?(看后面的解答)

假设线程池中的worker都已经关闭并且队列中也没有任务,那么后面的代码将会将线程池状态置为 TERMINATE 态。terminate() 是空实现,用于有需要的自己实现处理,线程池关闭之后的逻辑。

awaitTermination 做了什么

这个方法只是判断当前线程池是否为 TERMINATED 态,如果不是则睡眠指定的时间,如果睡眠中途线程池变为终止态则会被唤醒。这个方法并不会处理线程池的状态变更的操作,纯粹是做状态的判断,所以得要在循环里边做判断。

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (;;) {
            if (runStateAtLeast(ctl.get(), TERMINATED)) // 是否为 TERMINATED 态
                return true;
            if (nanos <= 0)
                return false;
            nanos = termination.awaitNanos(nanos);  // native 方法,睡一觉
        }
    } finally {
        mainLock.unlock();
    }
}

问题

从 shutdown() 方法源码来看有很大概率没有完全线程池,而awaitTermination() 方法则只是判断线程池状态,并没有关闭线程池状态,那么剩下的worker什么时候促发关闭呢?关键代码逻辑在一个worker被关闭之后,触发了哪些事情。

runWorker

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 省略其他不需要的代码
           try{
               task.run();
            } finally {
                task = null;
                w.unlock();
            }
        }
        // 用于标识是否是task.run报错
        completedAbruptly = false;
    } finally {
        // 线程关闭的时候调用
        processWorkerExit(w, completedAbruptly);
    }
}

completedAbruptly:用于标识是否是task.run报错,如果值为 TRUE,则是task.run 异常;反之,则没有发生异常

从上面的核心代码来看,当一个worker被关闭之后会调用 processWorkerExit() 方法。看看它做了什么。

processWorkerExit 做了什么

对一个worker退出之后做善后工作,比如统计完成任务数,将线程池的关闭态传播下去,根据条件补充 worker。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    // 如果有worker则尝试关闭一个,否则置为TERMINATE态
    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

线程池关闭的关键就在于一个worker退出之后,会调用 tryTerminate() 方法,将退出的信号传递下去,这样其他的线程才能够被依次处理,最后线程池会变为 TERMINATE 态。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • mysql二进制文件操作语法(mysql binary log operate statements)

    在 mysql 配置文件中配置 log-bin,重启 mysql my.cnf (on Linux/unix) or my.ini (on Windows) ...

    用户3148308
  • Neo4j 查询已经创建的索引与约束

    在Neo4j 2.0之后为cypher语法增加了一些类似于DDL的语法,能够自己创建索引,约束等等。

    用户3148308
  • hibernate一级缓存

    Hibernate 一级缓存默认是打开,不需要任何的配置。实际上,你无法强制禁止它的使用。 如果你理解了一级缓存实际上和会话是关联的,就很容易理解一级缓存。总...

    用户3148308
  • Linkedin之后,这25个香饽饽科技公司即将被收购!

    并购市场着实是火了,的的确确On Fire了。就在前不久: Linkedin 262亿美元,Marketo 18亿美元,Demandware 28...

    人称T客
  • 7-51单片机ESP8266学习-AT指令(测试TCP服务器--51单片机程序配置8266,用手机TCP调试助手发信息给单片机控制小灯的亮灭)

    http://www.cnblogs.com/yangfengwu/p/8759294.html 如不做任何说明默认最头上的链接为上一篇链接,末尾的链接为下一篇...

    杨奉武
  • 两个字数字字符串求和

    付威
  • 初遇单例模式之双重检测

    为了让博客看起来不那么深入,我觉得可以让加入一点故事情节~ 锻炼一下以后写不动代码改写小说的能力~

    用户2141593
  • 为什么我抛弃了 Ubuntu?

    十多年来,我一直在使用Ubuntu的Linux发行版。但是,经过这么长时间以后,我第一次改变了心意。如今我开始使用Manjaro,而且感觉非常好!

    小小詹同学
  • 为什么我抛弃了 Ubuntu?

    Ubuntu 是很多开发者优先选择的 Linux 发行版之一,但是本文的作者在使用了十年之后却最终“抛弃”它转投“他人”怀抱。原因何在?

    程序员小强
  • 浪潮将并购哪些厂商实现国产一体化 IBM无奈拥抱倍受打击

    十月一前T哥参加了一场浪潮“K迁工程”全国巡展行”,浪潮对外正式推出面向关键应用整合的主机新品-天梭K1 910,天梭K1 910的问世使得浪潮主机业务完成了从...

    人称T客

扫码关注云+社区

领取腾讯云代金券