前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【每周三面】源码角度说说Java线程池

【每周三面】源码角度说说Java线程池

作者头像
趣学程序-shaofeer
发布2020-05-18 14:56:54
3840
发布2020-05-18 14:56:54
举报
文章被收录于专栏:upuptop的专栏upuptop的专栏

本文来源:http://yeming.me/2016/05/07/threadPool1/

线程池类图

本篇文章我们先从左边这条线 Executor==>ExcutorService==>AbstractExecutorService==>ThreadPoolExecutor来分析一下。

  • 上面url继承类图,线程池的最顶层的接口是Executor,这个接口只有一个方法void execute(Runnable command)
  • ExecutorService继承Executor,新增了submit(Runnable(Callable)),shutDown,shutDownNow等几个主要方法
  • AbstractExecutorService实现了上面的ExecutorService接口的若干个方法。
  • ThreadPoolExecutor继承AbstractExecutorService,实现了线程池的一些主要的方法execute(Runnable)。

AbstractExecutorService

AbstractExecutorService实现了submit方法,代码如下:

submit(Callable task)方法

代码语言:javascript
复制
  1. public <T> Future<T> submit(Callable<T> task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<T> ftask = newTaskFor(task);
  4. execute(ftask);
  5. return ftask;
  6. }

newTaskFor(Callable callable)方法

代码语言:javascript
复制
  1. protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  2. return new FutureTask<T>(callable);
  3. }

上面的FutureTask实现了RunnableFuture接口,RunnableFuture继承了 Runnable和Future接口。Runnable接口只有一个void run方法,Future接口有cancel(boolean),V get(),V get(long timeout, TimeUnit unit),boolean isCancelled(),boolean isDone()方法。

ThreadPoolExecutor

接着上面的AbstractExecutorService.submit方法,会调用到execute(ftask),这个execute方法就是ThreadPoolExecutor中的。我们接下来就以execute方法作为起点来分析。

execute

代码语言:javascript
复制
  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. int c = ctl.get();
  5. if (workerCountOf(c) < corePoolSize) {
  6. if (addWorker(command, true))
  7. return;
  8. c = ctl.get();
  9. }
  10. if (isRunning(c) && workQueue.offer(command)) {
  11. int recheck = ctl.get();
  12. if (! isRunning(recheck) && remove(command))
  13. reject(command);
  14. else if (workerCountOf(recheck) == 0)
  15. addWorker(null, false);
  16. }
  17. else if (!addWorker(command, false))
  18. reject(command);
  19. }
  • 首先检查当前工作线程数是否小于corePoolSize,若小于,则添加一个worker来处理这个任务(commadn),添加任务成功则返回.
  • 如果线程还处于running状态,并且任务成功添加到queue中,重新检查一次线程池的状态,若线程池非running,则从queue中删除任务,成功则调用reject,这里根据拒绝策略来执行;若当前工作的线程数为0,则添加一个worker(addWorker(null, false),这里要注意,这次的addWorker的参数和上面第一次的不一样)
  • 如果添加worker失败,也执行reject方法。 addWorker
代码语言:javascript
复制
  1. private boolean addWorker(Runnable firstTask, boolean core) {
  2. retry:
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN &&
  8. ! (rs == SHUTDOWN &&
  9. firstTask == null &&
  10. ! workQueue.isEmpty()))
  11. return false;
  12. for (;;) {
  13. int wc = workerCountOf(c);
  14. if (wc >= CAPACITY ||
  15. wc >= (core ? corePoolSize : maximumPoolSize))
  16. return false;
  17. if (compareAndIncrementWorkerCount(c))
  18. break retry;
  19. c = ctl.get(); // Re-read ctl
  20. if (runStateOf(c) != rs)
  21. continue retry;
  22. // else CAS failed due to workerCount change; retry inner loop
  23. }
  24. }
  25. boolean workerStarted = false;
  26. boolean workerAdded = false;
  27. Worker w = null;
  28. try {
  29. w = new Worker(firstTask);
  30. final Thread t = w.thread;
  31. if (t != null) {
  32. final ReentrantLock mainLock = this.mainLock;
  33. mainLock.lock();
  34. try {
  35. // Recheck while holding lock.
  36. // Back out on ThreadFactory failure or if
  37. // shut down before lock acquired.
  38. int rs = runStateOf(ctl.get());
  39. if (rs < SHUTDOWN ||
  40. (rs == SHUTDOWN && firstTask == null)) {
  41. if (t.isAlive()) // precheck that t is startable
  42. throw new IllegalThreadStateException();
  43. workers.add(w);
  44. int s = workers.size();
  45. if (s > largestPoolSize)
  46. largestPoolSize = s;
  47. workerAdded = true;
  48. }
  49. } finally {
  50. mainLock.unlock();
  51. }
  52. if (workerAdded) {
  53. t.start();
  54. workerStarted = true;
  55. }
  56. }
  57. } finally {
  58. if (! workerStarted)
  59. addWorkerFailed(w);
  60. }
  61. return workerStarted;
  62. }

