前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ThreadPoolExecutor 源码分析

ThreadPoolExecutor 源码分析

作者头像
itliusir
发布2020-01-15 10:14:15
3150
发布2020-01-15 10:14:15
举报
文章被收录于专栏:刘君君刘君君刘君君

摘要:

  1. ThreadPoolExecutor 线程池是如何实现的

TOP 带着问题看源码

  1. ThreadPoolExecutor 线程池是如何实现的

1. 基本介绍

前面文章的 Thread 我们也分析了,因为 Java 中的Thread 和 内核线程是 1 : 1 的,所以线程是一个重量级的对象,应该避免频繁创建和销毁,我们可以使用线程池来避免。

ThreadPoolExecutor 是 Java 实现的线程池,它并没有采取常见的池化资源的设计方法,而是采用的 生产者-消费者 模式。

上图的左边是线程池的核心体系,右边是 JDK 提供创建线程池的工具类。

Executor 接口

提供最基础的执行方法 execute(Runnable command)

ExecutorService 接口

基于 Executor 接口,新增了线程池的一些操作能力

AbstractExecutorService 抽象类

使用模板模式,丰富了一部分操作的细节流程

ForkJoinPool 实现类

jdk1.7 中新增的线程池类,适用于分治的场景

2. 成员变量 & 核心类分析

// 控制变量 前 3 位标示运行状态,后 29 位标识工作线程的数量
// 初始化为 RUNNING 状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 移位的段
private static final int COUNT_BITS = Integer.SIZE - 3;
// 后29位,标识容量
// ‭0001 1111 1111 1111 1111 1111 1111 1111‬
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 下面是线程池状态
// 表示可以接受新任务,且可执行队列的任务
// 111 0 0000 ... 0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 不接收新任务,但可以执行队列的任务
// 000 0 0000 ... 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 中断正在执行的,不再接收和执行队列的任务
// 001 0 0000 ... 0000
private static final int STOP       =  1 << COUNT_BITS;
// 半中止状态,所有任务都已中止且无工作线程,修改为这个状态,然后执行 terminated() 方法
// 010 0 0000 ... 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 中止状态,已经执行过 terminated() 方法
// 011 0 0000 ... 0000
private static final int TERMINATED =  3 << COUNT_BITS;
// 获取状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// ctl 的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

3. 核心方法分析

3.1 普通任务提交

3.1.1 execute(Runnable command)

主要过程就是:

  1. 如果当前工作线程没有达到核心线程数量阈值,就直接添加一个核心工作线程
  2. 如果达到了核心线程数量阈值,就入任务队列,如果状态不正常,执行拒绝策略
  3. 如果队列满了,就创建非核心线程
  4. 如果创建非核心线程失败(达到了最大数量阈值、线程池状态不正常),执行拒绝策略
public void execute(Runnable command) {
    // 校验是否为空
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果工作线程数小于核心数
    if (workerCountOf(c) < corePoolSize) {
        // 添加一个核心工作线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果线程池状态正常,并且达到了核心数量,就入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查状态,如果不是运行状态就移除任务并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 再次检查,如果工作线程数量是0,就创建一个
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果入队列失败,就尝试创建非核心工作线程
    else if (!addWorker(command, false))
        // 创建非核心线程失败,执行拒绝策略
        reject(command);
}
3.1.2 addWorker(Runnable firstTask, boolean core)

addWorker 方法主要作用就是创建一个工作线程,并加入到工作线程的集合中,然后启动。在此期间会进行状态和数量的校验。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 校验状态
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 校验工作线程数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 数量+1 跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建工作线程,把 firstTask 封装到 Worker 对象,然后把 Worker 对象传给 thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
				// 再次检查状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 加入到工作线程集合
                    workers.add(w);
                    // 目前集合的数量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 标记添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 添加成功就启动线程
                // 通过上面 new Worker 的分析,我们知道这里会调用 Worker对象的 run方法
                // run 方法里接着调用 runWorker(this)
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 没有启动成功,执行降级方法(从集合中清除掉、数量减少、)
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
3.1.3 runWorker(Worker w)

如果有第一个任务就先执行,之后从任务队列取任务执行。

final void runWorker(Worker w) {
    // 获取当前工作线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // ???
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task如果为空就取任务,如果任务也取不到就结束循环
        // getTask() 方法主要就是从任务队列中取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任务执行前扩展方法,例如:可以在执行前等待实现暂停的效果
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后扩展方法,例如:可以在执行后释放一些资源
                    afterExecute(task, thrown);
                }
            } finally {
                // 置 null,重新从队列取
                task = null;
                // 增加完成数
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

3.2 异步任务提交

3.2.1 submit(Callable task)

submit 方法定义在模板类 AbstractExecutorService 中,然后把 task 封装为 FutureTask , 最后调用 execute 方法来提交任务

AbstractExecutorService#submit

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

我们上面分析 execute 方法知道其最终执行的地方还是调用的 task 的 run 方法,所以我们来分析 FutureTask 的 run 方法。

3.2.2 run()

主要是多了一个执行结果的记录

public void run() {
    // 线程状态不为 NEW 或者 修改当前线程来运行当前任务失败,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // 再次校验线程状态
        if (c != null && state == NEW) {
            // 注意盯着这个运行结果变量
            V result;
            boolean ran;
            try {
                // 任务执行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 执行异常就修改线程状态为 EXCEPTIONAL
                setException(ex);
            }
            if (ran)
                // 执行正常就修改线程的状态为 NORMAL
                set(result);
        }
    } finally {
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
3.2.3 get()

主要思路就是自旋等待线程执行完

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果线程状态没完成,就进入等待队列
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 自旋
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 已完成就返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 快完成(异常),就等一会
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 第一次进来一般会走到这里,把当前线程构建一个等待节点
        else if (q == null)
            q = new WaitNode();
        // 第二次循环尝试把节点入队
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 如果有超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 如果发现入队失败(已经入队过了),就挂起当前线程
        else
            LockSupport.park(this);
    }
}

4. 总结

可以看到,线程池实际上是一个生产-消费模型的实现,其支持普通任务提交和异步任务提交(ps.. 其实叫异步并不是很合适,对于用户来说线程池本来就是异步的)。

知道了核心数量以及等待队列还有最大数量这些功能的实现,相信对如何更好的使用线程池会更有帮助。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-01-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TOP 带着问题看源码
  • 1. 基本介绍
  • 2. 成员变量 & 核心类分析
  • 3. 核心方法分析
    • 3.1 普通任务提交
      • 3.1.1 execute(Runnable command)
      • 3.1.2 addWorker(Runnable firstTask, boolean core)
      • 3.1.3 runWorker(Worker w)
    • 3.2 异步任务提交
      • 3.2.1 submit(Callable task)
      • 3.2.2 run()
      • 3.2.3 get()
  • 4. 总结
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档