首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

线程池技术之:ThreadPoolExecutor 源码解析

java中的所说的线程池,一般都是围绕着 ThreadPoolExecutor 来展开的。其他的实现基本都是基于它,或者模仿它的。所以只要理解 ThreadPoolExecutor, 就相当于完全理解了线程池的精髓。

其实要理解一个东西,一般地,我们最好是要抱着自己的疑问或者理解去的。否则,往往收获甚微。

理解 ThreadPoolExecutor, 我们可以先理解一个线程池的意义: 本质上是提供预先定义好的n个线程,供调用方直接运行任务的一个工具。

线程池解决的问题:

1. 提高任务执行的响应速度,降低资源消耗。任务执行时,直接立即使用线程池提供的线程运行,避免了临时创建线程的CPU/内存开销,达到快速响应的效果。

2. 提高线程的可管理性。线程总数可预知,避免用户主动创建无限多线程导致死机风险,还可以进行线程统一的分配、调优和监控。

3. 避免对资源的过度使用。在超出预期的请求任务情况,响应策略可控。

线程池提供的核心接口:

要想使用线程池,自然是要理解其接口的。一般我们使用 ExecotorService 进行线程池的调用。然而,我们并不针对初学者。

整体的接口如下:

我们就挑几个常用接口探讨下:

submit(Runnable task): 提交一个无需返回结果的任务。

submit(Callable task): 提交一个有返回结果的任务。

invokeAll(Collection

shutdown(): 关闭线程程池。

awaitTermination(long timeout, TimeUnit unit): 等待关闭结果,最长不超过timeout时间。

以上是ThreadPoolExector 提供的特性,针对以上特性。

我们应该要有自己的几个实现思路或疑问:

1. 线程池如何接受任务?

2. 线程如何运行任务?

3. 线程池如何关闭?

接下来,就让我们带着疑问去看实现吧。

ThreadPoolExecutor 核心实现原理

1. 线程池的处理流程

我们首先重点要看的是,如何执行提交的任务。我可以通过下图来看看。

总结描述下就是:

1. 判断核心线程池是否已满,如果不是,则创建线程执行任务

2. 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中

3. 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务

4. 如果线程池也满了,则按照拒绝策略对任务进行处理

另外,我们来看一下 ThreadPoolExecutor 的构造方法,因为这里会体现出每个属性的含义。

从构造方法可以看出 ThreadPoolExecutor 的主要参数 7 个,在其注释上也有说明功能,咱们翻译下每个参数的功能:

2. submit 流程详解

当调用 submit 方法,就是向线程池中提交一个任务,处理流程如步骤1所示。但是我们需要更深入理解。

submit 方法是定义在 AbstractExecutorService 中,最终调用 ThreadPoolExecutor 的 execute 方法,即是模板方法模式的应用。

通过上面这一小段代码,我们就已经完整地看到了。通过一个 ctl 变量进行全局状态控制,从而保证了线程安全性。整个框架并没有使用锁,但是却是线程安全的。

整段代码刚好完整描述了线程池的执行流程:

1. 判断核心线程池是否已满,如果不是,则创建线程执行任务;

2. 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中;

3. 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务;

4. 如果线程池也满了,则按照拒绝策略对任务进行处理;

2.1. 添加新的worker

一个worker,即是一个工作线程。

2.2. 当添加队列成功后,发现线程池状态变更,需要进行移除队列操作

3. 添加失败进行执行拒绝策略

4. Worker 的工作机制

从上面的实现中,我们可以看到,主要是对 Worker 的添加和 workQueue 的添加,所以具体的工作是由谁完成呢?自然就是 Worker 了。

所以,Worker的作用就体现出来了,一个循环取任务执行任务过程:

1. 有一个主循环一直进行任务的获取;

2. 针对有超时的设置,会使用poll进行获取任务,如果超时,则 Worker 将会退出循环结束线程;

3. 无超时的设置,则会使用 take 进行阻塞式获取,直到有值;

4. 获取任务执行前置+业务+后置任务;

5. 当获取到null的任务之后,当前Worker将会结束;

6. 当前Worker结束后,将会判断是否有必要维护最低Worker数,从而决定是否再添加Worker进来。

还是借用一个网上同学比较通用的一个图来表述下 Worker/ThreadPoolExecutor 的工作流程吧(已经很完美,不需要再造这轮子了)

5. shutdown 操作实现

ThreadPoolExecutor 是通过 ctl 这个变量进行全局状态维护的,shutdown 在线程池中也是表现为一个状态,所以应该是比较简单的。

6. invokeAll 的实现方式

invokeAll, 望文生义,即是调用所有给定的任务。想来应该是一个个地添加任务到线程池队列吧。

实现很简单,都是些外围调用。

7. ThreadPoolExecutor 的状态值的设计

通过上面的过程,可以看到,整个ThreadPoolExecutor 非状态的依赖是非常强的。所以一个好的状态值的设计就显得很重要了,runState 代表线程池或者 Worker 的运行状态。如下:

8. awaitTermination 等待关闭完成

从上面的 shutdown, 可以看到,只是写了 SHUTDOWN 标识后,尝试尽可能地中断停止Worker线程,但并不保证中断成功。要想保证停止完成,需要有另外的机制来保证。从 awaitTermination 的语义来说,它是能保证任务停止完成的,那么它是如何保证的呢?

看起来, awaitTermination 并没有什么特殊操作,而是一直在等待。所以 TERMINATED 是 Worker 自行发生的动作。

那是在哪里做的操作呢?其实是在获取任务的时候,会检测当前状态是否是 SHUTDOWN, 如果是SHUTDOWN且 队列为空,则会触发获取任务的返回null.从而结束当前 Worker.

Worker 在结束前会调用 processWorkerExit() 方法,里面会再次调用 tryTerminate(), 当所有 Worker 都运行到这个点后, awaitTermination() 就会收到通知了。(注意: processWorkerExit() 会在每次运行后进行 addWorker() 尝试,但是在 SHUTDOWN 状态的添加操作总是失败的,所以不用考虑)

到此,你是否可以解答前面的几个问题了呢?

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20210312A01G0P00?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券