前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池源码解读

线程池源码解读

作者头像
付威
发布2023-10-17 15:28:23
1290
发布2023-10-17 15:28:23
举报

线程池的在 Java并发中使用最多的一种手段,也是性能和易用性相对来说比较均衡的方式,下面我们就一起探索先线程池的原理。

线程池分配线程流程

对于线程池的使用,在这篇文章中就不过多的赘述,首先我们先看下线程池的分配线程的逻辑。

我们知道,在创建线程池的有 7 个核心的参数:

corePoolSize:核心线程数

maximumPoolSize:最大线程数

keepAliveTime:空闲线程存活时间

TimeUnit: 单位

workQueue:阻塞队列

ThreadFactory: 线程工厂

RejectedExecutionHandler: 拒绝策略

在这 7 个参数中,其中我们最重要的几个参数是 corePoolSize,maximumPoolSize,workQueue ,这三个参数来决定线程池主要的线程数和任务队列长度。

具体的流程图如下(图片来自网上,侵删):

image-20220813132419857
image-20220813132419857

构造函数的理解

构造函数的是我们创建线程池的第一步,可以简单的看下,搞清楚内部的变量是如何赋值的。

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,
                             int maximumPoolSize,
                             long keepAliveTime,
                             TimeUnit unit,
                             BlockingQueue<Runnable> workQueue,
                             ThreadFactory threadFactory,
                             RejectedExecutionHandler handler) {
       if (corePoolSize < 0 ||
           maximumPoolSize <= 0 ||
           maximumPoolSize < corePoolSize ||
           keepAliveTime < 0)
           throw new IllegalArgumentException();
       if (workQueue == null || threadFactory == null || handler == null)
           throw new NullPointerException();
       this.acc = System.getSecurityManager() == null ?
               null :
               AccessController.getContext();
       this.corePoolSize = corePoolSize;
       this.maximumPoolSize = maximumPoolSize;
       this.workQueue = workQueue;
       // 这个是重点
       // Timeout in nanoseconds for idle threads waiting for work. Threads use this timeout when there are more than corePoolSize present or if allowCoreThreadTimeOut. Otherwise they wait forever for new work.
      // 等待工作的线程池的超时 NS 时间,当线程多于核心线程数据数时候或者 allowCoreThreadTimeOut==true,现成会用此次的线程超时时间。 否则 他们会永远等待
       this.keepAliveTime = unit.toNanos(keepAliveTime);
       this.threadFactory = threadFactory;
       this.handler = handler;
   }

几个特殊变量的含义

在阅读代码时候,会有几个变量的障碍,因为设计的过于巧妙,所以看起来稍微有点晦涩。在下面的代码里,已经注释了相关代码的含义:

代码语言:javascript
复制
// ctl= 11100000 00000000 00000000 00000000|0= RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// Interger.SIZE 值是 32 这里的值是 32-3=29 
private static final int COUNT_BITS = Integer.SIZE - 3;
// 实际是 1<<29-1 
// 00100000 00000000 00000000 00000000 - 1
// 00011111 11111111 11111111 11111111
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
// -1 << 29 
// 11111111 11111111 11111111 11111111 << 29
// 11100000 00000000 00000000 00000000
// 取前 3 位 111
private static final int RUNNING    = -1 << COUNT_BITS;
// 000 
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 010
private static final int STOP       =  1 << COUNT_BITS;
// 100
private static final int TIDYING    =  2 << COUNT_BITS;
// 110
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// CAPACITY = 00011111 11111111 11111111 11111111
// ~CAPACITY= 11100000 00000000 00000000 00000000
// c & ~CAPACITY
// 因为~CAPACITY 头三位为 111 ,所以&运算都是它本身
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 同理 CAPACITY 也都是本身
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

核心分配线程逻辑

