专栏首页Java后端技术栈cwnait细说线程池---高级篇

细说线程池---高级篇

线程源码分析

上一篇中已经讲了线程池的原理。这一次来说说源码执行过程。建议先看看细说线程池---入门篇 细说线程池---中级篇

依然使用newFixedThreadPool()方法创建线程池。

看源码从execute(Runnable runable)开始。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        ///1.当前池中线程比核心数少,新建一个线程执行任务
        //workerCountOf计算出线程个数
        if (workerCountOf(c) < corePoolSize) {
            //线程池中创建一个线程worker
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        ////2.核心池已满,但任务队列未满,添加到队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //任务成功添加到队列以后,再次检查是否需要添加新的线程,
            // 因为已存在的线程可能被销毁了
            if (! isRunning(recheck) && remove(command))
                ////如果线程池处于非运行状态,
                // 并且把当前的任务从任务队列中
                //移除成功,则拒绝该任务
                reject(command);
            ////如果之前的线程已被销毁完,新建一个线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        ////3.核心池已满,队列已满,试着创建一个新线程
        else if (!addWorker(command, false))
            ////如果创建新线程失败了,说明线程池被关闭或者线程池完全满了, 拒绝任务
            reject(command);
    }

ctl 的作用

在线程池中,ctl 贯穿在线程池的整个生命周期中

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

它是一个原子类,主要作用是用来保存线程数量和线程池的状态。我们来分析一下这段代码,其实比较有意思,他用到了位运算一个 int 数值是 32 个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。我们来分析默认情况下,也就是 ctlOf(RUNNING)运行状态,调用了 ctlOf(int rs,int wc)方法;其中

private static int ctlOf(int rs, int wc) { return rs | wc; }

其中 RUNNING =-1 << COUNT_BITS ;-1 左移 29 位,

-1 的二进制是 32 个 1(1111 1111 11111111 1111 1111 1111 1111);

-1 的二进制计算方法原码是 1000…001 . 高位 1 表示符号位。然后对原码取反,高位不变得到 1111…110然后对反码进行+1 ,也就是补码操作, 最后得到 1111…1111

那么-1 <<左移 29 位, 也就是 【111】 表示;rs | wc 。二进制的 111 | 000 。

得到的结果仍然是 111。

那么同理可得其他的状态的 bit 位表示

//32-3
private static final int COUNT_BITS = Integer.SIZE - 3;
//将 1 的二进制向右位移 29 位,再减 1 表示最大线程容量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 运行状态保存在 int 值的高 3 位 ( 所有数值左移 29 位 )
// 接收新任务,并执行队列中的任务
private static final int RUNNING = -1 << COUNT_BITS;
// 不接收新任务,但是执行队列中的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 不接收新任务,不执行队列中的任务,中断正在执行中的任务
private static final int STOP = 1 << COUNT_BITS;
// 所有的任务都已结束,线程数量为 0,处于该状态的线程池即将调用 terminated()方法
private static final int TIDYING = 2 << COUNT_BITS;
// terminated()方法执行完成
private static final int TERMINATED = 3 << COUNT_BITS;

线程池状态变化图

addWorker

再回到上面源码中,当线程池中线程数小于核心线程数的时候:会调用 addWorker,顾名思义,其实就是要创建一个工作线程。我们来看看源码的实现源码比较长,看起来比较唬人,其实就做了两件事。

  1. 才用循环 CAS操作来将线程数加 1
  2. 新建一个线程并启用
  private boolean addWorker(Runnable firstTask, boolean core) {
        //goto 语句,避免死循环
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果线程处于非运行状态,并且 rs 不等于 SHUTDOWN 且 firstTask 不等于空且且
           // workQueue 为空,直接返回 false (表示不可添加 work 状态)
          //  1.  线程池已经 shutdown 后,还要添加新的任务,拒绝
           // 2.  (第二个判断) SHUTDOWN 状态不接受新任务,但仍然会执行已经加入任务队列的任
           // 务,所以当进入 SHUTDOWN 状态,而传进来的任务为空,并且任务队列不为空的时候,是允许添加
           // 新线程的 , 如果把这个条件取反,就表示不允许添加 worker
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            for (;;) { //自旋
                int wc = workerCountOf(c);//获得 Worker 工作线程数
                //如果工作线程数大于默认容量大小或者大于核心线程数大小,
                // 则直接返回 false 表示不能再添加 worker。
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //通过 cas 来增加工作线程数,
                if (compareAndIncrementWorkerCount(c))
                //如果 cas 失败,则直接重试
                break retry;
                // 再次获取 ctl 的值
                c = ctl.get();
                //这里如果不想等,说明线程的状态发生了变化,继续重试
                if (runStateOf(c) != rs)
                continue retry;
            }
        }
        //上面这段代码主要是对 worker 数量做原子+1 操作,
        // 下面的逻辑才是正式构建一个 worker
        boolean workerStarted = false; //工作线程是否启动的标识
        boolean workerAdded = false; //工作线程是否已经添加成功的标识
        Worker w = null;
        try {
            //构建一个 Worker,这个 worker 是什么呢?
            //我们 可以看到构造方法里面传入了一个 Runnable 对象
            w = new Worker(firstTask);
            final Thread t = w.thread; //从 worker 对象中取出线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); //这里有个重入锁,避免并发问题
                try {
                    int rs = runStateOf(ctl.get());
                   //只有当前线程池是正在运行状态,[或是 SHUTDOWN
                    // 且 firstTask 为空],才能添加到 workers 集合中
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                    //任务刚封装到 work 里面,还没 start,你封装的线程就是 alive,
                    // 几个意思?肯定是要抛异常出去的

                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); //将新创建的 Worker 添加到 workers 集合中
                        int s = workers.size();
                      //如果集合中的工作线程数大于最大线程数,
                      // 这个最大线程数表示线程池曾经出现过的最大线程数
                        if (s > largestPoolSize)
                            largestPoolSize = s; //更新线程池出现过的最大线程数
                        workerAdded = true;//表示工作线程创建成功了
                    }
                } finally {
                    mainLock.unlock(); //释放锁
                }
                if (workerAdded) {//如果 worker 添加成功
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //如果添加失败,就需要做一件事,就是递减实际工作线
                //程数(还记得我们最开始的时候增加了工作线程数吗)
                addWorkerFailed(w);

        }
        //返回结果
        return workerStarted;
    }

