进入 ThreadPoolExecutor 中 , 查看线程池任务执行方法 public void execute(Runnable command) 的源码 ;
用户向线程池中提交任务时 , 主要执行了三个步骤 ,
第一步 : 核心线程数不足的情况
如果当前运行的线程 , 小于核心线程数 , 那么 创建一个新的核心线程 ,
将 传入的任务 作为该线程的 第一个任务 .
调用 addWorker 方法 , 会原子性 检查运行状态和任务数量 ;
如果在 不应该添加线程的情况下 执行添加线程操作 , 就会发出错误警报 ;
如果该方法返回 false , 说明 当前不能添加线程 , 此时就不要执行添加线程的操作了 ;
源码解析 : 如果当前的线程数小于核心线程数 , 则调用 addWorker(command, true) , 添加核心线程 , 第二个参数 true 表示该添加的线程是核心线程 ;
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
第二步 : 任务放入 线程池任务队列 成功情况
如果任务被成功放入 线程池任务 队列 , 不管我们此时是否应该添加线程 , 都需要进行 双重验证 ;
双重验证 : 添加到任务队列 时验证一次 , 添加到线程执行 时验证一次 ;
可能存在这种情况 , 在上次验证线程运行状态之后 , 有可能 该线程就立刻被销毁了 ;
也可能存在进入该方法后 , 线程池被销毁的情况 ;
因此我们 反复验证线程状态 , 如果需要在线程停止时回滚队列 , 如果没有线程就创建新线程 ;
先调用 isRunning( c ) 判断线程是否在运行中 , 只有运行状态时 , 线程池才能接收任务 ,
先 将任务添加到队列 中 , 调用 workQueue.offer(command) 方法实现 ,
然后又调用 isRunning(recheck) 判定该线程是否是运行状态 ,
如果不在 , 那么调用 remove(command) 移除该命令 , 然后调用 reject(command) 拒绝执行该命令 ,
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
两个核心的操作 :
第三步 : 任务放入 线程池任务队列 失败情况
如果不能将任务放入队列中 , 尝试创建一个新线程 ;
如果创建线程失败 , 说明当前线程池关闭 , 或者线程池中线程饱和 , 此时 拒绝执行该任务 ;
else if (!addWorker(command, false))
reject(command);
public class ThreadPoolExecutor extends AbstractExecutorService {
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* 进行以下三个步骤处理:
*
* 1. 如果当前运行的线程 , 小于核心线程数 , 那么创建一个新的核心线程 ,
* 将传入的任务作为该线程的第一个任务 .
* 调用 addWorker 方法 , 会原子性检查运行状态和任务数量 ;
* 如果在不应该添加线程的情况下执行添加线程操作 , 就会发出错误警报 ;
* 如果该方法返回 false , 说明当前不能添加线程 , 此时就不要执行添加线程的操作了 ;
*
* 2. 如果任务被成功放入 线程池任务 队列 , 不管我们此时是否应该添加线程 , 都需要进行双重验证 ;
* 双重验证 : 添加到任务队列时验证一次 , 添加到线程执行时验证一次 ;
* 可能存在这种情况 , 在上次验证线程运行状态之后 , 有可能该线程就立刻被销毁了 ;
* 也可能存在进入该方法后 , 线程池被销毁的情况 ;
* 因此我们反复验证线程状态 , 如果需要在线程停止时回滚队列 , 如果没有线程就创建新线程 ;
*
* 3. 如果不能将任务放入队列中 , 尝试创建一个新线程 ;
* 如果创建线程失败 , 说明当前线程池关闭 , 或者线程池中线程饱和 , 此时拒绝执行该任务 ;
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
/*
先调用 isRunning(c) 判断线程是否在运行中 , 只有运行状态时 , 线程池才能接收任务 ,
先将任务添加到队列中 , 调用 workQueue.offer(command) 方法实现 ,
然后又调用 isRunning(recheck) 判定该线程是否是运行状态 ,
如果不在 , 那么调用 remove(command) 移除该命令 , 然后调用 reject(command) 拒绝执行该命令 ,
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}