前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池实现原理-2

线程池实现原理-2

作者头像
Java识堂
发布2019-08-13 10:13:40
6330
发布2019-08-13 10:13:40
举报
文章被收录于专栏:Java识堂

前言

线程池实现原理-1

addWorker实现

在看addWorker方法之前,我们先看一个例子,了解一下retry的使用

  1. break retry 跳到retry处,且不再进入循环
  2. continue retry 跳到retry处,且再次进入循环
代码语言:javascript
复制
public static void main(String[] args) {
    breakRetry();
    continueRetry();
}

private static void breakRetry() {
    int i = 0;
    retry:
    for (; ; ) {
        System.out.println("start");
        for (; ; ) {
            i++;
            if (i == 4)
                break retry;
        }
    }
    //start 进入外层循环
    //4
    System.out.println(i);
}

private static void continueRetry() {
    int i = 0;
    retry:
    for(;;) {
        System.out.println("start");
        for(;;) {
            i++;
            if (i == 3)
                continue retry;
            System.out.println("end");
            if (i == 4)
                break retry;
        }
    }
    //start 第一次进入外层循环
    //end i=1输出
    //end i=2输出
    //start 再次进入外层循环
    //end i=4输出
    //4 最后输出
    System.out.println(i);
}

这里说一下Runnable 参数的含义

  1. firstTask != null 说明任务被添加了,我们需要启动一个线程去执行它
  2. fistTask == null 说明我只想启动一个线程去消费阻塞队列中的任务