(1)这个判断逻辑比较复杂,我们先来看下

代码语言:javascript
复制
  1. if (rs >= SHUTDOWN &&
  2. ! (rs == SHUTDOWN &&
  3. firstTask == null &&
  4. ! workQueue.isEmpty()))
  5. return false;

若当前状态大于SHUTDOWN,显然if判断条件为ture,直接returnfalse。(很好理解,线程池处于关闭状态,肯定不让新添加worker了) 若当前状态小于SHUTDOWN,if判断条件为false,接着往下走(线程池为RUNNING状态,很好理解) 若当前状态等于SHUTDOWN:若firstTask等于null并且工作队列有任务,则if判断条件为false,代码不会return,会继续往下运行;若firstTask不等于null或者工作队列为空,则判断条件为true,会return false(这个也好理解,我们知道SHUTDOWN状态,线程池不再接受新的任务,但是已经在工作队列中的任务还是要完成才行。所以若first等于null,并且工作队列有任务,还要继续往下走。若相反,则不会往下走) (2)判断当前工作线程数

代码语言:javascript
复制
  1. for (;;) {
  2. int wc = workerCountOf(c);
  3. if (wc >= CAPACITY ||
  4. wc >= (core ? corePoolSize : maximumPoolSize))
  5. return false;
  6. if (compareAndIncrementWorkerCount(c))
  7. break retry;
  8. c = ctl.get(); // Re-read ctl
  9. if (runStateOf(c) != rs)
  10. continue retry;
  11. // else CAS failed due to workerCount change; retry inner loop
  12. }

当前工作线程数没有超过线程池设置的参数的限制,则利用CAS添加一个worker,并跳出外层的for循环,继续向下运行。否则返回false,添加worker失败。(3) 完成了上述1 2步骤后,会执行new Worker(firstTask),Thread t = w.thread并再次检查线程池的状态,若合法,则向工作线程池HashSet中添加当前worker,并执行t.start。此时才开启了子线程来执行任务。

子线程run方法

上面步骤3调用了t.start,会开启一个子线程来运行Worker中的run方法。

代码语言:javascript
复制
  1. public void run() {
  2. runWorker(this);
  3. }
  4. final void runWorker(Worker w) {
  5. Thread wt = Thread.currentThread();
  6. Runnable task = w.firstTask;
  7. w.firstTask = null;
  8. w.unlock(); // allow interrupts
  9. boolean completedAbruptly = true;
  10. try {
  11. while (task != null || (task = getTask()) != null) {
  12. w.lock();
  13. // If pool is stopping, ensure thread is interrupted;
  14. // if not, ensure thread is not interrupted. This
  15. // requires a recheck in second case to deal with
  16. // shutdownNow race while clearing interrupt
  17. if ((runStateAtLeast(ctl.get(), STOP) ||
  18. (Thread.interrupted() &&
  19. runStateAtLeast(ctl.get(), STOP))) &&
  20. !wt.isInterrupted())
  21. wt.interrupt();
  22. try {
  23. beforeExecute(wt, task);
  24. Throwable thrown = null;
  25. try {
  26. task.run();
  27. }finally {
  28. afterExecute(task, thrown);
  29. }
  30. } finally {
  31. task = null;
  32. w.completedTasks++;
  33. w.unlock();
  34. }
  35. }
  36. completedAbruptly = false;
  37. } finally {
  38. processWorkerExit(w, completedAbruptly);
  39. }
  40. }

