本文来源:http://yeming.me/2016/05/07/threadPool1/
本篇文章我们先从左边这条线 Executor==>ExcutorService==>AbstractExecutorService==>ThreadPoolExecutor来分析一下。
AbstractExecutorService实现了submit方法,代码如下:
public
<T>
Future<T> submit(Callable<T> task)
{
if
(task ==
null)
throw
new
NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected
<T>
RunnableFuture<T> newTaskFor(Callable<T> callable)
{
return
new
FutureTask<T>(callable);
}
上面的FutureTask实现了RunnableFuture接口,RunnableFuture继承了 Runnable和Future接口。Runnable接口只有一个void run方法,Future接口有cancel(boolean),V get(),V get(long timeout, TimeUnit unit),boolean isCancelled(),boolean isDone()方法。
接着上面的AbstractExecutorService.submit方法,会调用到execute(ftask),这个execute方法就是ThreadPoolExecutor中的。我们接下来就以execute方法作为起点来分析。
public
void execute(Runnable command)
{
if
(command ==
null)
throw
new
NullPointerException();
int c = ctl.get();
if
(workerCountOf(c)
< corePoolSize)
{
if
(addWorker(command,
true))
return;
c = ctl.get();
}
if
(isRunning(c)
&& workQueue.offer(command))
{
int recheck = ctl.get();
if
(! isRunning(recheck)
&& remove(command))
reject(command);
else
if
(workerCountOf(recheck)
==
0)
addWorker(null,
false);
}
else
if
(!addWorker(command,
false))
reject(command);
}
private
boolean addWorker(Runnable firstTask,
boolean core)
{
retry:
for
(;;)
{
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if
(rs >= SHUTDOWN &&
!
(rs == SHUTDOWN &&
firstTask ==
null
&&
! workQueue.isEmpty()))
return
false;
for
(;;)
{
int wc = workerCountOf(c);
if
(wc >= CAPACITY ||
wc >=
(core ? corePoolSize : maximumPoolSize))
return
false;
if
(compareAndIncrementWorkerCount(c))
break
retry;
c = ctl.get();
// Re-read ctl
if
(runStateOf(c)
!= rs)
continue
retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted =
false;
boolean workerAdded =
false;
Worker w =
null;
try
{
w =
new
Worker(firstTask);
final
Thread t = w.thread;
if
(t !=
null)
{
final
ReentrantLock mainLock =
this.mainLock;
mainLock.lock();
try
{
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if
(rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask ==
null))
{
if
(t.isAlive())
// precheck that t is startable
throw
new
IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if
(s > largestPoolSize)
largestPoolSize = s;
workerAdded =
true;
}
}
finally
{
mainLock.unlock();
}
if
(workerAdded)
{
t.start();
workerStarted =
true;
}
}
}
finally
{
if
(! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
(1)这个判断逻辑比较复杂,我们先来看下
if
(rs >= SHUTDOWN &&
!
(rs == SHUTDOWN &&
firstTask ==
null
&&
! workQueue.isEmpty()))
return
false;
若当前状态大于SHUTDOWN,显然if判断条件为ture,直接returnfalse。(很好理解,线程池处于关闭状态,肯定不让新添加worker了) 若当前状态小于SHUTDOWN,if判断条件为false,接着往下走(线程池为RUNNING状态,很好理解) 若当前状态等于SHUTDOWN:若firstTask等于null并且工作队列有任务,则if判断条件为false,代码不会return,会继续往下运行;若firstTask不等于null或者工作队列为空,则判断条件为true,会return false(这个也好理解,我们知道SHUTDOWN状态,线程池不再接受新的任务,但是已经在工作队列中的任务还是要完成才行。所以若first等于null,并且工作队列有任务,还要继续往下走。若相反,则不会往下走) (2)判断当前工作线程数
for
(;;)
{
int wc = workerCountOf(c);
if
(wc >= CAPACITY ||
wc >=
(core ? corePoolSize : maximumPoolSize))
return
false;
if
(compareAndIncrementWorkerCount(c))
break
retry;
c = ctl.get();
// Re-read ctl
if
(runStateOf(c)
!= rs)
continue
retry;
// else CAS failed due to workerCount change; retry inner loop
}
当前工作线程数没有超过线程池设置的参数的限制,则利用CAS添加一个worker,并跳出外层的for循环,继续向下运行。否则返回false,添加worker失败。(3) 完成了上述1 2步骤后,会执行new Worker(firstTask),Thread t = w.thread并再次检查线程池的状态,若合法,则向工作线程池HashSet中添加当前worker,并执行t.start。此时才开启了子线程来执行任务。
上面步骤3调用了t.start,会开启一个子线程来运行Worker中的run方法。
public
void run()
{
runWorker(this);
}
final
void runWorker(Worker w)
{
Thread wt =
Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask =
null;
w.unlock();
// allow interrupts
boolean completedAbruptly =
true;
try
{
while
(task !=
null
||
(task = getTask())
!=
null)
{
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if
((runStateAtLeast(ctl.get(), STOP)
||
(Thread.interrupted()
&&
runStateAtLeast(ctl.get(), STOP)))
&&
!wt.isInterrupted())
wt.interrupt();
try
{
beforeExecute(wt, task);
Throwable thrown =
null;
try
{
task.run();
}finally
{
afterExecute(task, thrown);
}
}
finally
{
task =
null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly =
false;
}
finally
{
processWorkerExit(w, completedAbruptly);
}
}
上述worker不断通过getTask()方法,从workQueue中获取任务;若没有获取到任务,则调用processWorkerExit方法。
private
Runnable getTask()
{
boolean timedOut =
false;
// Did the last poll() time out?
for
(;;)
{
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if
(rs >= SHUTDOWN &&
(rs >= STOP || workQueue.isEmpty()))
{
decrementWorkerCount();
return
null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if
((wc > maximumPoolSize ||
(timed && timedOut))
&&
(wc >
1
|| workQueue.isEmpty()))
{
if
(compareAndDecrementWorkerCount(c))
return
null;
continue;
}
try
{
Runnable r = timed ?
workQueue.poll(keepAliveTime,
TimeUnit.NANOSECONDS)
:
workQueue.take();
if
(r !=
null)
return r;
timedOut =
true;
}
catch
(InterruptedException
retry)
{
timedOut =
false;
}
}
}
getTask方法是一个无限的for循环方法,它首先判断当前线程池的状态
if
(rs >= SHUTDOWN &&
(rs >= STOP || workQueue.isEmpty()))
{
decrementWorkerCount();
return
null;
}
这个判断也很好理解,若rs==SHUTDOWN,workQueue为空,显然应该直接返回null,并提前是工作的worker减一。(getTask返回null,runWorker方法会调用processWorkerExit从HashSet中remove当前worker);若rs>大于SHUTDOWN(这个对应线程池的shutDownNow方法,工作队列中等待的任务不再执行);其他情况,说明线程池处于运行状态,继续往下运行。然后根据当前线程池设置的最大线程数,以及是否允许线coreThread超时间以及workQueue的状态来判断是否通过CAS操作来是线程数减一并return null。最后我们要关注下下面这个从工作队列中取得任务的三目运算。
Runnable r = timed ?
workQueue.poll(keepAliveTime,
TimeUnit.NANOSECONDS)
:
workQueue.take();
若timed为ture(设置allowCoreThreadTimeOut为true),则超过了等待的时间还没有从workQueue中取得任务则r = null,此时就有可能造成即使workerCount小于corePoolSize,当前的worker也可能被回收。若timed为false,则调用阻塞方法从workQueue中获取任务,newFixedThreadPool就会一直调用这个阻塞方法,从而达到不显示关闭线程池的情况下,即使workQueue为空,也能维持固定的工作线程的个数。
public
List<Runnable> shutdownNow()
{
List<Runnable> tasks;
final
ReentrantLock mainLock =
this.mainLock;
mainLock.lock();
try
{
checkShutdownAccess();
//shutDwonNow为STOP,shutDown为SHUTDOWN
advanceRunState(STOP);(advanceRunState(SHUTDOWN);)
interruptWorkers();(interruptIdleWorkers)
//shutDownNow专用
tasks = drainQueue();
//shutDown专用 ScheduledThreadPoolExecutor回调
onShutdown();
}
finally
{
mainLock.unlock();
}
tryTerminate();
return tasks;
}
shutDown和shutDownnNow方法区别(代码层面):
(1)shutDownNow
private
void interruptWorkers()
{
final
ReentrantLock mainLock =
this.mainLock;
mainLock.lock();
try
{
for
(Worker w : workers)
w.interruptIfStarted();
}
finally
{
mainLock.unlock();
}
}
显然这个是中断所有的线程 (2)shutDown
private
void interruptIdleWorkers(boolean onlyOne)
{
final
ReentrantLock mainLock =
this.mainLock;
mainLock.lock();
try
{
for
(Worker w : workers)
{
Thread t = w.thread;
if
(!t.isInterrupted()
&& w.tryLock())
{
try
{
t.interrupt();
}
catch
(SecurityException ignore)
{
}
finally
{
w.unlock();
}
}
if
(onlyOne)
break;
}
}
finally
{
mainLock.unlock();
}
}
注意onlyOne参数,这个只有在调用tryTerminate()方法里面,会调用interruptIdleWorkers(true),其他情况都是interruptIdleWorkers(false),所以对于shutDown方法,也是尝试中断所有还没有被中断的线程。3)tryTerminate 上面(2)中提到了tryTerminate方法,接下来就来看下这个方法
final
void tryTerminate()
{
for
(;;)
{
int c = ctl.get();
if
(isRunning(c)
||
runStateAtLeast(c, TIDYING)
||
(runStateOf(c)
== SHUTDOWN &&
! workQueue.isEmpty()))
return;
if
(workerCountOf(c)
!=
0)
{
// Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final
ReentrantLock mainLock =
this.mainLock;
mainLock.lock();
try
{
if
(ctl.compareAndSet(c, ctlOf(TIDYING,
0)))
{
try
{
terminated();
}
finally
{
ctl.set(ctlOf(TERMINATED,
0));
termination.signalAll();
}
return;
}
}
finally
{
mainLock.unlock();
}
// else retry on failed CAS
}
}
从上述代码可以看出,若线程池状态为SHUTDOWN,workQueue为空,工作线程数为0或者线程池状态为STOP,工作线程数为0,都最终会把线程池状态设置为TERMINATED,并且唤醒所有因为调用awaitTermination()方法阻塞在termination.awaitNanos(nanos)还未醒过来的线程。
public
boolean awaitTermination(long timeout,
TimeUnit unit)
throws
InterruptedException
{
long nanos = unit.toNanos(timeout);
final
ReentrantLock mainLock =
this.mainLock;
mainLock.lock();
try
{
for
(;;)
{
if
(runStateAtLeast(ctl.get(), TERMINATED))
return
true;
if
(nanos <=
0)
return
false;
nanos = termination.awaitNanos(nanos);
}
}
finally
{
mainLock.unlock();
}
}
上述tryTerminate方法,在addWorkerFailed(),processWorkerExit(),shutDown(),shutDownNow(),remove(Runnable task)方法中都会调用到。
上面经常提到线程池的运行状态,这里稍作解释一下。
private
static
final
int RUNNING =
-1
<< COUNT_BITS;
private
static
final
int SHUTDOWN =
0
<< COUNT_BITS;
private
static
final
int STOP =
1
<< COUNT_BITS;
private
static
final
int TIDYING =
2
<< COUNT_BITS;
private
static
final
int TERMINATED =
3
<< COUNT_BITS;