专栏首页用户7621540的专栏线程池ThreadPoolExecutor源码分析

线程池ThreadPoolExecutor源码分析

本文源自 公-众-号 IT老哥 的分享

IT老哥,一个在大厂做高级Java开发的程序员,每天分享技术干货文章

前言

多线程是我们日常工作中很少能接触到的技术,但是面试的时候100%会被问到,万一工作中用到了基本不会,本篇咱们就来深入分析线程池的实现类ThreadPoolExecutor

1、构造方法

构造方法中有4个方法,本质上都是调用的下面这个构造方法:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
 if (corePoolSize < 0 ||
  maximumPoolSize <= 0 ||
  maximumPoolSize < corePoolSize ||
  keepAliveTime < 0)
  throw new IllegalArgumentException();
 if (workQueue == null || threadFactory == null || handler == null)
  throw new NullPointerException();
 this.acc = System.getSecurityManager() == null ?
   null :
   AccessController.getContext();
 //线程池的核心线程数目
 this.corePoolSize = corePoolSize;
 //线程池的最大线程数目
 this.maximumPoolSize = maximumPoolSize;
 //阻塞的队列(存储的是待运行的线程)
 this.workQueue = workQueue;
 //线程空闲等待时间
 this.keepAliveTime = unit.toNanos(keepAliveTime);
 //线程工厂(主要作用是创建线程),一般是默认
 this.threadFactory = threadFactory;
 //工作队列满了时候的饱和策略
 this.handler = handler;
}

2、饱和策略

上面的构造方法中,我们着重需要注意的是饱和策略,线程池中定义了四种饱和策略:

1、CallerRunsPolicy

public static class CallerRunsPolicy implements RejectedExecutionHandler {
 public CallerRunsPolicy() { }
 //使用主线程执行新任务
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  if (!e.isShutdown()) {
   //此方法相同于同步方法
   r.run();
  }
 }
}

2、 AbortPolicy(线程池默认的策略)

public static class AbortPolicy implements RejectedExecutionHandler { 
 public AbortPolicy() { }

 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  //抛出 RejectedExecutionException来拒绝新任务的处理
  throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());
 }
}

3、DiscardPolicy

public static class DiscardPolicy implements RejectedExecutionHandler {
 public DiscardPolicy() { }
 //不执行任何操作,丢弃新任务
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
 }
}

4、DiscardOldestPolicy

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
 public DiscardOldestPolicy() { }
 //此策略将丢弃最早的未处理的任务
 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  if (!e.isShutdown()) {
   e.getQueue().poll();
   e.execute(r);
  }
 }
}

3、阻塞队列

咱们看下ThreadPoolExecutor的源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
  new LinkedBlockingQueue<Runnable>());
}

使用的是LinkedBlockingQueue作为阻塞队列,LinkedBlockingQueue的默认构造函数允许的队列长度是Integer.MAX_VALUE,若堆积大量的请求,可能会造成OOM

此处就是为什么《阿里巴巴 Java 开发手册》中不推荐使用Executors工具类创建线程池的原因,要求使用 ThreadPoolExecutor 构造函数的方式,让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。

4、execute方法

下面是执行流程图:

对照流程图,我们再来看源码:

//ctl中存放的是int值,int值得高低位保存了线程池运行的状态和有效线程的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) {
 return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) {
 //如果任务为null,则抛出异常
 if (command == null)
  throw new NullPointerException();
 //获取线程池状态和有效线程数
 int c = ctl.get();
 //以下有3步:
 //步骤1:
 //如果线程池工作的线程小于核心线程数
 if (workerCountOf(c) < corePoolSize) { 
  //则增加一个线程,并把该任务交给它去执行
  if (addWorker(command, true))
   //成功则返回
   return;
  //这里说明创建核心线程失败,需要再次获取临时变量c
  c = ctl.get();
 }
 //步骤2:
 // 走到这里说明创建新的核心线程失败,也就是当前工作线程数大于等于corePoolSize
 // 线程池的运行状态是RUNNING,并且尝试将新任务加入到阻塞队列,成功返回true
 if (isRunning(c) && workQueue.offer(command)) {
  //进入到这里,是已经向任务队列投放任务成功
  //再次获取线程池状态和有效线程数
  int recheck = ctl.get();
  //如果线程池状态不是RUNNING(线程池异常终止了),将线程从工作队列中移除
  if (! isRunning(recheck) && remove(command))
   //执行饱和策略
   reject(command);
  // 走到这里说明线程池状态可能是RUNNING
  // 也可能是移除线程任务失败了(失败的最大的可能是已经执行完毕了)
  //因为所有存活的工作线程有可能在最后一次检查之后已经终结,所以需要二次检查线程池工作线程的状态
  //这里博主也是看了半天,大家好好体会下
  else if (workerCountOf(recheck) == 0)
   //若当前线程池工作线程数为0,则新建一个线程并执行
   addWorker(null, false);
 }
 //步骤3:
 // 如果任务队列已满,就需要创建非核心线程
 // 如果新建非核心线程失败,则执行饱和策略
 else if (!addWorker(command, false))
  reject(command);
}

上面的方法多次调用了addWorker方法,我们跟踪进去看下源码:

// 添加工作线程,返回true则创建和启动工作线程成功;返回false则没有新创建工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
 retry:
 for (;;) {
  //获取线程池对应的int值
  int c = ctl.get();
  //获取线程池状态
  int rs = runStateOf(c);
  // Check if queue empty only if necessary.
  if (rs >= SHUTDOWN &&
   ! (rs == SHUTDOWN &&
      firstTask == null &&
      ! workQueue.isEmpty()))
   return false;
  for (;;) {
   //获取工作线程数
   int wc = workerCountOf(c);
   //工作线程数超过允许的“最大线程数”则返回false
   //core为true,“最大线程数”就是核心线程数,则表明创建核心线程数失败
   if (wc >= CAPACITY ||
    wc >= (core ? corePoolSize : maximumPoolSize))
    return false;
   // 成功通过CAS更新工作线程数wc,则break到最外层的循环
   if (compareAndIncrementWorkerCount(c))
    break retry;
   c = ctl.get();  // Re-read ctl
   // 如果线程的状态改变了就跳到外层循环执行
   if (runStateOf(c) != rs)
    continue retry;
   //如果CAS更新工作线程数wc失败,则可能是并发更新导致的失败,继续在内层循环重试即可
   // else CAS failed due to workerCount change; retry inner loop
  }
 }
 // 标记工作线程是否启动成功
 boolean workerStarted = false;
 //标记工作线程是否创建成功
 boolean workerAdded = false;
 //工作线程
 Worker w = null;
 try {
  //创建一个工作线程
  w = new Worker(firstTask);
  final Thread t = w.thread;
  if (t != null) {
   final ReentrantLock mainLock = this.mainLock;
   //获取锁
   mainLock.lock();
   try {
    // Recheck while holding lock.
    // Back out on ThreadFactory failure or if
    // shut down before lock acquired.
    int rs = runStateOf(ctl.get());
    // 再次确认"线程池状态"
    if (rs < SHUTDOWN ||
     (rs == SHUTDOWN && firstTask == null)) {
     if (t.isAlive()) // precheck that t is startable
      throw new IllegalThreadStateException();
     //把创建的工作线程实例添加到工作线程集合
     workers.add(w);
     /更新当前工作线程的峰值容量largestPoolSize
     int s = workers.size();
     if (s > largestPoolSize)
      largestPoolSize = s;
     workerAdded = true;
    }
   } finally {
    //释放锁
    mainLock.unlock();
   }
   //如果加入线程池成功
   if (workerAdded) {
    //启动线程
    t.start();
    workerStarted = true;
   }
  }
 } finally {
  //如果线程启动失败,则需要从工作线程集合移除对应线程
  if (! workerStarted)
   addWorkerFailed(w);
 }
 return workerStarted;
}

5、shutdown方法

线程池不用了,要关闭线程池,下面是源码:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    // 获取锁
    mainLock.lock();
    try {
        //校验是否有权限。
        checkShutdownAccess();
        //设置SHUTDOWN状态。
        advanceRunState(SHUTDOWN);
        //中断线程池中所有空闲线程。
        interruptIdleWorkers();
        //钩子函数
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        //释放锁
        mainLock.unlock();
    }
    //尝试终止线程池
    tryTerminate();
}

结束语

本篇详细的分析了ThreadPoolExecutor的execute方法,耗费了不少时间。如果本文对你哪怕是有一点点的帮助,就值了。

云服务器云硬盘数据库(包括MySQL、Redis、MongoDB、SQL Server),CDN流量包,短信流量包,cos资源包,消息队列ckafka,点播资源包,实时音视频套餐,网站管家(WAF),大禹BGP高防(包含高防包及高防IP),云解析SSL证书,手游安全MTP移动应用安全云直播等等。

本文分享自微信公众号 - IT老哥(dys_family),作者:IT老哥

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-08-28

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java线程池详解

    线程能够充分合理地协调利用CPU、内存、I/O等系统资源,但是线程的创建需要开辟虚拟机栈、本地方法栈、程序计数器等线程私有空间,在线程销毁时需要回收这些系统资源...

    公众号 IT老哥
  • 全网最火Java面试题

    1.JAVA基础扎实、熟练运用设计模式、理解IO/NIO、反射、多线程编程、了解JVM原理;

    公众号 IT老哥
  • 学synchronized锁升级过程,吊打面试官

    哈喽,大家好,我是IT老哥,我们今天来讲讲synchronized这个锁,可能你们第一印象是这个锁太笨了,太重了,谁用谁是傻子,如果你是这样想的话,那么面试的时...

    公众号 IT老哥
  • Java多线程学习(二)——Thread类的方法使用

    停止线程意味着在线程处理完任务之前停掉正在做的操作,也就是放弃当前操作。有以下三种方法终止正在运行中的线程:

    小森啦啦啦
  • 从使用到原理学习Java线程池

    在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源。在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销...

    哲洛不闹
  • Java 多线程编程(“锁”事碎碎念)

    对于多个线程间的共享数据,悲观锁认为自己在使用数据的时候很有可能会有其它线程也刚好前来修改数据,因为在使用数据前都会加上锁,确保在使用过程中数据不会被其它线程修...

    叶志陈
  • Java 多线程编程(聊聊线程池)

    线程是一种昂贵的系统资源,其“昂贵”不仅在于创建线程所需要的资源开销,还在于使用过程中带来的资源消耗。一个系统能够支持同时运行的线程总数受限于该系统所拥有的处理...

    叶志陈
  • 编程体系结构(05):Java多线程并发

    线程是操作系统能够进行运算调度的最小单位,包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程...

    知了一笑
  • 新手一看就懂的线程池

    线程池是帮助我们管理线程的工具,它维护了多个线程,可以降低资源的消耗,提高系统的性能。

    好好学java

扫码关注云+社区

领取腾讯云代金券