代码语言:javascript
复制
// core为ture表示是核心线程,否则非核心线程
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /**
         * 将条件改为如下形式,方便理解
         * rs >= SHUTDOWN && (rs != SHUTDOWN || fistTask != null || workQueue.isEmpty)
         * 1.如果当前线程池的状态>SHUTDOWN,addWorker返回false,添加任务失败
         * 2.如果当前线程池的状态=SHUTDOWN,分为如下2种情况
         * (1)workQueue为空,fistTask == null 和fistTask != null的任务都不能
         * (2)workQueue不为空,可以添加fistTask != null的任务
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                // 1.是核心线程 >= corePoolSize
                // 2.非核心线程 >= maximumPoolSize
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 成功将线程数+1,跳到retry处,并且不再进入死循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 否则重新读取ctl
            c = ctl.get();  // Re-read ctl
            // 线程状态发生改变,跳到retry处,并且进入死循环
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 线程是否启动的标志位
    boolean workerStarted = false;
    // 线程封装成Worker对象,是否添加到线程池中的标志位
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                // 1.rs < SHUTDOWN 即 rs = RUNNING
                // 2.rs == SHUTDOWN && firstTask == null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 刷新了largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 留心一下这里,后面会从这里开始讲起
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

仔细理解一下这段代码,其实就能理解,当线程池处于RUNNING 接受新任务,并且处理进入队列的任务,处于SHUTDOWN 不接受新任务,处理进入队列的任务,剩余状态都不会处理任务,上面代码中的注释有详细解释

代码语言:javascript
复制
if (rs >= SHUTDOWN &&
    ! (rs == SHUTDOWN &&
       firstTask == null &&
       ! workQueue.isEmpty()))
    return false;

线程池在执行任务的时候,会把任务对象包装成一个Worker对象,Worker对象是ThreadPoolExecutor的一个内部类,继承了AbstractQueuedSynchronizer,实现了一个独占锁,status值为0表示未锁定状态,status值为1表示锁定状态,实现了Runnable接口,在执行run方法的时候,它执行完初始化的firstTask后,还会从workQueue中取出任务执行,这样就不用新建一个线程执行任务,而是在一个线程中执行了好几个任务

Worker内部类

代码语言:javascript
复制
// 省略了一部分对锁的操作,简单的对AQS的一个实现
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

}

runWorker实现

当t.start()被执行后,run方法会执行runWorker方法,来看runWorker方法

代码语言:javascript
复制
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允许中断
    w.unlock(); // allow interrupts
    // 标识线程是不是异常终止的
    boolean completedAbruptly = true;
    try {
        // 先执行初始化的fistTask,执行完成后还会无限循环获取workQueue里的任务来执行
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 配合shutdownNow 方法
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 线程开始开始执行之前执行此方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 线程执行后执行
                    afterExecute(task, thrown);
                }
            } finally {
                // 运行过的置为null
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

这个方法需要注意的就是

  1. getTask()从阻塞队列中获取任务,如果队列中没有任务会被阻塞,并不会占用CPU资源
  2. 可以根据业务需要自定义beforeExecute和afterExecute方法

getTask实现

代码语言:javascript
复制
private Runnable getTask() {
    // 标记获取头结点并且移除头结点的方法是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 1.rs >= STOP
        // 2.rs == SHUTDOWN && workQueue.isEmpty()
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // 用CAS将线程池中的数量-1,直到成功才会退出
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        // 1.核心线程允许被销毁
        // 2.核心线程数 > corePoolSize
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 1.timeOut为true,表示超时获取,workQueue没有任务,说明线程应该被销毁,但是还是要 && timed
        // 2.wc > maximumPoolSize肯定要删除线程了
        // 3.workQueue为空可以销毁线程,此时有可能所有线程都被销毁了
        // 4.workQueue不为空,只有wc > 1才能被删除
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // timed为true,超过keepAliveTime还是没有任务,返回null
            // timed为false,则一直阻塞等待任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkerExit实现

线程执行完毕执行的方法

代码语言:javascript
复制
// processWorkerExit在runWorker结束之后被调用
private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // 如果是异常终止,或者被中断,减少workerCount
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // Transitions to TERMINATED state if either (SHUTDOWN and pool
    // and queue empty) or (STOP and pool empty)
    tryTerminate();

    int c = ctl.get();
    // 状态为RUNNING或者SHUTDOWN
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 目前核心线程已经够用了,不用再创建
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 增加一个消费的线程
        addWorker(null, false);
    }
}

shutdown实现

代码语言:javascript
复制
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // 检查能否操作线程
        checkShutdownAccess();
        // 确保状态 >= SHUTDOWN
        advanceRunState(SHUTDOWN);
        // 中断所有的空闲线程
        interruptIdleWorkers();
        // ScheduledThreadPoolExecutor会重写这个方法,做一些其他的运算
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
代码语言:javascript
复制
// 中断空闲线程
private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}
代码语言:javascript
复制
// onlyOne为true则只中断一个空闲线程,否则全部中断
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            // 遍历Worker并执行中断操作,w.tryLock()保证了正在执行的Worker不会被中断
            // 因为正在运行的Worker再次获取锁会失败
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

这里需要注意的是不会中断正在运行的线程,因为正在运行的线程w.tryLock()会返回false

shutdownNow实现

代码语言:javascript
复制
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        // 确保状态 >= STOP
        advanceRunState(STOP);
        // 中断所有线程
        interruptWorkers();
        // 获取所有没有执行完成的task
        // 即将阻塞队列中的任务放到tasks中 
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
代码语言:javascript
复制
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
代码语言:javascript
复制
// 这个是Worker内部类的方法
void interruptIfStarted() {
    Thread t;
  // state的初始值为-1,运行到runWorker才允许中断
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
        try {
            t.interrupt();
        } catch (SecurityException ignore) {
        }
    }
}

shutdownNow会中断所有的线程,因为和shutdown相比在中断之前,不用获取锁

tryTerminate实现

代码语言:javascript
复制
// 将状态转换到TERMINATED
final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        // 以下几种状态不能转换为TERMINATED
        // 1.RUNNING状态
        // 2.TIDYING或TERMINATED
        // 3.SHUTDOWN状态,但是workQueue不为空
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    // 让子类去实现,做一些操作
                    terminated();
                } finally {
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

从上面可看出状态转换的条件

  1. SHUTDOWN想转化为TIDYING,需要workQueue为空,同时workerCount为0
  2. STOP转化为TIDYING,需要workerCount为0
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-06-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java识堂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • addWorker实现
  • Worker内部类
  • runWorker实现
  • getTask实现
  • processWorkerExit实现
  • shutdown实现
    • shutdownNow实现
      • tryTerminate实现
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档