代码语言:javascript
复制
public void execute(Runnable command) {
       if (command == null)
           throw new NullPointerException();
       int c = ctl.get();
       // 1. 如果小于核心线程数,直接addWork,第二个参数 true 代表是核心线程
       if (workerCountOf(c) < corePoolSize) {
           if (addWorker(command, true))
               return;
           c = ctl.get();
       }
       // 2. 
       // 如果当前线程池是 RUNNING 状态,且能够 offer 进队列则进行 recheck(为什么要进行 recheck)
       // 如果 recheck 线程池不是 RUNNING 状态,且能移除当前 command对象成功,则直接 reject 
       // - 为了防止 add任务后,线程池调用了 shutdown 方法。
       // 否则 判断当前数量为 0 直接 addWorker 一个空的任务。 
       // isRuning c<SHUTDOWN 只有 runing
       if (isRunning(c) && workQueue.offer(command)) {
           int recheck = ctl.get();
           //如果出现刚入队列,线程池就被 shutdown了,任务就会被移除。
           if (! isRunning(recheck) && remove(command))
               reject(command);
           //这里是什么场景?核心线程数为0了?
           // 这就是前面说的我们可以设置核心线程数完成任务后就被销毁,那么核心线程数就为0了,
           //那么刚刚队列中的任务怎么执行呢,就需要使用使用创建非核心线程数来执行任务了(可以忽略,因为不会这么设置)
           //addWorker 会同时创建任务和线程,这个 addWorker(null,false) ,代表只开线程处理任务,不添加新任务。
           else if (workerCountOf(recheck) == 0)
               addWorker(null, false);
       }
       //3. 如果添加任务失败,则直接 reject 
       else if (!addWorker(command, false))
           reject(command);
   }

addWork 方法

这个是线程池添加任务的核心线程

代码语言:javascript
复制
private boolean addWorker(Runnable firstTask, boolean core) {
      retry:
      for (;;) {
          int c = ctl.get();
          int rs = runStateOf(c);

          // Check if queue empty only if necessary.
         // 1. rs >= SHUTDOWN 代表不是 running 状态
         // 转换 rs >= SHUTDOWN &&  ( rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
         // 表示当前线程池处于SHUTDOWN 状态后,新增的任务不为空的,直接返回,代表添加任务失败
         // 表示当前线程池处于SHUTDOWN 状态后,核心线程数为0,新增非核心线程数来处理任务,但是队列为空,直接返回,代表添加任务失败.

         // 这里返回 false 有可能会触发 reject 方法
          if (rs >= SHUTDOWN &&
              ! ( rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()) )
              return false;

         
          for (;;) {
             // 此处保证,如果是核心线程和非核心线程都会返回 false ,但是如果是核心线程则不需要校验返回值
              int wc = workerCountOf(c);
              if (wc >= CAPACITY ||
                  wc >= (core ? corePoolSize : maximumPoolSize))
                  return false;
              // CAS 更新线程池的数量 ,更新成功完后会跳出 retry 
              if (compareAndIncrementWorkerCount(c))
                  break retry;
              // 这里是 CAS 更新失败逻辑
              c = ctl.get();  // Re-read ctl
              if (runStateOf(c) != rs)
                  continue retry;
              // else CAS failed due to workerCount change; retry inner loop
          }
      }

      boolean workerStarted = false;
      boolean workerAdded = false;
      Worker w = null;
      try {
          // 创建一个 Workder 对象,Worker 继承自 Runabler 对象
          w = new Worker(firstTask);
          final Thread t = w.thread;
          if (t != null) {
              //这里为什么需要一个 CAS 锁?
              //1. 避免 HashSet 不安全 ,锁为什么不加到  workers.add(w); 这里?
              //2. 
              final ReentrantLock mainLock = this.mainLock;
              mainLock.lock();
              try {
                  //再次校验线程状态
                  // 这里为什么再次校验,防止调用了 addWorker 未完成,就直接调用了 shutdown()
                  int rs = runStateOf(ctl.get());
  
                  if (rs < SHUTDOWN ||
                      (rs == SHUTDOWN && firstTask == null)) {
                      // 如果线程刚分配就已经在运行了,说明是不合法的流程
                      if (t.isAlive()) // precheck that t is startable
                          throw new IllegalThreadStateException();
                      workers.add(w);
                      int s = workers.size();
                      // largestPoolSize 标记当前线程池最大的线程数
                      if (s > largestPoolSize)
                          largestPoolSize = s;
                      workerAdded = true;
                  }
              } finally {
                  mainLock.unlock();
              }
              if (workerAdded) {
                 // 启动线程,调用 worker 中的 run 方法
                  t.start();
                  workerStarted = true;
              }
          }
      } finally {
          // 哪些场景会添加失败?
          //1. 上面 recheck 的时候。
          if (! workerStarted)
              addWorkerFailed(w);
      }
      return workerStarted;
  }

Woker 源码解析

