线程池 execute() 的工作逻辑

最近在看《Java并发编程的艺术》回顾线程池的原理和参数的时候发现一个问题,如果 corePoolSize = 0 且 阻塞队列是无界的。线程池将如何工作?

我们先回顾一下书里面描述线程池execute()工作的逻辑:

  1. 如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
  2. 如果运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
  3. 如果 BlockingQueue 内的任务超过上限,则创建新的线程来处理任务。
  4. 如果创建的线程数是单钱运行的线程超出 maximumPoolSize,任务将被拒绝策略拒绝。

看了这四个步骤,其实描述上是有一个漏洞的。如果核心线程数是0,阻塞队列也是无界的,会怎样?如果按照上文的逻辑,应该没有线程会被运行,然后线程无限的增加到队列里面。然后呢?

于是我做了一下试验看看到底会怎样?

public class threadTest {    
    private final static ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());    
        
    public static void main(String[] args) {
        AtomicInteger atomicInteger = new AtomicInteger();        
        while (true) {
            executor.execute(() -> {
                System.out.println(atomicInteger.getAndAdd(1));
            });
        }
    }
}

结果里面的System.out.println(atomicInteger.getAndAdd(1));语句执行了,与上面的描述矛盾了。到底发生了什么?线程池创建线程的逻辑是什么?我们还是从源码来看看到底线程池的逻辑是什么?

ctl

要了解线程池,我们首先要了解的线程池里面的状态控制的参数 ctl。

  • 线程池的ctl是一个原子的 AtomicInteger。
  • 这个ctl包含两个参数 :
    • workerCount 激活的线程数
    • runState 当前线程池的状态
  • 它的低29位用于存放当前的线程数, 因此一个线程池在理论上最大的线程数是 536870911; 高 3 位是用于表示当前线程池的状态, 其中高三位的值和状态对应如下:
    • 111: RUNNING
    • 000: SHUTDOWN
    • 001: STOP
    • 010: TIDYING
    • 110: TERMINATED

为了能够使用 ctl 线程池提供了三个方法:

    // Packing and unpacking ctl
    // 获取线程池的状态
    private static int runStateOf(int c)     { return c & ~CAPACITY; }    
    // 获取线程池的工作线程数
    private static int workerCountOf(int c)  { return c & CAPACITY; }    
    // 根据工作线程数和线程池状态获取 ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

execute

外界通过 execute 这个方法来向线程池提交任务。

先看代码:

 public void execute(Runnable command) {        
        if (command == null)           
            throw new NullPointerException();        
        int c = ctl.get();        
        //如果工作线程数小于核心线程数,
        if (workerCountOf(c) < corePoolSize) {            
            //执行addWork,提交为核心线程,提交成功return。提交失败重新获取ctl
            if (addWorker(command, true))                
            return;
            c = ctl.get();
        }        
        //如果工作线程数大于核心线程数,则检查线程池状态是否是正在运行,且将新线程向阻塞队列提交。
        if (isRunning(c) && workQueue.offer(command)) {            
            //recheck 需要再次检查,主要目的是判断加入到阻塞队里中的线程是否可以被执行
            int recheck = ctl.get();               
            //如果线程池状态不为running,将任务从阻塞队列里面移除,启用拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);            
            // 如果线程池的工作线程为零,则调用addWoker提交任务
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }        
        //添加非核心线程失败,拒绝
        else if (!addWorker(command, false))            
            reject(command);
    }

addWorker

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:        
        for (;;) {            
            int c = ctl.get();            
            //获取线程池状态
            int rs = runStateOf(c);            
            // Check if queue empty only if necessary.
            // 判断是否可以添加任务。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))                
                return false;            
            for (;;) {               
                 //获取工作线程数量
                int wc = workerCountOf(c);                
                //是否大于线程池上限,是否大于核心线程数,或者最大线程数
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))                    
                    return false;                
                //CAS 增加工作线程数
                if (compareAndIncrementWorkerCount(c))                    
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果线程池状态改变,回到开始重新来
                if (runStateOf(c) != rs)                    
                    continue retry;                
               // else CAS failed due to workerCount change; retry inner loop
            }
        }        
                    
        boolean workerStarted = false;        
        boolean workerAdded = false;
        Worker w = null;        
        //上面的逻辑是考虑是否能够添加线程,如果可以就cas的增加工作线程数量
        //下面正式启动线程
        try {            
            //新建worker
            w = new Worker(firstTask);            
            //获取当前线程
            final Thread t = w.thread;            
            if (t != null) {                
                //获取可重入锁
                final ReentrantLock mainLock = this.mainLock;                
                //锁住
                mainLock.lock();                
                try {                    
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get()); 
                    // rs < SHUTDOWN ==> 线程处于RUNNING状态
                    // 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {                        
                        // 当前线程已经启动,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();                        
                        //workers 是一个 HashSet 必须在 lock的情况下操作。
                        workers.add(w);                        
                        int s = workers.size();                        
                        //设置 largeestPoolSize 标记workAdded
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }                
                //如果添加成功,启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {            
            //启动线程失败,回滚。
            if (! workerStarted)
                addWorkerFailed(w);
        }        
        return workerStarted;
    }

