java中的所说的线程池,一般都是围绕着 ThreadPoolExecutor 来展开的。其他的实现基本都是基于它,或者模仿它的。所以只要理解 ThreadPoolExecutor, 就相当于完全理解了线程池的精髓。
其实要理解一个东西,一般地,我们最好是要抱着自己的疑问或者理解去的。否则,往往收获甚微。
理解 ThreadPoolExecutor, 我们可以先理解一个线程池的意义: 本质上是提供预先定义好的n个线程,供调用方直接运行任务的一个工具。
线程池解决的问题:
1. 提高任务执行的响应速度,降低资源消耗。任务执行时,直接立即使用线程池提供的线程运行,避免了临时创建线程的CPU/内存开销,达到快速响应的效果。
2. 提高线程的可管理性。线程总数可预知,避免用户主动创建无限多线程导致死机风险,还可以进行线程统一的分配、调优和监控。
3. 避免对资源的过度使用。在超出预期的请求任务情况,响应策略可控。
线程池提供的核心接口:
要想使用线程池,自然是要理解其接口的。一般我们使用 ExecotorService 进行线程池的调用。然而,我们并不针对初学者。
整体的接口如下:
我们就挑几个常用接口探讨下:
submit(Runnable task): 提交一个无需返回结果的任务。
submit(Callable task): 提交一个有返回结果的任务。
invokeAll(Collection
shutdown(): 关闭线程程池。
awaitTermination(long timeout, TimeUnit unit): 等待关闭结果,最长不超过timeout时间。
以上是ThreadPoolExector 提供的特性,针对以上特性。
我们应该要有自己的几个实现思路或疑问:
1. 线程池如何接受任务?
2. 线程如何运行任务?
3. 线程池如何关闭?
接下来,就让我们带着疑问去看实现吧。
ThreadPoolExecutor 核心实现原理
1. 线程池的处理流程
我们首先重点要看的是,如何执行提交的任务。我可以通过下图来看看。
总结描述下就是:
1. 判断核心线程池是否已满,如果不是,则创建线程执行任务
2. 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
3. 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
4. 如果线程池也满了,则按照拒绝策略对任务进行处理
另外,我们来看一下 ThreadPoolExecutor 的构造方法,因为这里会体现出每个属性的含义。
从构造方法可以看出 ThreadPoolExecutor 的主要参数 7 个,在其注释上也有说明功能,咱们翻译下每个参数的功能:
2. submit 流程详解
当调用 submit 方法,就是向线程池中提交一个任务,处理流程如步骤1所示。但是我们需要更深入理解。
submit 方法是定义在 AbstractExecutorService 中,最终调用 ThreadPoolExecutor 的 execute 方法,即是模板方法模式的应用。
通过上面这一小段代码,我们就已经完整地看到了。通过一个 ctl 变量进行全局状态控制,从而保证了线程安全性。整个框架并没有使用锁,但是却是线程安全的。
整段代码刚好完整描述了线程池的执行流程:
1. 判断核心线程池是否已满,如果不是,则创建线程执行任务;
2. 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中;
3. 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务;
4. 如果线程池也满了,则按照拒绝策略对任务进行处理;
2.1. 添加新的worker
一个worker,即是一个工作线程。
2.2. 当添加队列成功后,发现线程池状态变更,需要进行移除队列操作
3. 添加失败进行执行拒绝策略
4. Worker 的工作机制
从上面的实现中,我们可以看到,主要是对 Worker 的添加和 workQueue 的添加,所以具体的工作是由谁完成呢?自然就是 Worker 了。
所以,Worker的作用就体现出来了,一个循环取任务执行任务过程:
1. 有一个主循环一直进行任务的获取;
2. 针对有超时的设置,会使用poll进行获取任务,如果超时,则 Worker 将会退出循环结束线程;
3. 无超时的设置,则会使用 take 进行阻塞式获取,直到有值;
4. 获取任务执行前置+业务+后置任务;
5. 当获取到null的任务之后,当前Worker将会结束;
6. 当前Worker结束后,将会判断是否有必要维护最低Worker数,从而决定是否再添加Worker进来。
还是借用一个网上同学比较通用的一个图来表述下 Worker/ThreadPoolExecutor 的工作流程吧(已经很完美,不需要再造这轮子了)
5. shutdown 操作实现
ThreadPoolExecutor 是通过 ctl 这个变量进行全局状态维护的,shutdown 在线程池中也是表现为一个状态,所以应该是比较简单的。
6. invokeAll 的实现方式
invokeAll, 望文生义,即是调用所有给定的任务。想来应该是一个个地添加任务到线程池队列吧。
实现很简单,都是些外围调用。
7. ThreadPoolExecutor 的状态值的设计
通过上面的过程,可以看到,整个ThreadPoolExecutor 非状态的依赖是非常强的。所以一个好的状态值的设计就显得很重要了,runState 代表线程池或者 Worker 的运行状态。如下:
8. awaitTermination 等待关闭完成
从上面的 shutdown, 可以看到,只是写了 SHUTDOWN 标识后,尝试尽可能地中断停止Worker线程,但并不保证中断成功。要想保证停止完成,需要有另外的机制来保证。从 awaitTermination 的语义来说,它是能保证任务停止完成的,那么它是如何保证的呢?
看起来, awaitTermination 并没有什么特殊操作,而是一直在等待。所以 TERMINATED 是 Worker 自行发生的动作。
那是在哪里做的操作呢?其实是在获取任务的时候,会检测当前状态是否是 SHUTDOWN, 如果是SHUTDOWN且 队列为空,则会触发获取任务的返回null.从而结束当前 Worker.
Worker 在结束前会调用 processWorkerExit() 方法,里面会再次调用 tryTerminate(), 当所有 Worker 都运行到这个点后, awaitTermination() 就会收到通知了。(注意: processWorkerExit() 会在每次运行后进行 addWorker() 尝试,但是在 SHUTDOWN 状态的添加操作总是失败的,所以不用考虑)
到此,你是否可以解答前面的几个问题了呢?
领取专属 10元无门槛券
私享最新 技术干货