线程池是一种为了避免线程频繁的创建和销毁,带来性能消耗而建立的一种池化技术。
它是把已创建的线程放入“池”中,当有任务来临,就可以重用已有的线程,无需等待创建的过程,可以有效提高程序的响应速度。
阿里巴巴的java开发手册规定:
线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。
说明:Executors的弊端如下
FixedThreadPool和SingleThreadPool允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,导致OOM
CachedThreadPool和ScheduledThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,导致OOM
但是,事实上这四个创建方法底层都是通过ThreadPoolExecutor创建。
一、ThreadPoolExecutor构建时传递的参数
int corePoolSize
线程池常驻核心线程数
解释:
如果为0,则表示没有线程时销毁线程池。
如果不为0,即使没有任务,也会保证线程的数量等于这个值。
注意:设置的比较小,会频繁的创建和销毁线程。设置的比较大,会浪费资源。
int maximumPoolSize
最多可以创建的线程数
官方规定:必须大于0,且必须大于corePoolSize
任务比较多,且不能放入任务队列,才会用到
long keepAliveTime
线程存活时间
当线程池空闲,且超过这个时间,多余的线程就会被销毁,直到线程数量 = corePoolSize
如果maximumPoolSize = corePoolSize,当线程池空闲时也不会销毁任何线程
TimeUnit unit
存活时间的单位
是个枚举方法,存放了从nm~day的单位,及互相的转换进制
BlockingQueue<Runnable> workQueue
线程池执行的任务队列
当线程池的所有线程都在处理任务,新来的任务就会在此队列中排队等待执行
ThreadFactory threadFactory
线程的创建工厂
用的较少,大部分用的默认线程创建工厂
也可以通过实现ThreadFactory,自定义线程名称和线程执行的优先级等属性。
RejectedExecutionHandler handler
线程池的拒绝策略,是一种限流保护
线程池任务已经在缓存队列workQueue已经满了,并且不能创建新的线程,就会用到拒绝策略
默认是ThreadPoolExecutor的静态内部类AbortPolicy,拒绝方法如下:
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
二、线程池状态
运行(RUNNING):该状态下的线程池接收新任务并处理队列中的任务;线程池创建完毕就处于该状态,也就是正常状态;
关机(SHUTDOWN):线程池不接受新任务,但处理队列中的任务;线程池调用shutdown()之后的池状态;
停止(STOP):线程池不接受新任务,也不处理队列中的任务,并中断正在执行的任务;线程池调用shutdownNow()之后的池状态;
清理(TIDYING):线程池所有任务已经终止,workCount(当前线程数)为0;过渡到清理状态的线程将运行terminated()钩子方法;
终止(TERMINATED):terminated()方法结束后的线程池状态;
三、线程池主要方法解析
ThreadPoolExecutor中有个int型变量(其实是AtomicInteger类型)ctl,
它的作用是存储线程池的状态和工作线程数量,
因为ThreadPoolExecutor中定义的状态有5种(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED),
用1位表示,则只有0、1两种情况,只能表示两种状态;
用2位表示,有00、01、10、11四种情况,只能表示四种状态;
用3位表示,有222=8种情况;
所以状态值至少要用3位,那么就可以用int的高3位来表示(最左边3个),剩下29个就可以表示线程数量,所以workerCount最大值为2^29-1
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
每当线程池中的线程数量或状态发生变化时,具体操作的便是ctl变量,如以下方法:
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
那么又是读取线程状态和数量的值呢:
读取状态利用以下方法:
private static int runStateOf(int c) { return c & ~CAPACITY; }
CAPACITY是个常量00011111 11111111 11111111 11111111,通过 &(按位与)运算,
可以保留高3位,把低29位全部变为0;
读取数量利用以下方法:
private static int workerCountOf(int c) { return c & CAPACITY; }
可以把高3位变为0,低29位保留。
addWorker
addWorker(Runnable firstTask, boolean core)
firstTask
线程应首先运行的任务,如果没有则可以置为null
core
判断是否可以创建线程的阈值(最大值)
如果等于true则表示用corePoolSize做为阈值(false则用maximumPoolSize)
execute方法
public void execute(Runnable command) {// 校验传进来的线程是否为空
if (command == null)
throw new NullPointerException();
// 线程池里的线程 + 1 这个数就 + 1,第一个线程是-536870912
// c + CAPACITY + 2 = 线程数
int c = ctl.get();
// workerCountOf(C) = c & (1 << Integer.SIZE - 3) - 1 = c & (1 << 29) - 1
// 池内线程数量 < 线程池常驻核心线程数
if (workerCountOf(c) < corePoolSize) {
// 当常驻线程数 < 设定常驻线程上限时,往workerSet中添加新线程,并返回(后面是另一种场景的判断)
if (addWorker(command, true))
return;
// 重新获取当前线程数
c = ctl.get();
}
// c < 0,即线程池还处于running状态则尝试将线程放入阻塞队列,后者则成功返回true
if (isRunning(c) && workQueue.offer(command)) {
// 获取当前线程数
int recheck = ctl.get();
// 再判断线程池是否是running状态,如果否,则尝试将线程从线程池中删除
if (! isRunning(recheck) && remove(command))
// 拒绝策略,抛出异常
reject(command);
// 如果发现没有worker,则补充一个空任务的worker
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// !c < 0 或 c < 0 && 尝试放入阻塞队列失败
else if (!addWorker(command, false))
// 拒绝策略,抛出异常
reject(command);
}
上文中22行reject调用
handler.rejectedExecution(command, this);
这里的handler我们在debugger的时候可以看到是ThreadPoolExecutor的内部类,如下图1。也可以从明明和注释中看出,默认是调用内部类AbortPolicy的方法,如图2
这个方法没有什么作用,就是抛出一个异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
四、常见面试题
ThreadPoolExecutor的执行方法有几种,他们有什么区别
execute()和submit
他们都是用来执行线程池任务的
submit可以接收线程池的返回值(Callable<V>&Future<V>),是ExecutorService的方法
execute不能接受返回值,是Executor的方法
什么是线程的拒绝策略
当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。
拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。
拒绝策略的分类有哪些
AbortPolicy,默认,线程池会抛出异常,并终止执行
CallerRunsPolicy,把任务交给当前线程来执行
DiscardPolicy,忽略此任务(新任务)
DiscardOldestPolicy ,如果线程池没有关闭,则丢弃阻塞队列中最老的一个
如何自定义拒绝策略
实现RejectedExecutionHandler接口,重写rejectedExecution 方法。
ThreadPoolExecutor能不能实现扩展,如何实现扩展
重写beforeExecute和afterExecute方法(添加日志,实现数据统计,执行时间统计)
题外话
为什么不推荐使用ThreadGroup,我们可以通过Thread.currentThread()获取当前线程,再通过Thread.getThreadGroup返回线程所属的线程组,然后调用ThreadGroup的destroy、interrupt、stop、resume等方法影响线程组中的其他线程,导致线程不安全等问题。
疑问
既然已经弃用,为什么Executors中还是有ThreadGroup的身影?