先看看 addWork() 的两个参数,第一个是需要提交的线程 Runnable firstTask,第二个参数是 boolean 类型,表示是否为核心线程。

execute() 中有三处调用了 addWork() 我们逐一分析。

  • 第一次,条件 if (workerCountOf(c) < corePoolSize) 这个很好理解,工作线程数少于核心线程数,提交任务。所以 addWorker(command, true)
  • 第二次,如果 workerCountOf(recheck) == 0 如果worker的数量为0,那就 addWorker(null,false)。为什么这里是 null ?之前已经把 command 提交到阻塞队列了 workQueue.offer(command) 。所以提交一个空线程,直接从阻塞队列里面取就可以了。
  • 第三次,如果线程池没有 RUNNING 或者 offer 阻塞队列失败,addWorker(command,false),很好理解,对应的就是,阻塞队列满了,将任务提交到,非核心线程池。与最大线程池比较。

至此,重新归纳execute()的逻辑应该是:

  1. 如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
  2. 如果运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
  3. 如果加入 BlockingQueue 成功,需要二次检查线程池的状态如果线程池没有处于 Running,则从 BlockingQueue 移除任务,启动拒绝策略。
  4. 如果线程池处于 Running状态,则检查工作线程(worker)是否为0。如果为0,则创建新的线程来处理任务。如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。
  5. 如果加入 BlockingQueue 。失败,则创建新的线程来处理任务。
  6. 如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。

总结

回顾我开始提出的问题:

如果 corePoolSize = 0 且 阻塞队列是无界的。线程池将如何工作?

这个问题应该就不难回答了。

最后

《Java并发编程的艺术》是一本学习 java 并发编程的好书,在这里推荐给大家。

同时,希望大家在阅读技术数据的时候要仔细思考,结合源码,发现,提出问题,解决问题。这样的学习才能高效且透彻。

原文发布于微信公众号 - 犀利豆的技术空间(xilidou1)

原文发表时间:2018-02-23

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏GopherCoder

『Go 语言学习专栏』-- 第十四期

1903
来自专栏高性能服务器开发

(六)关于网络编程的一些实用技巧和细节

这些年,接触了形形色色的项目,写了不少网络编程的代码,从windows到linux,跌进了不少坑,由于网络编程涉及很多细节和技巧,一直想写篇文章来总结下这方面的...

3937
来自专栏Spark学习技巧

spark源码系列之内部通讯的三种机制

本文是以spark1.6.0的源码为例讲解。 Spark为协调各个组件完成任务及内部任务处理采用了多种方式进行了各个组件之间的通讯。总共三个部分牵涉的功能是: ...

3178
来自专栏Leetcode名企之路

【java】CountDownLatch运用场景(1)

CountDownLatch也叫闭锁,使得一(多)个主线程必须等待其他线程完成操作后再执行。 使用的方式是:CountDownLatch内部维护一个计数器,主线...

931
来自专栏北京马哥教育

Linux性能检测常用的10个基本命令

本文的内容主要来自对Netflix的一篇技术博客( Linux Performance Analysis in 60,000 Milliseconds (htt...

1725
来自专栏Spark学习技巧

大数据查询——HBase读写设计与实践

作者 | 汪婷编辑 | Vincent导语:本文介绍的项目主要解决 check 和 opinion2 张历史数据表(历史数据是指当业务发生过程中的完整中间流程和...

3469
来自专栏慎独

AVPlayer初体验之边下边播与视频缓存

2.2K5
来自专栏Java架构

看阿里大牛深入浅出Java线程池原理分析与使用

在我们的开发中“池”的概念并不罕见,有数据库连接池、线程池、对象池、常量池等等。下面我们主要针对线程池来一步一步揭开线程池的面纱。

1.2K4
来自专栏杨建荣的学习笔记

Python调用MySQL模块初试

学Python喊了很长时间了,总是因为各种各样的理由搁置,昨天想起来前同事推荐过一本Python的书《Python核心编程》第二版,就火速买了一本,Pyt...

3916
来自专栏Spring相关

spring websocket 和socketjs实现单聊群聊,广播的消息推送详解

随着互联网的发展,传统的HTTP协议已经很难满足Web应用日益复杂的需求了。近年来,随着HTML5的诞生,WebSocket协议被提出,它实现了浏览器与服务器的...

8315

扫码关注云+社区

领取腾讯云代金券