worker 继承自 AbstractQueuedSynchronizerRunnable,本质还是一个线程对象

代码语言:javascript
复制
final class Worker extends AbstractQueuedSynchronizer implements Runnable  

Worker(Runnable firstTask) {
          setState(-1); // inhibit interrupts until runWorker
          this.firstTask = firstTask;
          this.thread = getThreadFactory().newThread(this);
      }

      /** Delegates main run loop to outer runWorker  */
      public void run() {
          runWorker(this);
      }

runWorker代码

这个是线程调用了 start 方法,start 方法会调用 run 方法,run 方法会调用 task 中的 run 方法,进而间接的开线程调用了业务方法。

代码语言:javascript
复制
final void runWorker(Worker w) {
       Thread wt = Thread.currentThread();
       Runnable task = w.firstTask;
       w.firstTask = null;
      // 这里为什么要先释放锁??
       w.unlock(); // allow interrupts
       boolean completedAbruptly = true;
       try {
           // 如果获取任务不为空,getTask()方法
           while (task != null || (task = getTask()) != null) {
               w.lock();
               // If pool is stopping, ensure thread is interrupted;
               // if not, ensure thread is not interrupted.  This
               // requires a recheck in second case to deal with
               // shutdownNow race while clearing interrupt
               if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                     &&!wt.isInterrupted())
                   wt.interrupt();
               try {
                   // 线程池 目前没有用,WorkerPoolExecutor会记录当前哪些 worker 列表
                   beforeExecute(wt, task);
                   Throwable thrown = null;
                   try {
                       //调用 run 方法,只是方法的调用,不是线程的启动
                       task.run();
                   } catch (RuntimeException x) {
                       thrown = x; throw x;
                   } catch (Error x) {
                       thrown = x; throw x;
                   } catch (Throwable x) {
                       thrown = x; throw new Error(x);
                   } finally {
                       afterExecute(task, thrown);
                   }
               } finally {
                   // 执行完成 ,任务赋值为空,下次循环就直接从队列中拿任务了
                   task = null;
                   w.completedTasks++;
                   w.unlock();
               }
           }
           completedAbruptly = false;
       } finally {
           //异常跳出 while 循环,处理动作
           processWorkerExit(w, completedAbruptly);
       }
   }

getTask() 获取任务方法

这个getTask()方法是获取任务的方法,也是线程池线程能够复用的逻辑,在一个 while 循环中,一直拉取队列任务。

代码语言:javascript
复制
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池不是 RUNNING 且 状态> STOP 或者 队列为空
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            
             //如果允许核心线程数超时,或者是非核心线程 timed 才为 true.
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //如果 (线程数大于最大线程 或者 已经超时)并且(线程数>1||队列为空)
           //如果允许核心线程超时且已经超时且队列中任务为空  则直接减少线程数据退出死循环返回空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
               // 释放一个线程
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               // 调用对应的 aqs 方法,拉取对应的 task
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

processWorkerExit

代码语言:javascript
复制
private void processWorkerExit(Worker w, boolean completedAbruptly) {
      // 如果出现异常,则将线程池中线程数量-1
      if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
           decrementWorkerCount();

       final ReentrantLock mainLock = this.mainLock;
       mainLock.lock();
       try {
           //完成数++,workers 移除
           completedTaskCount += w.completedTasks;
           workers.remove(w);
       } finally {
           mainLock.unlock();
       }

       tryTerminate();

       int c = ctl.get();
      //如果线程池是RUNNING SHUTDOWN .
       if (runStateLessThan(c, STOP)) {
           //非用户任务异常,也就是手动执行的中断操作
           if (!completedAbruptly) {
               int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
               //如果队列中还有待执行的任务,那么必须要保证线程池中至少有一个线程,否则就创新一个新的非核心线程
               if (min == 0 && ! workQueue.isEmpty())
                   min = 1;
               if (workerCountOf(c) >= min)
                   return; // replacement not needed
           }
            // 开线程拉取任务处理
           addWorker(null, false);
       }
   }

整体的流程图

(图来自网上,侵删)

img
img
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-08-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 线程池分配线程流程
  • 构造函数的理解
  • 几个特殊变量的含义
  • 核心分配线程逻辑
  • addWork 方法
  • Woker 源码解析
  • runWorker代码
  • getTask() 获取任务方法
  • processWorkerExit
  • 整体的流程图
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档