“ ThreadPoolExecutor类是线程池的基础,线程池的目的是为了减少了每个任务调用的开销,在拥有大量异步任务时可以增强的性能,并且还可以提供绑定和管理资源的方法”
我在前面的一篇文章中说到了Executors这个类提供四种创建线程池的方法,但是其实质还是通过ThreadPoolExecutor来创建线程池。在阿里的开发手册中提到过,推荐使用ThreadPoolExecutor创建线程池,原因是了解线程池的创建,才能避免资源浪费。
01
—
构造器
ThreadPoolExecutor继承AbstractExecutorService抽象类,AbstractExecutorService提供了ExecutorService执行方法的默认实现。
public class ThreadPoolExecutor extends AbstractExecutorService
ThreadPoolExecutor有四个构造器(实际上都是调用第四个构造方法)
第一个:这个构造方法提供默认的工厂方法和默认拒绝策略(AbortPolicy),也就是说我们只需要传核心线程数,最大线程数,非核心线程的超时时间,超时时间单位,任务队列
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
第二个:相对于第一种,这个构造方法多传了一个线程工厂方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
第三个:相对于第一种,这个构造方法多传递了拒绝策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
第四个:这是线程池的核心构造器,上面三个构造器核心仍是调用它。我们可以看一下代码:
一.如果核心线程数小于0,或者最大线程数小于0,或者最大线程数小于核心线程数,或者非核心线程存活时间小于0则抛出参数不合法的异常。
二.如果工作队列为null,或者线程方法为null,或者拒绝策略为null抛出空指针异常
三.this.acc不知道大家是否清楚(可以了解一下Java安全模型):System.getSecurityManager()返回一个安全管理器对象security,如果security不为null,ccessController.getContext()就比较快照上下文信息与本上下文信息,然后来做出对受控资源访问控制的决策。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
上面就是线程池的四个构造器。通过这个四个构造器我们就可以创建线程池了。
扩展:
四种拒绝策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
队列有如下选择:
ArrayBlockingQueue; LinkedBlockingQueue; SynchronousQueue; ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous
02
—
执行方法
如果我们通过第一步创建了一个线程池,那么下一步我们就是要使用线程池的线程去执行任务了,ThreadPoolExecutor提供了方法:execute(Runnable command),我们将任务传进来就可以了,我们看一下源码:源码中注释也有说明。
如果任务为null则抛出空指针异常
如果允许的线程小于核心线程,我们开启一个新的线程去执行第一个任务。使用addWorker方法进行检测允许状态和工作线程的数量,如果没有返回false我们就增加线程去处理任务。
如果一个任务成功的进入队列,在添加一个线城时仍需要进行双重检查(因为前一次检测后线程已经消亡)或者线程池shutdown了当我们进入这个方法的时候。所以我们需要再次检测状态,如果有需要,回滚队列的操作在停止的时候,或者当线程池没有线程时需要创建一个新线程。
如果无法入队列,那么需要增加一个新线程,如果此操作失败,那么就意味着线程池已经shutdown或者已经饱和了,执行拒绝策略
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
关于ctl.get()
//利用低29位表示线程池中线程数,通过高3位表示线程池的运行状态: private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
到这里我们会发现execute方法实质是将任务放入到addWorker中,那么addWorker是怎么样的逻辑呢?
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//死循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果当前线程池的状态时SHUTDOWN,STOP,TIDYING,TERMINATED并且为SHUTDOWN状态时任务队列为空,那么就返回false 原因:如果调用了shutdown方法,此时的线程池还会继续工作并且会在任务队列中的所有任务执行完成后才会结束线程池。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//死循环
for (;;) {
int wc = workerCountOf(c);
//core是在execute方法中传的参数,true表示 核心线程,false表示最大线程
//CAPACITY 可以理解为Integer的最大值 1左移29位再-1
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果增加任务数量成功那么退出这个循环执行下面的代码,否则继续
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 初始化worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//同步块 使用内置锁 锁住
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//再判断一次当前线程池的状态 避免在执行过程中线程时被使用者关闭
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//向正在执行的任务队列(workers)中添加work
// 区别一下:workqueue是等待执行的阻塞队列
workers.add(w);
int s = workers.size();
//记录曾经并发执行的最大任务个数
if (s > largestPoolSize)
largestPoolSize = s;
//添加任务成功
workerAdded = true;
}
} finally {
//finally块释放内置锁
mainLock.unlock();
}
//如果任务添加成功那么开始执行任务
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWoker方法中我们会发现Worker这个类将传入的任务启动。而Worker是ThreadPoolExector内部类,它继承了AQS抽象类,其重写了AQS的一些方法,并且其也可作为一个Runnable对象,从而可以创建线程Thread。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
在它的构造器中:我们发现通过线程工厂方法获取到线程
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
除此之外由于worker重写了run方法所以我们在addWorker方法中使用t.start实际上是执行了runWorker方法
public void run() {
runWorker(this);
}
runWorker方法中首先取到我们提交的任务然后while循环获取task(每次任务执行完之后我们会将task设为null,然后调用getTask方法)。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//得到worker对象中我们提交的任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果当前任务为空 那么就从getTask中获得任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//任务执行前调用的方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//任务结束后调用的方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask():我们会发现这个getTask方法是一个死循环,这里面有一串代码:
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
timed取决与allowCoreThreadTimeOut也就是是否允许销毁核心线程池以及线程数是否大于核心线程数。
如果为true那么取任务,如果false 那就调用take方法,一直阻塞队列等待任务添加
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
到这里关于线程池如何构建出来以及执行线程的过程。
总结:
ThreadPoolExecutor是创建线程池的基础,其核心构造器需要我们传入参数有核心线程数,最大线程数,非核心线程数的存活时间,存活时间的单位,任务队列,线程工厂方法,拒绝策略。
当我们传入正确的参数后就创建了一个线程池,此时我们可以通过execute方法执行一个线程任务。在execute方法中最主要的是addWorker这个方法,此方法传入任务类和是否核心线程处理,addWorker方法线程的start方法是执行Worker这个重写run方法的内部类调用的runWorker方法(也就是runWorker是执行run方法的地方)。getTask是保证线程池中核心线程不被销毁的方法