前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >(juc系列)threadpoolexecutor源码学习

(juc系列)threadpoolexecutor源码学习

作者头像
呼延十
发布2021-10-18 10:46:36
4310
发布2021-10-18 10:46:36
举报
文章被收录于专栏:呼延呼延

前言

其实早在19年,就简单的写过ThreadPoolExecutor. 但是只涉及到了其中两个参数,理解也不深刻,今天重新看一下代码。

简介

这个类是Java中常用的线程池的一个类,相关的类图:

2021-10-11-17-38-50
2021-10-11-17-38-50

继承自父类: AbstractExecutorService,实现了ExecutorExecutorService接口.

使用一些池化的线程来执行每一个提交的任务,一般使用Executors的工厂方法来进行相关的配置.

线程池解决两个不同的问题:由于减少了每个任务的调用开销,它们通常在执行大量异步任务时提供更好的性能, 并且它们提供了一种限制和管理资源的方法,包括在执行集合时消耗的线程任务。 每个 ThreadPoolExecutor 还维护一些基本的统计信息,例如已完成的任务数。

为了在更加广泛的上下文中可用,这个类提供了许多可以调整的参数和可以扩展的挂钩. 但是强烈建议程序员使用Executors的工厂方法来进行这个类的创建.

如果你要手动创建和配置的话,以下是一些使用指南:

  • 核心线程数和最大线程数

线程池自动调整池子的大小,调整范围在corePoolSizemaximumPoolSize之间. 如果正在运行的线程少于核心线程数, 处理请求时将创建一个新的线程.即使当前有一些线程是空闲的. 如果当前运行的线程数少于最大线程数.只有当工作队列满了的时候,才会创建新的线程. 设置核心线程数等于最大线程数,就创建了一个固定大小的线程池. 设置一个无限大的最大线程数,就允许线程池拥有任意数量的线程. 通常,核心线程数和最大线程数只在调用构造方法时设置,但是也可以被对应的set方法修改.

  • 按需构建

默认情况下, 核心线程也是在有新的任务到来时才会初始化. 但是这一点可以被动态的重写,使用prestartCoreThreadprestartAllCoreThreads. 如果你创建线程时已经有一个不为空的队列,你可能想要预启动线程.

  • 创建新线程

使用ThreadFactory创建新线程. 默认使用Executors.defaultThreadFactory. 他创建的线程全在同一个ThreadGroup中. 有用相同的优先级,且不是守护线程. 使用其他的线程工厂,你可以修改线程的名字,线程组,优先级,是否是守护线程等等. 如果线程工厂在创建新线程时出错,调用newThread时返回null.执行器会继续,但是可能没有办法执行任务了. 线程应该有用”修改线程”的运行时权限. 如果工作线程或者其他线程没有取得这个权限,服务将退化.配置的更改可能不起效,一个终止的线程池可能还处在未完成状态中.

  • 活跃时间

如果一个线程池有超过核心线程数的线程数量,超过核心线程数的线程将在空闲超过keepAliveTime时间后被终止. 当线程池没有完全应用起来时,这提供了一个减少资源消耗的方法.如果之后线程池变得更加活跃,将新创建线程. 这个参数也可以动态的更改. 使用Long.Max_Value意味着空闲线程永远不会终止. 默认实现中,保持活跃策略只有在线程数大于核心线程时被应用. 但是allowCoreThreadTimeOut可以将这个策略也应用在核心线程上.

  • 排队

线程池应用一个BlockingQueue来持有提交的任务,这个队列的使用和线程池的大小有关系:

  • 如果运行的线程小于核心线程数,优先新建线程而不是入队等待.
  • 如果运行线程数大于等于核心线程数,新来的任务优先入队等待而不是新创建线程.
  • 如果一个任务不能入队,将会新创建一个线程。如果线程数已经到达最大线程数,这个任务将会被拒绝.

常见的排队策略有三个:

  • 直接交接

工作队列的一个默认实现是SynchronousQueue,他将任务直接交给线程,而不是使用其他方式来保留任务。 这种实现下,如果当前没有一个线程是立刻可用的,那么入队一个任务将会失败. 因此会创建一个新的线程. 这个策略避免了处理一系列内部依赖的任务时造成的锁定. 直接交接通常要求吴杰的最大线程数,以避免拒绝新任务. 当任务的到达速度,大于处理速度时,线程池将会无限增长.

  • 无界队列

