ThreadPoolExecutor是JDK提供的线程池的基类,其中定义了线程池的核心框架,并且允许客户端通过继承的方式实现自定义的线程池。JDK提供默认的几种线程池都继承了ThreadPoolExecutor类,因此有必要对ThreadPoolExecutor进行详细的分析。
ThreadPoolExecutor主要有以下几个参数:
上述的线程池参数,在一定程度上也说明了线程池执行任务的逻辑。下面进行总结:
ThreadPoolExecutor有一个内部类Worker,它是对Runnable进行了封装,主要功能是对待执行的任务进行中断处理和状态监控。此外,Worker还继承了AQS,在每个任务执行时进行了加锁的处理。可以将Worker简单理解为可中断的、可进行锁处理的Runnable,源码如下:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//当前Worker所处于的线程
final Thread thread;
//待执行的任务
Runnable firstTask;
//任务计数器
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
向线程池中提交任务的核心方法为execute(),下面进行详细分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
源码中的英文注释我没有删除,可以清楚地看出线程池执行任务的逻辑与前文所述一致。
首先,通过ctl这个原子变量。获取当前线程池内的线程数:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
ctl是ThreadPoolExecutor内部一个用来进行技术和状态控制的控制变量,它使用了一个原子整形字段来实现两个方面的管理:
由于使用了一个整形(32字节)来表示两个状态,因此JDK进行了特殊的处理,只使用30个字节来表示线程数量,剩余2个字节表示状态,这就表示ThreadPoolExecutor最多可以创建(2^29)-1个线程,这个数量理论上足够用了。
回到execute方法,通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行,addWorker方法如下:
private boolean addWorker(Runnable firstTask, boolean core) {
//自旋,判断线程池状态,并对线程数量执行原子+1操作
retry:
for (;;) {
int c = ctl.get();
//获取线程池状态
int rs = runStateOf(c);
//如果线程池已经关闭,则直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//判断线程数是否已达上限,根据传入参数core的不同,判断corePoolSize或者maximumPoolSize。
//如果线程数已达上限,直接返回false
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//执行原子操作,对线程数+1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
//ctl变量操作成功,执行Worker相关逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建一个新的Worker,传入待执行的任务
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//加锁后,再次判断线程池状态
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果创建了新的Worker,则调用其start方法立即执行
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker()方法的在首先在一个自旋中进行CAS操作,判断线程池状态和线程数,并尝试将线程数+1。如果操作失败直接返回false。CAS操作成功后,创建一个新的Worker,并立即调用其start()方法执行该任务。
Worker执行任务的方法为runWorker():
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//首先释放锁,允许中断
w.unlock();
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
//执行任务前先加锁
w.lock();
//如果线程池已经终止,则中断该线程。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
//执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
//当前Worker计数器+1
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
runWorker()的主要逻辑就是进行线程池的关闭检查,然后执行任务,并将计数器+1。
值得注意的是,Worker的firstTask可能为空,此时Worker并不会执行自己的任务,而是调用getTask()方法从任务队列中拉取任务执行。也就是说,Worker在执行完提交给自己的任务后,会执行任务队列中的任务。
至此,execute()方法的第一个分支执行完毕,即线程数量少于corePoolSize的情况。后面的分支逻辑也并不复杂:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
//通过位运算,获取当前线程数,如果线程数小于corePoolSize,则执行addWorker(),创建新线程执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//针对线程数超过corePoolSize的情况,将任务放入workQueue中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果workQueue以满,则以maximumPoolSize为上限尝试创建新的线程
else if (!addWorker(command, false))
//如果线程数已达maximumPoolSize,则执行拒绝策略
reject(command);
}