摘要:
前面文章的 Thread 我们也分析了,因为 Java 中的Thread 和 内核线程是 1 : 1 的,所以线程是一个重量级的对象,应该避免频繁创建和销毁,我们可以使用线程池来避免。
ThreadPoolExecutor 是 Java 实现的线程池,它并没有采取常见的池化资源的设计方法,而是采用的 生产者-消费者 模式。
上图的左边是线程池的核心体系,右边是 JDK 提供创建线程池的工具类。
Executor 接口
提供最基础的执行方法 execute(Runnable command)
ExecutorService 接口
基于 Executor 接口,新增了线程池的一些操作能力
AbstractExecutorService 抽象类
使用模板模式,丰富了一部分操作的细节流程
ForkJoinPool 实现类
jdk1.7 中新增的线程池类,适用于分治的场景
// 控制变量 前 3 位标示运行状态,后 29 位标识工作线程的数量 // 初始化为 RUNNING 状态 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 移位的段 private static final int COUNT_BITS = Integer.SIZE - 3; // 后29位,标识容量 // 0001 1111 1111 1111 1111 1111 1111 1111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 下面是线程池状态 // 表示可以接受新任务,且可执行队列的任务 // 111 0 0000 ... 0000 private static final int RUNNING = -1 << COUNT_BITS; // 不接收新任务,但可以执行队列的任务 // 000 0 0000 ... 0000 private static final int SHUTDOWN = 0 << COUNT_BITS; // 中断正在执行的,不再接收和执行队列的任务 // 001 0 0000 ... 0000 private static final int STOP = 1 << COUNT_BITS; // 半中止状态,所有任务都已中止且无工作线程,修改为这个状态,然后执行 terminated() 方法 // 010 0 0000 ... 0000 private static final int TIDYING = 2 << COUNT_BITS; // 中止状态,已经执行过 terminated() 方法 // 011 0 0000 ... 0000 private static final int TERMINATED = 3 << COUNT_BITS; // 获取状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 获取工作线程的数量 private static int workerCountOf(int c) { return c & CAPACITY; } // ctl 的值 private static int ctlOf(int rs, int wc) { return rs | wc; }
主要过程就是:
public void execute(Runnable command) { // 校验是否为空 if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果工作线程数小于核心数 if (workerCountOf(c) < corePoolSize) { // 添加一个核心工作线程 if (addWorker(command, true)) return; c = ctl.get(); } // 如果线程池状态正常,并且达到了核心数量,就入队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检查状态,如果不是运行状态就移除任务并执行拒绝策略 if (! isRunning(recheck) && remove(command)) reject(command); // 再次检查,如果工作线程数量是0,就创建一个 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果入队列失败,就尝试创建非核心工作线程 else if (!addWorker(command, false)) // 创建非核心线程失败,执行拒绝策略 reject(command); }
addWorker
方法主要作用就是创建一个工作线程,并加入到工作线程的集合中,然后启动。在此期间会进行状态和数量的校验。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 校验状态 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); // 校验工作线程数量 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 数量+1 跳出循环 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 创建工作线程,把 firstTask 封装到 Worker 对象,然后把 Worker 对象传给 thread w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); // 再次检查状态 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 加入到工作线程集合 workers.add(w); // 目前集合的数量 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 标记添加成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 添加成功就启动线程 // 通过上面 new Worker 的分析,我们知道这里会调用 Worker对象的 run方法 // run 方法里接着调用 runWorker(this) t.start(); workerStarted = true; } } } finally { // 没有启动成功,执行降级方法(从集合中清除掉、数量减少、) if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
如果有第一个任务就先执行,之后从任务队列取任务执行。
final void runWorker(Worker w) { // 获取当前工作线程 Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // ??? w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // task如果为空就取任务,如果任务也取不到就结束循环 // getTask() 方法主要就是从任务队列中取任务 while (task != null || (task = getTask()) != null) { w.lock(); // 检查状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 任务执行前扩展方法,例如:可以在执行前等待实现暂停的效果 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 任务执行后扩展方法,例如:可以在执行后释放一些资源 afterExecute(task, thrown); } } finally { // 置 null,重新从队列取 task = null; // 增加完成数 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
submit
方法定义在模板类 AbstractExecutorService
中,然后把 task 封装为 FutureTask
, 最后调用 execute
方法来提交任务
AbstractExecutorService#submit
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); }
我们上面分析 execute
方法知道其最终执行的地方还是调用的 task 的 run 方法,所以我们来分析 FutureTask 的 run 方法。
主要是多了一个执行结果的记录
public void run() { // 线程状态不为 NEW 或者 修改当前线程来运行当前任务失败,直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; // 再次校验线程状态 if (c != null && state == NEW) { // 注意盯着这个运行结果变量 V result; boolean ran; try { // 任务执行 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 执行异常就修改线程状态为 EXCEPTIONAL setException(ex); } if (ran) // 执行正常就修改线程的状态为 NORMAL set(result); } } finally { runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
主要思路就是自旋等待线程执行完
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果线程状态没完成,就进入等待队列 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 自旋 for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; // 已完成就返回 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 快完成(异常),就等一会 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 第一次进来一般会走到这里,把当前线程构建一个等待节点 else if (q == null) q = new WaitNode(); // 第二次循环尝试把节点入队 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有超时时间 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 如果发现入队失败(已经入队过了),就挂起当前线程 else LockSupport.park(this); } }
可以看到,线程池实际上是一个生产-消费模型的实现,其支持普通任务提交和异步任务提交(ps.. 其实叫异步并不是很合适,对于用户来说线程池本来就是异步的)。
知道了核心数量以及等待队列还有最大数量这些功能的实现,相信对如何更好的使用线程池会更有帮助。
本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。
我来说两句