上述worker不断通过getTask()方法,从workQueue中获取任务;若没有获取到任务,则调用processWorkerExit方法。

getTask()

代码语言:javascript
复制
  1. private Runnable getTask() {
  2. boolean timedOut = false; // Did the last poll() time out?
  3. for (;;) {
  4. int c = ctl.get();
  5. int rs = runStateOf(c);
  6. // Check if queue empty only if necessary.
  7. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  8. decrementWorkerCount();
  9. return null;
  10. }
  11. int wc = workerCountOf(c);
  12. // Are workers subject to culling?
  13. boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
  14. if ((wc > maximumPoolSize || (timed && timedOut))
  15. && (wc > 1 || workQueue.isEmpty())) {
  16. if (compareAndDecrementWorkerCount(c))
  17. return null;
  18. continue;
  19. }
  20. try {
  21. Runnable r = timed ?
  22. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  23. workQueue.take();
  24. if (r != null)
  25. return r;
  26. timedOut = true;
  27. } catch (InterruptedException retry) {
  28. timedOut = false;
  29. }
  30. }
  31. }

getTask方法是一个无限的for循环方法,它首先判断当前线程池的状态

代码语言:javascript
复制
  1. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
  2. decrementWorkerCount();
  3. return null;
  4. }

这个判断也很好理解,若rs==SHUTDOWN,workQueue为空,显然应该直接返回null,并提前是工作的worker减一。(getTask返回null,runWorker方法会调用processWorkerExit从HashSet中remove当前worker);若rs>大于SHUTDOWN(这个对应线程池的shutDownNow方法,工作队列中等待的任务不再执行);其他情况,说明线程池处于运行状态,继续往下运行。然后根据当前线程池设置的最大线程数,以及是否允许线coreThread超时间以及workQueue的状态来判断是否通过CAS操作来是线程数减一并return null。最后我们要关注下下面这个从工作队列中取得任务的三目运算。

代码语言:javascript
复制
  1. Runnable r = timed ?
  2. workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
  3. workQueue.take();

若timed为ture(设置allowCoreThreadTimeOut为true),则超过了等待的时间还没有从workQueue中取得任务则r = null,此时就有可能造成即使workerCount小于corePoolSize,当前的worker也可能被回收。若timed为false,则调用阻塞方法从workQueue中获取任务,newFixedThreadPool就会一直调用这个阻塞方法,从而达到不显示关闭线程池的情况下,即使workQueue为空,也能维持固定的工作线程的个数。

shutDown(shutDownNow)方法

代码语言:javascript
复制
  1. public List<Runnable> shutdownNow() {
  2. List<Runnable> tasks;
  3. final ReentrantLock mainLock = this.mainLock;
  4. mainLock.lock();
  5. try {
  6. checkShutdownAccess();
  7. //shutDwonNow为STOP,shutDown为SHUTDOWN
  8. advanceRunState(STOP);(advanceRunState(SHUTDOWN);)
  9. interruptWorkers();(interruptIdleWorkers)
  10. //shutDownNow专用
  11. tasks = drainQueue();
  12. //shutDown专用 ScheduledThreadPoolExecutor回调
  13. onShutdown();
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. tryTerminate();
  18. return tasks;
  19. }

shutDown和shutDownnNow方法区别(代码层面):

  • shutDownNow:advanceRunState(STOP),interruptWorkers shutDown:advanceRunState(shutDown),interruptIdleWorkers
  • shutDown多了个onShutdown();ScheduledThreadPoolExecutor复写了onShutDown方法。
  • shutDownNow方法工作队列中还未完成的任务。
  • interruptIdleWorkers

interruptIdleWorkers与interruptWorkers

(1)shutDownNow

代码语言:javascript
复制
  1. private void interruptWorkers() {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers)
  6. w.interruptIfStarted();
  7. } finally {
  8. mainLock.unlock();
  9. }
  10. }

显然这个是中断所有的线程 (2)shutDown

代码语言:javascript
复制
  1. private void interruptIdleWorkers(boolean onlyOne) {
  2. final ReentrantLock mainLock = this.mainLock;
  3. mainLock.lock();
  4. try {
  5. for (Worker w : workers) {
  6. Thread t = w.thread;
  7. if (!t.isInterrupted() && w.tryLock()) {
  8. try {
  9. t.interrupt();
  10. } catch (SecurityException ignore) {
  11. } finally {
  12. w.unlock();
  13. }
  14. }
  15. if (onlyOne)
  16. break;
  17. }
  18. } finally {
  19. mainLock.unlock();
  20. }
  21. }

