其实早在19年,就简单的写过ThreadPoolExecutor
. 但是只涉及到了其中两个参数,理解也不深刻,今天重新看一下代码。
这个类是Java中常用的线程池的一个类,相关的类图:
继承自父类: AbstractExecutorService
,实现了Executor
和ExecutorService
接口.
使用一些池化的线程来执行每一个提交的任务,一般使用Executors
的工厂方法来进行相关的配置.
线程池解决两个不同的问题:由于减少了每个任务的调用开销,它们通常在执行大量异步任务时提供更好的性能, 并且它们提供了一种限制和管理资源的方法,包括在执行集合时消耗的线程任务。 每个 ThreadPoolExecutor 还维护一些基本的统计信息,例如已完成的任务数。
为了在更加广泛的上下文中可用,这个类提供了许多可以调整的参数和可以扩展的挂钩. 但是强烈建议程序员使用Executors
的工厂方法来进行这个类的创建.
如果你要手动创建和配置的话,以下是一些使用指南:
线程池自动调整池子的大小,调整范围在corePoolSize
和maximumPoolSize
之间. 如果正在运行的线程少于核心线程数, 处理请求时将创建一个新的线程.即使当前有一些线程是空闲的. 如果当前运行的线程数少于最大线程数.只有当工作队列满了的时候,才会创建新的线程. 设置核心线程数等于最大线程数,就创建了一个固定大小的线程池. 设置一个无限大的最大线程数,就允许线程池拥有任意数量的线程. 通常,核心线程数和最大线程数只在调用构造方法时设置,但是也可以被对应的set方法修改.
默认情况下, 核心线程也是在有新的任务到来时才会初始化. 但是这一点可以被动态的重写,使用prestartCoreThread
和prestartAllCoreThreads
. 如果你创建线程时已经有一个不为空的队列,你可能想要预启动线程.
使用ThreadFactory
创建新线程. 默认使用Executors.defaultThreadFactory
. 他创建的线程全在同一个ThreadGroup
中. 有用相同的优先级,且不是守护线程. 使用其他的线程工厂,你可以修改线程的名字,线程组,优先级,是否是守护线程等等. 如果线程工厂在创建新线程时出错,调用newThread
时返回null.执行器会继续,但是可能没有办法执行任务了. 线程应该有用”修改线程”的运行时权限. 如果工作线程或者其他线程没有取得这个权限,服务将退化.配置的更改可能不起效,一个终止的线程池可能还处在未完成状态中.
如果一个线程池有超过核心线程数的线程数量,超过核心线程数的线程将在空闲超过keepAliveTime
时间后被终止. 当线程池没有完全应用起来时,这提供了一个减少资源消耗的方法.如果之后线程池变得更加活跃,将新创建线程. 这个参数也可以动态的更改. 使用Long.Max_Value意味着空闲线程永远不会终止. 默认实现中,保持活跃
策略只有在线程数大于核心线程时被应用. 但是allowCoreThreadTimeOut
可以将这个策略也应用在核心线程上.
线程池应用一个BlockingQueue
来持有提交的任务,这个队列的使用和线程池的大小有关系:
常见的排队策略有三个:
工作队列的一个默认实现是SynchronousQueue
,他将任务直接交给线程,而不是使用其他方式来保留任务。 这种实现下,如果当前没有一个线程是立刻可用的,那么入队一个任务将会失败. 因此会创建一个新的线程. 这个策略避免了处理一系列内部依赖的任务时造成的锁定. 直接交接通常要求吴杰的最大线程数,以避免拒绝新任务. 当任务的到达速度,大于处理速度时,线程池将会无限增长.
使用一个无界队列(没有给定容量的LinkedBlockingQueue)将会使新任务在队列中等待,如果所有的核心线程都在忙碌时. 因此,不会有超过核心线程数的线程被创建. (最大线程数这个参数就没有作用了.) 当任务之间完全互相独立时,这可能是有用的,因为任务不会影响彼此的执行. 比如在web网页服务中. 这个风格的排队策略,在处理平滑的请求速度中的尖刺时很有用,但是当任务的到达速度大于处理速度时,工作队列将会无线增长.
有界队列防止资源耗尽,但是会更加难以控制. 队列的大小和最大线程数可能会不断影响彼此. 使用大的等待队列和比较小的线程池意味着较小的cpu使用率,系统资源以及上下文切换的浪费, 但是吞吐量会较低. 如果任务频繁的阻塞,系统可能可以调度时间到更多线程,远超过你搜允许的。 使用较小的队列通常要求更大的线程池,会导致CPU繁忙但是可能会遇到不可调度开销,这也会降低吞吐量.
如果当前线程池已经终止了,或者所有的可用线程和工作队列都满了,新提交的任务将会被拒绝. 在这些情况下,执行方法将会调用RejectedExecutionHandler.rejectedExecution
. 提供了4种预定义的处理策略:
还可以实现其他的拒绝策略,也可以自己实现
这个类提供了beforeExecute
和afterExecute
方法, 在每个任务被执行之前和之后进行调用. 这些方法用来操作执行环境. 比如, 初始化ThreadLocal
的值, 搜集一些统计信息,或者添加统计信息.
terminated
方法可以被重写,以在线程池完全终止后,执行一些特殊的操作.
如果挂钩,回调,等待队列等抛出异常,内部的工作线程可能会失败,终止,或者被替换.
getQueue
允许访问工作队列,以用来监控或者进行调试.如果用于其他目的的话,很不好. 当大量排队任务被取消时,remove
和purge
两个方法可以用来协助回收存储.
程序中不再引用并且没有剩余线程的线程池可以在不显示关闭的情况下,被垃圾回收收集。 您可以通过设置合适的存活时间,使用一个较少的核心线程数,或者允许allowCoreThreadTimeOut
来允许所有未使用的线程死亡.
这个类的大部分扩展类都重写了一个或者多个hook. 比如下面这个子类添加了一个简单的暂停,继续功能:
在执行每一个任务之前,检查当前线程池是否被暂停了,如果是,自旋,等待外部唤醒.
由于内部使用一个int来存储当前活跃的线程数和线程池的状态,因为需要一些bit位以及状态的定义,都在常量里面了.
这个类提供了4个构造方法,不过本质上都是最后一个.
比较简单,首先做了一些参数的检查,之后进行赋值.
一个线程池,最重要,最常用的方法就是提交任务了,让我们从这里开始正式的看代码.
将一个任务提交至线程池,主要有三个分支:
这里涉及到最重要的一个方法,就是新增一个工作线程.
新增工作线程时,需要提供两个参数:
新增的第一个任务, 以及是否是核心线程
上面已经备注了一些关键注释,这里再总结下新增工作线程时做了什么:
在提交任务时,如果核心线程数满了,此时会将任务放入工作队列,那么什么时候出队呢?
每一个工作线程启动后,首先会执行创建它时的第一个任务,执行完后,会调用getTask()
来获取下一个任务.
这个方法比较简单,核心思路就是从等待队列中获取第一个元素,给调用的工作线程执行.
只是在其中夹杂了一些是否需要超时死亡,是否已经超时的代码. 用是否返回一个任务,来控制调用方的工作线程是否应该死亡.
回顾下拒绝任务的几种情况:
拒绝时调用reject(command)
.
额,比较简单,就是直接调用RejectedExecutionHandler.rejectedExecution
方法,因此当需要实现自己的拒绝策略时,记得实现一个这个接口的实现类即可.
经常在我们提交完任务后,想要等线程池中的所有方法执行完毕,我们再进行下一步操作,这个当然是可以通过CountDownLatch
和CyclicBarrier
等同步器来实现的,但是线程池其实已经实现了类似的功能.
分为两步.
shutdown
来关闭线程池,线程池会将所有能关闭的工作线程都关闭掉.之后尝试终止线程池awaitTermination
来等待线程池关闭,继续下一个步骤.如果在创建线程池之前,已经有一个有大量值的工作队列,我们可能希望预创建一些线程.
如果我们已经提交了一个任务,后悔了,或者说我们想删除掉所有等待的任务怎么办呢?
这个类提供了大量的get/set方法,来监控当前线程池内的各种状态,以及动态的修改一些参数.
线程工厂实现了ThreadFactory
接口. 在Executors
中的默认实现为:
比较简单,采用统一的线程组,递增的线程池编号,递增的线程编号,统一的前缀,不是守护线程作为参数创建一个线程.
ThreadPoolExecutor
默认提供了4种拒绝策略.
实际上根据需要,可以自己实现一些策略,这里简单列举两个:
上面的示例,调用阻塞方法put,在线程池中有一个任务完成,等待队列中空出一个位置时,该方法得以继续向下运行.
如果每个任务运行时间足够长,这个策略会导致提交任务的线程长时间阻塞,比较浪费.
当一些重要程序中,发生异常,导致异常拒绝,需要打印日志,并发送邮件等通知开发者.
有时,我们宁愿服务器累死,也不想拒绝任务,可以使用这个.
不管任何强行,强行创建一个不受线程池管理的线程,去运行这个任务.
19年介绍过Executors
提供的4个工厂方法,这里不重复了.
完.
完。
最后,欢迎关注我的个人公众号【 呼延十 】,会不定期更新很多后端工程师的学习笔记。 也欢迎直接公众号私信或者邮箱联系我,一定知无不言,言无不尽。
以上皆为个人所思所得,如有错误欢迎评论区指正。
欢迎转载,烦请署名并保留原文链接。
联系邮箱:huyanshi2580@gmail.com
更多学习笔记见个人博客或关注微信公众号 <呼延十 >——>呼延十