使用一个无界队列(没有给定容量的LinkedBlockingQueue)将会使新任务在队列中等待,如果所有的核心线程都在忙碌时. 因此,不会有超过核心线程数的线程被创建. (最大线程数这个参数就没有作用了.) 当任务之间完全互相独立时,这可能是有用的,因为任务不会影响彼此的执行. 比如在web网页服务中. 这个风格的排队策略,在处理平滑的请求速度中的尖刺时很有用,但是当任务的到达速度大于处理速度时,工作队列将会无线增长.

  • 有界队列

有界队列防止资源耗尽,但是会更加难以控制. 队列的大小和最大线程数可能会不断影响彼此. 使用大的等待队列和比较小的线程池意味着较小的cpu使用率,系统资源以及上下文切换的浪费, 但是吞吐量会较低. 如果任务频繁的阻塞,系统可能可以调度时间到更多线程,远超过你搜允许的。 使用较小的队列通常要求更大的线程池,会导致CPU繁忙但是可能会遇到不可调度开销,这也会降低吞吐量.

  • 拒绝任务

如果当前线程池已经终止了,或者所有的可用线程和工作队列都满了,新提交的任务将会被拒绝. 在这些情况下,执行方法将会调用RejectedExecutionHandler.rejectedExecution. 提供了4种预定义的处理策略:

  • AbortPolicy 默认实现,拒绝时直接抛出异常
  • CallerRunsPolicy 拒绝时让调用方的线程执行这个任务,这是一个反馈型的控制策略,可以让提交任务的速度慢下来
  • DiscardPolicy 不能执行的任务直接丢弃掉.
  • DiscardOldestPolicy 拒绝时,丢弃掉最老的任务,也就是等待队列的第一个节点.

还可以实现其他的拒绝策略,也可以自己实现

  • 挂钩方法

这个类提供了beforeExecuteafterExecute方法, 在每个任务被执行之前和之后进行调用. 这些方法用来操作执行环境. 比如, 初始化ThreadLocal的值, 搜集一些统计信息,或者添加统计信息.

terminated方法可以被重写,以在线程池完全终止后,执行一些特殊的操作.

如果挂钩,回调,等待队列等抛出异常,内部的工作线程可能会失败,终止,或者被替换.

  • 队列维护

getQueue允许访问工作队列,以用来监控或者进行调试.如果用于其他目的的话,很不好. 当大量排队任务被取消时,removepurge两个方法可以用来协助回收存储.

  • 回收

程序中不再引用并且没有剩余线程的线程池可以在不显示关闭的情况下,被垃圾回收收集。 您可以通过设置合适的存活时间,使用一个较少的核心线程数,或者允许allowCoreThreadTimeOut来允许所有未使用的线程死亡.

  • 扩展示例

这个类的大部分扩展类都重写了一个或者多个hook. 比如下面这个子类添加了一个简单的暂停,继续功能:

在执行每一个任务之前,检查当前线程池是否被暂停了,如果是,自旋,等待外部唤醒.

源码阅读

常量

由于内部使用一个int来存储当前活跃的线程数和线程池的状态,因为需要一些bit位以及状态的定义,都在常量里面了.

变量

内部同步器 Worker 工作线程

构造方法

这个类提供了4个构造方法,不过本质上都是最后一个.

比较简单,首先做了一些参数的检查,之后进行赋值.

提交任务 execute

一个线程池,最重要,最常用的方法就是提交任务了,让我们从这里开始正式的看代码.

将一个任务提交至线程池,主要有三个分支:

  • 当前工作线程数小于核心线程数,新增一个工作线程
  • 如果线程大于核心线程数,但是可以入队成功.
  • 如果新增线程失败,且工作队列满了,就拒绝任务.

这里涉及到最重要的一个方法,就是新增一个工作线程.

新增工作线程 addWorker

新增工作线程时,需要提供两个参数:

新增的第一个任务, 以及是否是核心线程

上面已经备注了一些关键注释,这里再总结下新增工作线程时做了什么:

  1. 如果线程池状态不ok,返回失败.
  2. 自旋判断数量是否超出核心线程数或者最大线程数的限制,没有的话尝试增加工作线程计数.直到成功
  3. 新建一个工作线程(同时从线程工厂新创建一个线程),将工作线程添加到集合中,然后让工作线程运行第一个任务.
  4. 如果期间失败了,就清理相关属性,尝试终止线程池.

任务出队

在提交任务时,如果核心线程数满了,此时会将任务放入工作队列,那么什么时候出队呢?

每一个工作线程启动后,首先会执行创建它时的第一个任务,执行完后,会调用getTask()来获取下一个任务.

这个方法比较简单,核心思路就是从等待队列中获取第一个元素,给调用的工作线程执行.

