我们之前温习了Thread类,明白了Runable接口才是多线程的任务核心。那么ThreadPoolExecutor就是用维护多线程的。作为工具类,ThreadPoolExecutor应该提供了很多操作线程的方法,按理说也是逐个去调用目标线程的方法。那么我们就详细了解一下ThreadPoolExecutor的实现过程吧。我们发现ThreadPoolExecutor类继承了AbstractExecutorService。而AbstractExecutorService实现了ExecutorService,ExecutorService继承了Executor,Executor主要提供execute方法。应该和真正的线程start方法挂钩。在AbstractExecutorService方法中实现了ExecutorService的接口。ExecutorService主要包含了线程的提交和线程的中断等方法。
作为线程管理的工具,那么ThreadPoolExecutor就是专门维护线程运行的,那么线程的容器也必然在这个类中,也就是我们新建的线程会提交到这个类中,然后通过这个类会将提交的Runnable任务按照他的安排进行执行。我们看到在ThreadPoolExecutor中有一个 private final HashSet<Worker> workers = new HashSet<Worker>();其中的Worker就是用来存储Runnable任务的工作任务。
Worker(Runnable firstTask) {
// inhibit interrupts until runWorker
setState(-1);
this.firstTask = firstTask;
//创建线程,并把任务提交
this.thread = getThreadFactory().newThread(this);
}
按照上边的逻辑,那么worker就是我们提交的任务的代理,也就是调用了start方法。然后调用runwroker的方法。而start方法是在addWorker中调用的
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
线程状态
int rs = runStateOf(c);
检测
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
自旋
for (;;) {
任务数目
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
对工作线程数添加1
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
添加一个新的工作任务线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
加锁,所有创建线程的时候都加锁,防止创建多个相同的线程
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
加锁之后二次检测
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
加入到工作线程队列,然后通过对workers数组的操作就可以操作该线程。相当于记录一下线程信息
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
调用真正的线程
inal void runWorker(Worker w) {
获取当前线程
Thread wt = Thread.currentThread();
当前任务
Runnable task = w.firstTask;
移除
w.firstTask = null;
解除锁
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
这个while会一直执行,把队列中的任务都执行了。由于runWorker是另外一个线程。所以这个线程会抢着处理任务队列。指导任务处理完毕
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())
wt.interrupt();
try {
执行任务之前
beforeExecute(wt, task);
Throwable thrown = null;
try {
执行任务,正真的run方法内容。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
执行任务之后
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
后置处理
processWorkerExit(w, completedAbruptly);
}
}
获取任务队列
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
工作线程数量
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
当线程和合理范围内就获取任务
取出一个任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
后置处理
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
加锁,防止多线程操作创建多个核心线程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
将当前线程停止
tryTerminate();
int c = ctl.get();
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
创建新的工作线程
addWorker(null, false);
}
}
经过上述源码分析,我们大概明白了ThreadPoolExecutor类的工作机理。尤其是工作线程worker使用代理的方式。以及在while方法在工作线程中的使用使得线程能够处理足够的任务。在线程处理的任务结束,没有任务可以处理也就是任务队列空的时候就线程自然销毁,如果核心线程不需要销毁就创建新线程。但是当没有任务的时候。核心线程其实是自旋的,所以感觉还是有消耗的。因此对于没有必要使用多线程的地方还是不要使用多线程了。当然通过分析我们发现ThreadPoolExecutor并没有在初始化的时候就创建核心线程,而是在逐步添加任务的时候创建的。根据我的分析和理解,我可能会尽可能少的设置核心线程。或者建议开启allowCoreThreadTimeOut,减少不必要的系统消耗。