注意onlyOne参数,这个只有在调用tryTerminate()方法里面,会调用interruptIdleWorkers(true),其他情况都是interruptIdleWorkers(false),所以对于shutDown方法,也是尝试中断所有还没有被中断的线程。3)tryTerminate 上面(2)中提到了tryTerminate方法,接下来就来看下这个方法

代码语言:javascript
复制
  1. final void tryTerminate() {
  2. for (;;) {
  3. int c = ctl.get();
  4. if (isRunning(c) ||
  5. runStateAtLeast(c, TIDYING) ||
  6. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
  7. return;
  8. if (workerCountOf(c) != 0) { // Eligible to terminate
  9. interruptIdleWorkers(ONLY_ONE);
  10. return;
  11. }
  12. final ReentrantLock mainLock = this.mainLock;
  13. mainLock.lock();
  14. try {
  15. if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
  16. try {
  17. terminated();
  18. } finally {
  19. ctl.set(ctlOf(TERMINATED, 0));
  20. termination.signalAll();
  21. }
  22. return;
  23. }
  24. } finally {
  25. mainLock.unlock();
  26. }
  27. // else retry on failed CAS
  28. }
  29. }

从上述代码可以看出,若线程池状态为SHUTDOWN,workQueue为空,工作线程数为0或者线程池状态为STOP,工作线程数为0,都最终会把线程池状态设置为TERMINATED,并且唤醒所有因为调用awaitTermination()方法阻塞在termination.awaitNanos(nanos)还未醒过来的线程。

代码语言:javascript
复制
  1. public boolean awaitTermination(long timeout, TimeUnit unit)
  2. throws InterruptedException {
  3. long nanos = unit.toNanos(timeout);
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. try {
  7. for (;;) {
  8. if (runStateAtLeast(ctl.get(), TERMINATED))
  9. return true;
  10. if (nanos <= 0)
  11. return false;
  12. nanos = termination.awaitNanos(nanos);
  13. }
  14. } finally {
  15. mainLock.unlock();
  16. }
  17. }

上述tryTerminate方法,在addWorkerFailed(),processWorkerExit(),shutDown(),shutDownNow(),remove(Runnable task)方法中都会调用到。

线程池5种状态解释

上面经常提到线程池的运行状态,这里稍作解释一下。

代码语言:javascript
复制
  1. private static final int RUNNING = -1 << COUNT_BITS;
  2. private static final int SHUTDOWN = 0 << COUNT_BITS;
  3. private static final int STOP = 1 << COUNT_BITS;
  4. private static final int TIDYING = 2 << COUNT_BITS;
  5. private static final int TERMINATED = 3 << COUNT_BITS;

种状态的定义

  • RUNNING: 接受新的任务,处理workQueue中的任务。
  • SHUTDOWN: 不接受新的任务,但是会继续完成workQueue中的任务
  • STOP: 不接受新的任务,也不处理workQueue中未完成的任务,尝试中断所有运行中的任务
  • TIDYING: 所有任务已经完成, 工作线程数为0,线程池状态变成TIDYING随之将会调用terminated()方法。
  • TERMINATED: terminated()方法已经完成

5种状态相互转换

  • RUNNING -> SHUTDOWN: 调用shutdown()方法,也许隐式在finalize()方法
  • (RUNNING or SHUTDOWN) -> STOP: 调用shutdownNow()方法
  • SHUTDOWN -> TIDYING: workQueue和pool都为空
  • STOP -> TIDYING: pool为空
  • TIDYING -> TERMINATED: terminated()方法完成
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-04-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 趣学程序 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 线程池类图
  • AbstractExecutorService
    • submit(Callable task)方法
      • newTaskFor(Callable callable)方法
      • ThreadPoolExecutor
        • execute
          • 子线程run方法
            • getTask()
              • shutDown(shutDownNow)方法
                • interruptIdleWorkers与interruptWorkers
                • 线程池5种状态解释
                  • 种状态的定义
                    • 5种状态相互转换
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档