只是在其中夹杂了一些是否需要超时死亡,是否已经超时的代码. 用是否返回一个任务,来控制调用方的工作线程是否应该死亡.

如何拒绝任务?

回顾下拒绝任务的几种情况:

  1. 线程池终止了
  2. 线程池的工作线程以及工作队列都满了.

拒绝时调用reject(command).

额,比较简单,就是直接调用RejectedExecutionHandler.rejectedExecution方法,因此当需要实现自己的拒绝策略时,记得实现一个这个接口的实现类即可.

等待终止

经常在我们提交完任务后,想要等线程池中的所有方法执行完毕,我们再进行下一步操作,这个当然是可以通过CountDownLatchCyclicBarrier等同步器来实现的,但是线程池其实已经实现了类似的功能.

分为两步.

  1. 手动调用shutdown来关闭线程池,线程池会将所有能关闭的工作线程都关闭掉.之后尝试终止线程池
  2. 阻塞调用awaitTermination来等待线程池关闭,继续下一个步骤.

预热线程

如果在创建线程池之前,已经有一个有大量值的工作队列,我们可能希望预创建一些线程.

  • prestartAllCoreThreads 预创建所有线程
  • prestartCoreThread 预创建核心线程

删除任务

如果我们已经提交了一个任务,后悔了,或者说我们想删除掉所有等待的任务怎么办呢?

监控方法

这个类提供了大量的get/set方法,来监控当前线程池内的各种状态,以及动态的修改一些参数.

  • isShutdown 是否关闭
  • isTerminating 是否正在终止
  • isTerminated 是否已经终止
  • setThreadFactory 设置线程工厂
  • getThreadFactory 获取线程工厂
  • setRejectedExecutionHandler 设置拒绝策略
  • getRejectedExecutionHandler 获取拒绝策略
  • setCorePoolSize 设置核心线程数
  • getCorePoolSize 获取核心线程数
  • allowsCoreThreadTimeOut 核心线程是否允许超时死亡
  • setMaximumPoolSize 设置最大线程数
  • getMaximumPoolSize 获取最大线程数
  • setKeepAliveTime 设置活跃时间
  • getKeepAliveTime 获取活跃时间
  • getQueue 获取工作队列
  • getPoolSize 获取当前线程池的大小
  • getActiveCount 获取活跃工作线程数量
  • getLargestPoolSize 获取到达过的最大线程池大小
  • getTaskCount 获取任务数量
  • getCompletedTaskCount 获取已经完成的任务数量

线程工厂

线程工厂实现了ThreadFactory接口. 在Executors中的默认实现为:

比较简单,采用统一的线程组,递增的线程池编号,递增的线程编号,统一的前缀,不是守护线程作为参数创建一个线程.

拒绝策略

ThreadPoolExecutor默认提供了4种拒绝策略.

  • AbortPolicy 默认实现,拒绝时直接抛出异常
  • CallerRunsPolicy 拒绝时让调用方的线程执行这个任务,这是一个反馈型的控制策略,可以让提交任务的速度慢下来
  • DiscardPolicy 不能执行的任务直接丢弃掉.
  • DiscardOldestPolicy 拒绝时,丢弃掉最老的任务,也就是等待队列的第一个节点.

实际上根据需要,可以自己实现一些策略,这里简单列举两个:

  1. 让调用方等待

上面的示例,调用阻塞方法put,在线程池中有一个任务完成,等待队列中空出一个位置时,该方法得以继续向下运行.

如果每个任务运行时间足够长,这个策略会导致提交任务的线程长时间阻塞,比较浪费.

  1. 添加日志等

当一些重要程序中,发生异常,导致异常拒绝,需要打印日志,并发送邮件等通知开发者.

  1. 强行新建线程运行

有时,我们宁愿服务器累死,也不想拒绝任务,可以使用这个.

不管任何强行,强行创建一个不受线程池管理的线程,去运行这个任务.

线程池工厂

19年介绍过Executors提供的4个工厂方法,这里不重复了.

Java中executors提供的的4种线程池

完.

完。

联系我

最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。

以上皆为个人所思所得,如有错误欢迎评论区指正。

欢迎转载,烦请署名并保留原文链接。

联系邮箱:huyanshi2580@gmail.com

更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 简介
  • 源码阅读
    • 常量
      • 变量
        • 内部同步器 Worker 工作线程
          • 构造方法
            • 提交任务 execute
              • 新增工作线程 addWorker
            • 任务出队
              • 如何拒绝任务?
                • 等待终止
                  • 预热线程
                    • 删除任务
                      • 监控方法
                        • 线程工厂
                          • 拒绝策略
                            • 线程池工厂
                            • 联系我
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档