Worker说明

我们发现 addWorker 方法只是构造了一个 Worker,并且把 firstTask 封装到 worker 中,它是做什么的呢?我们来看看

  1. 每个 worker,都是一条线程,同时里面包含了一个 firstTask,即初始化时要被首先执行的任务.
  2. 最终执行任务的,是 runWorker()方法Worker 类继承了 AQS,并实现了 Runnable 接口,注意其中的 firstTask 和 thread 属性:firstTask 用它来保存传入的任务;thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程。

在调用构造方法时,需要传入任务,这里通过 getThreadFactory().newThread(this);来新建一个线程,newThread 方法传入的参数是 this,因为 Worker 本身继承了 Runnable 接口,也就是一个线程,所以一个 Worker 对象在启动的时候会调用 Worker 类中的 run 方法。Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的:lock 方法一旦获取了独占锁,表示当前线程正在执行任务中;那么它会有以下几个作用

  1. 如果正在执行任务,则不应该中断线程;
  2. 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
  3. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态
  4. 之所以设置为不可重入,是因为我们不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁,这样会中断正在运行的线程
//继承了AQS还实现了Runnable
private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        //注意了,这才是真正执行task的线程,从构造函数可知是由
         //ThreadFactury 创建的
        final Thread thread;
        //这就是需要执行的 task
        Runnable firstTask;
        /** 每个线程完成任务的计数器*/
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            //// 初始状态 -1, 防止在调用 runWorker() ,
            // 也就是真正执行task前中断thread 。
            setState(-1);  
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

addWorkerFailed方法

addWorker方法中,如果添加 Worker 并且启动线程失败,则会做失败后的处理。这个方法主要做两件事

  1. 如果 worker 已经构造好了,则从 workers 集合中移除这个 worker
  2. 原子递减核心线程数(因为在 addWorker 方法中先做了原子增加)
  3. 尝试结束线程池
    private void addWorkerFailed(java.util.concurrent.ThreadPoolExecutor.Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        //上锁
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            //释放锁
            mainLock.unlock();
        }
    }

runWorker 方法

前面已经了解了 ThreadPoolExecutor的核心方法 addWorker,主要作用是增加工作线程,而 Worker 简单理解其实就是一个线程,里面重新了 run 方法,这块是线程池中执行任务的真正处理逻辑,也就是 runWorker方法,这个方法主要做几件事:

  1. 如果 task 不为空,则开始执行 task
  2. 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
  3. 执行完毕后,通过 while 循环继续 getTask()取任务
  4. 如果 getTask()取到的任务依然是空,那么整个runWorker()方法执行完毕

本文分享自微信公众号 - Java后端技术栈(t-j20120622),作者:田老师

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-01-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 多线程面试题【基本概念和线程创建】

    推荐阅读:看完本文,再也不怕问java线程创建的几种方式了 创建线程主要有3种方式:

    用户4143945
  • 细说线程池--中级篇

    线程池的基本使用我们都清楚了,接下来我们来了解一下线程池的实现原理ThreadPoolExecutor是线程池的核心,提供了线程池的实现。ScheduledTh...

    用户4143945
  • 细说线程池---入门篇!!!

    什么是线程池? 在Java中,如果每个请求到达后就创建一个线程,创建和销毁线程花费的时间和消耗的系统资源都相当大,甚至可能要比在处理实际的用户请求的时间和资源...

    用户4143945
  • 多线程相关概念

    Noneplus
  • 多线程之线程池(三)

    OPice
  • jMeter 线程启动时间

    当把线程数量改为10个后,启动时间还是相差一点几秒,但是后启动的线程,处理时间明显比最先启动的慢一些,这里观察到了一个明显的服务器端排队现象。

    Jerry Wang
  • 深入聊聊Java多线程

      在没有学习Java多线程以前,总觉得多线程是个很神秘的东西,只有那些大神才能驾驭,新年假期没事就来学习和了解一下Java的多线程,本篇博客我们就来从头说一下...

    阿豪聊干货
  • Java并发编程的艺术(十)——线程池(1)

    线程池的作用 减少资源的开销 减少了每次创建线程、销毁线程的开销。 提高响应速度 每次请求到来时,由于线程的创建已经完成,故可以直接执行任务,因此提高...

    大闲人柴毛毛
  • Java 线程线程池初探

    所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务。线程池的...

    纯洁的微笑
  • JAVA开发知识之Java的线程

    进程是什么,进程就是一个程序在运行中的一个实例.比如QQ.比如浏览器.这个就是多进程的状态. 意思就是可以同时运行多个程序. 可以打开任务管理器.观看我们的...

    IBinary

扫码关注云+社区

领取腾讯云代金券