上一篇从整体上介绍了Executor
接口,从上一篇我们知道了Executor
框架的最顶层实现是ThreadPoolExecutor
类,Executors
工厂类中提供的newScheduledThreadPool
、newFixedThreadPool
、newCachedThreadPool
方法
其实本质上也只是ThreadPoolExecutor
的构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor
线程池的运行过程。
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
ThreadPoolExecutor的完整构造方法的签名是:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
ArrayBlockingQueue; //有界队列
LinkedBlockingQueue; //无界队列
SynchronousQueue; //特殊的一个队列,只有存在等待取出的线程时才能加入队列,可以说容量为1,是无界队列
PriorityBlockingQueue
Executors
工厂方法 Executors.newCachedThreadPool()
(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)
(固定大小线程池)Executors.newSingleThreadExecutor()
(单个后台线程)
它们均为大多数使用场景预定义了设置。”
下面介绍一下几个类的源码:
ExecutorService es = executor.newFixedThreadPool (int nThreads):固定大小线程池。
可以看到,corePoolSize和maximumPoolSize的大小是一样的(实际上,后面会介绍,如果使用无界queue的话 maximumPoolSize参数是没有意义的),keepAliveTime和unit的设值表明什么?-就是该实现不想keep alive!最后的BlockingQueue选择了LinkedBlockingQueue,该queue有一个特点,他是无界的,为什么是使用无界的LinkedBlockingqueue 是因为无法知道用户设置多大的固定线程数量,但是数量肯定在无界的范围内。
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
ExecutorService newSingleThreadExecutor():单线程
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
ExecutorService newCachedThreadPool():无界线程池,可以进行自动线程回收
这个实现就有意思了。首先是无界的线程池,所以我们可以发现maximumPoolSize为big big。其次BlockingQueue的选择上使用SynchronousQueue。可能对于该BlockingQueue有些陌生,简单说:该 QUEUE中,每个插入操作必须等待另一个线程的对应移除操作。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
public abstract class AbstractExecutorService implements ExecutorService
而ExecutorService又是继承了Executor接口
public interface ExecutorService extends Executor
Executor
|-- ExecutorService
|-- AbstractExecutorService
|-- ThreadPoolExecutor
我们看一下Executor接口的实现:
public interface Executor {
void execute(Runnable command);
}
到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
然后ThreadPoolExecutor继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个非常重要的方法:
public void execute(Runnable command)
public <T> Future<T> submit(Callable<T> task)
public void shutdown()
public List<Runnable> shutdownNow() //返回未执行的任务
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中 并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的 实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容已在其他文章写过)。
shutdown()和shutdownNow()是用来关闭线程池的。
既然要讲运行过程,那么首先要了解下线程池的状态分为哪些?
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
ThreadPoolExecutor
代码中定义了上面几个变量:定义了一个volatile变量runState,以及其他几个表示状态的常量。
runState
:初始状态,表示当前线程池的运行状态,它的值就是上面的那4个常量值之一
RUNNING
:线程池接受新任务并执行队列任务中...
SHUTDOWN
:不再接受新任务,但是会继续执行等待队列Queued中的任务。当调用了shutdown()方法,会从 RUNNING -> SHUTDOWN
STOP
:不再接受新任务,同时也不执行等待队列Queued中的任务,并且会尝试终止正在执行中的任务。当调用了shutdownNow()方法, 会从(RUNNING or SHUTDOWN) -> STOP
TERMINATED
:线程池中所有线程已经停止运行,其他行为同 STOP状态。
在讲解运行过程前,我们先看下ThreadPoolExecutor
中的几个比较重要的成员变量:
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来保存等待中的任务,等待worker线程空闲时执行任务
private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 时需要持有这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来保存工作中的执行线程
private volatile long keepAliveTime; //超过corePoolSize外的线程空闲存活之间
private volatile boolean allowCoreThreadTimeOut; //是否对corePoolSize内的线程设置空闲存活时间
private volatile int corePoolSize; //核心线程数
private volatile int maximumPoolSize; //最大线程数(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int poolSize; //线程池中的当前线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来新建线程
private int largestPoolSize; //记录线程池中出现过的最大线程数大小
private long completedTaskCount; //已经执行完的线程数
这边重点解释下 corePoolSize
、maximumPoolSize
、workQueue
两个变量,这两个变量涉及到线程池中创建线程个数的一个策略。
corePoolSize
: 这个变量我们可以理解为线程池的核心大小,举个例子来说明(corePoolSize假设等于10,maximumPoolSize等于20):
有一个部门,其中有10(corePoolSize)名工人,当有新任务来了后,领导就分配任务给工人去做,每个工人只能做一个任务。
当10个工人都在忙时,新来的任务就要放到队列(workQueue)中等待。
当任务越积累越多,远远超过工人做任务的速度时,领导就想了一个办法:从其他部门借10个工人来,借的数量有一个公式(maximumPoolSize - corePoolSize)来计算。然后把新来的任务分配给借来的工人来做。
但是如果速度还是还不急的话,可能就要采取措施来放弃一些任务了(RejectedExecutionHandler)。
等到一定时间后,任务都完成了,工人比较闲的情况下,就考虑把借来的10个工人还回去(根据keepAliveTime判断)
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
先看下前一篇文章中的一个例子:
ExecutorService executor = Executors.newFixedThreadPool(3);
IntStream.range(0, 6).forEach(
i -> executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
}
)
);
上面代码就是新建6个任务,然后扔到线程池中运行,输出线程名称,直到运行完毕。其中最核心的方法就是execute()
方法,虽然submit()
也可以执行任务,但它底层也是调用execute()
方法,所以懂了execute()
的实现原理即可:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //1.
if (runState == RUNNING && workQueue.offer(command)) { //2.
if (runState != RUNNING || poolSize == 0) //3.
ensureQueuedTaskHandled(command); //4.
}
else if (!addIfUnderMaximumPoolSize(command)) //5.
reject(command); // is shutdown or saturated //6
}
}
上面的代码看起来逻辑有点复杂,我们一个一个看,首先看上面1位置处:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一个或表达式,它分成两部分
addIfUnderCorePoolSize(command)
,这个方法是当线程数小于核心线程数时,用来新建线程执行任务(因为线程数小于corePoolSize时,直接新建线程来运行任务,不管当前线程池里有没有空闲的线程)。如果新建失败,那么进入if语句块,成功了那么execute方法就执行结束了,因为线程已经新建成功了,任务已经开始在线程池中运行。进入if语句块后,看上面代码2.if (runState == RUNNING && workQueue.offer(command))
if (!addIfUnderMaximumPoolSize(command))
,判断新任务用新线程执行是否成功(注:这里的新线程就是我们上面讲的 “借来的工人” maximumPoolSize)继续进到代码块3 的if语句块if (runState != RUNNING || poolSize == 0)
, 因为新任务加入到等待队列中了,这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是的话,应急处理加入的新任务 ensureQueuedTaskHandled(command)
。
我们看下两个关键方法的实现: ##### 1.addIfUnderCorePoolSize
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
首先获取锁,因为涉及到线程池状态的变化。然后再次判断 if (poolSize < corePoolSize && runState == RUNNING)
,在execute()方法中我们已经判断过一次,这边再次判断是为了防止其他线程又新增了新线程或者调用了shutdown、shutdownNow方法,这边起到了双重检查的一个效果。如果为true
的话,进行t = addThread(firstTask)
新增线程执行任务。addThread方法里面比较简单,就是通过线程工厂创建线程thread,然后封装到Worker对象中,加入到 workers队列中,并执行线程,可以把Worker对象看成是拥有一个线程的对象。
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
w.thread = t;
workers.add(w); //用来保存工作中执行的线程
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
t.start(); //执行的是Worker对象中的run方法 Worker类实现了Runable接口
workerStarted = true;
}
}
return t;
}
这里在介绍下Worker对象, 它实现了Runnable接口,你把它当成Runnable的一个代理类即可,最终也是执行它的run方法。只要注意一下Worker中的beforeExecute
和afterExecute
方法,这两个方法在ThreadPoolExecutor中没有具体实现,用户可以重写这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等,而afterExecute方法还有一个Throwable t
参数,用户可以用来记录一些异常信息,因为新线程中的异常时捕获不到的,需要在afterExecute中记录。
看起来这个是不是和spring 切面有点像,可以看到 知识都是相通的。
看一下它的run方法:
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) { //1
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
注意代码块1,可以看到这边在循环获取任务,并执行,直到任务全部执行完毕。除了第一个任务,其他任务都是通过getTask()
方法去取,这个方法是ThreadPoolExecutor中的一个方法。我们猜一下,整个类中只有任务缓存队列中保存了任务,应该就是去缓存队列中取了。
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll(); //取任务
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
//则通过poll取任务,若等待一定的时间取不到任务,则返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); //制定时间过后再去进行poll取值
else
r = workQueue.take(); //阻塞取值
if (r != null)
return r;
if (workerCanExit()) { //如果没取到任务,即r为null,则判断当前的worker是否可以退出
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers(); //中断处于空闲状态的worker
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给 空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程Worker去任务缓存队列里面取任务来执行,因为每一个Worker里面都包含了一个线程thread。
下面看一下另一个方法 runTask()
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* If pool is stopping ensure thread is interrupted;
* if not, ensure thread is not interrupted. This requires
* a double-check of state in case the interrupt was
* cleared concurrently with a shutdownNow -- if so,
* the interrupt is re-enabled.
*/
//当线程池的执行状态为关闭等,则执行当前线程的interrupt()操作
if ((runState >= STOP ||(Thread.interrupted() && runState >= STOP)) && hasRun)
thread.interrupt();
/*
* Track execution state to ensure that afterExecute
* is called only if task completed or threw
* exception. Otherwise, the caught runtime exception
* will have been thrown by afterExecute itself, in
* which case we don't want to call it again.
*/
boolean ran = false;
beforeExecute(thread, task);
try {
//任务执行
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
还有一个方法是 每执行完一个线程任务 会调用 workerDone
//记录执行任务数量,将工作线程移除,当poolSize为0是则尝试关闭线程池
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
if (--poolSize == 0)
tryTerminate();
} finally {
mainLock.unlock();
}
这个方法的实现思想和 addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程 池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=2da2gtpfllwkk