博客:https://juejin.im/user/5c629a3051882562191755d8
前言
线程池是Java面试必问问题之一!
有没有对源码滚瓜烂熟的童鞋?请举手!??♂️(怎么没人举手。。)
对了,今天先来撒一波狗狼~
(表打我~)
来,介绍下:
她叫码妞,是我码仔的女朋友喔!
她也在学习各类前端技术,可厉害了!
大家鼓掌欢迎吧!以后她会经常来问我问题的,要被烦了~
最近码妞也在看Java线程池呢,已经看得一头雾水了,正准备去问问码仔,
看码仔能不能给她讲明白了!
线程
线程是一种资源,并不是只存在程序的世界里。
程序,本来就是对生活的一种抽象表述。
比如像车站的售票窗口、退票窗口、检票窗口,每个窗口都在做不同的事情,就是车站里同时运行着的不同线程。
线程多了,需要管理,不同的线程也要能保证不会互相干扰,各做各的。
线程的生命周期
这个图很熟悉的吧~
好,开始讲线程池啦~
ThreadPoolExecutor
线程池源码里最主要的类了~
看下开头的这段注释:
/**
* The main pool control state, ctl, is an atomic integer packing
* two conceptual fields
* workerCount, indicating the effective number of threads
* runState, indicating whether running, shutting down etc
*
* The runState provides the main lifecycle control, taking on values:
看到英文就头晕?没事啦~
主要讲线程池重要的两个状态:
@ReachabilitySensitive
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
线程池用一个32位的int来同时保存runState和workerCount,其中高3位(第31到29位)是runState,其余29位是workerCount(大约500 million)。
来看看存储结构(码仔手动画的哦
)
它的构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
核心线程数,好比班干部的人数。
最大线程数,好比教室里的座位数。 当提交任务数超过了这个最大值,线程还有拒绝策略——RejectExecutionHandler,做不动了嘛。
除核心线程外的空闲线程保持存活时间。 当线程池里线程数超过corePoolSize数量了,keepAliveTime时间到,就把空闲线程关了,不然也闲置了呀,节省能量嘛。
Worker来了!
你看Worker的定义,其实它就是封装了的工作线程~
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
Worker既实现了Runnable,又继承了AbstractQueuedSynchronizer(AQS),所以它既是一个可执行的任务,又可以达到锁的效果。
看看Worker构造方法:
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
线程池是怎么工作的?
DuangDuangDuang!
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/**
* 2、如果线程池RUNNING状态,且入队列成功
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果再次校验过程中,线程池不是RUNNING状态,
// 并且remove(command)--workQueue.remove()成功,拒绝当前command
if (! isRunning(recheck) && remove(command))
reject(command);
//为什么只检查运行的worker数量是不是0呢?? 为什么不和corePoolSize比较呢??
// 只保证有一个worker线程可以从queue中获取任务执行就行了??
// 因为只要还有活动的worker线程,就可以消费workerQueue中的任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
/**
* 3、如果线程池不是running状态 或者 无法入队列
* 尝试开启新线程,扩容至maxPoolSize,
* 如果addWork(command, false) 失败了,拒绝当前command
*/
else if (!addWorker(command, false))
reject(command);
}}
看execute方法里的注释,一步步说得很清楚。
给你流程图!
工具类 Executors
线程池里还有个重要的类:Executors~
Executors是一个Java中的工具类,它提供工厂方法来创建不同类型的线程池。
用它可以很方便地创建出下面几种线程池来~
ExecutorService singleService = Executors.newSingleThreadExecutor();
ExecutorService fixedService = Executors.newFixedThreadPool(9);
ExecutorService cacheService = Executors.newCacheThreadPool();
或者通过ThreadPoolExecutor的构造函数自定义需要的线程池。