专栏首页刘君君ThreadPoolExecutor 源码分析

ThreadPoolExecutor 源码分析

摘要:

  1. ThreadPoolExecutor 线程池是如何实现的

TOP 带着问题看源码

  1. ThreadPoolExecutor 线程池是如何实现的

1. 基本介绍

前面文章的 Thread 我们也分析了,因为 Java 中的Thread 和 内核线程是 1 : 1 的,所以线程是一个重量级的对象,应该避免频繁创建和销毁,我们可以使用线程池来避免。

ThreadPoolExecutor 是 Java 实现的线程池,它并没有采取常见的池化资源的设计方法,而是采用的 生产者-消费者 模式。

上图的左边是线程池的核心体系,右边是 JDK 提供创建线程池的工具类。

Executor 接口

提供最基础的执行方法 execute(Runnable command)

ExecutorService 接口

基于 Executor 接口,新增了线程池的一些操作能力

AbstractExecutorService 抽象类

使用模板模式,丰富了一部分操作的细节流程

ForkJoinPool 实现类

jdk1.7 中新增的线程池类,适用于分治的场景

2. 成员变量 & 核心类分析

// 控制变量 前 3 位标示运行状态,后 29 位标识工作线程的数量
// 初始化为 RUNNING 状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 移位的段
private static final int COUNT_BITS = Integer.SIZE - 3;
// 后29位,标识容量
// ‭0001 1111 1111 1111 1111 1111 1111 1111‬
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
// 下面是线程池状态
// 表示可以接受新任务,且可执行队列的任务
// 111 0 0000 ... 0000
private static final int RUNNING    = -1 << COUNT_BITS;
// 不接收新任务,但可以执行队列的任务
// 000 0 0000 ... 0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// 中断正在执行的,不再接收和执行队列的任务
// 001 0 0000 ... 0000
private static final int STOP       =  1 << COUNT_BITS;
// 半中止状态,所有任务都已中止且无工作线程,修改为这个状态,然后执行 terminated() 方法
// 010 0 0000 ... 0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 中止状态,已经执行过 terminated() 方法
// 011 0 0000 ... 0000
private static final int TERMINATED =  3 << COUNT_BITS;
// 获取状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 获取工作线程的数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
// ctl 的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

3. 核心方法分析

3.1 普通任务提交

3.1.1 execute(Runnable command)

主要过程就是:

  1. 如果当前工作线程没有达到核心线程数量阈值,就直接添加一个核心工作线程
  2. 如果达到了核心线程数量阈值,就入任务队列,如果状态不正常,执行拒绝策略
  3. 如果队列满了,就创建非核心线程
  4. 如果创建非核心线程失败(达到了最大数量阈值、线程池状态不正常),执行拒绝策略
public void execute(Runnable command) {
    // 校验是否为空
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 如果工作线程数小于核心数
    if (workerCountOf(c) < corePoolSize) {
        // 添加一个核心工作线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 如果线程池状态正常,并且达到了核心数量,就入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查状态,如果不是运行状态就移除任务并执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 再次检查,如果工作线程数量是0,就创建一个
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果入队列失败,就尝试创建非核心工作线程
    else if (!addWorker(command, false))
        // 创建非核心线程失败,执行拒绝策略
        reject(command);
}

3.1.2 addWorker(Runnable firstTask, boolean core)

addWorker 方法主要作用就是创建一个工作线程,并加入到工作线程的集合中,然后启动。在此期间会进行状态和数量的校验。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 校验状态
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 校验工作线程数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 数量+1 跳出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 创建工作线程,把 firstTask 封装到 Worker 对象,然后把 Worker 对象传给 thread
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());
				// 再次检查状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 加入到工作线程集合
                    workers.add(w);
                    // 目前集合的数量
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 标记添加成功
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 添加成功就启动线程
                // 通过上面 new Worker 的分析,我们知道这里会调用 Worker对象的 run方法
                // run 方法里接着调用 runWorker(this)
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        // 没有启动成功,执行降级方法(从集合中清除掉、数量减少、)
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

3.1.3 runWorker(Worker w)

如果有第一个任务就先执行,之后从任务队列取任务执行。

final void runWorker(Worker w) {
    // 获取当前工作线程
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    // ???
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task如果为空就取任务,如果任务也取不到就结束循环
        // getTask() 方法主要就是从任务队列中取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // 检查状态
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 任务执行前扩展方法,例如:可以在执行前等待实现暂停的效果
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 任务执行后扩展方法,例如:可以在执行后释放一些资源
                    afterExecute(task, thrown);
                }
            } finally {
                // 置 null,重新从队列取
                task = null;
                // 增加完成数
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

3.2 异步任务提交

3.2.1 submit(Callable task)

submit 方法定义在模板类 AbstractExecutorService 中,然后把 task 封装为 FutureTask , 最后调用 execute 方法来提交任务

AbstractExecutorService#submit

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

我们上面分析 execute 方法知道其最终执行的地方还是调用的 task 的 run 方法,所以我们来分析 FutureTask 的 run 方法。

3.2.2 run()

主要是多了一个执行结果的记录

public void run() {
    // 线程状态不为 NEW 或者 修改当前线程来运行当前任务失败,直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // 再次校验线程状态
        if (c != null && state == NEW) {
            // 注意盯着这个运行结果变量
            V result;
            boolean ran;
            try {
                // 任务执行
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 执行异常就修改线程状态为 EXCEPTIONAL
                setException(ex);
            }
            if (ran)
                // 执行正常就修改线程的状态为 NORMAL
                set(result);
        }
    } finally {
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

3.2.3 get()

主要思路就是自旋等待线程执行完

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果线程状态没完成,就进入等待队列
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 自旋
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // 已完成就返回
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 快完成(异常),就等一会
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 第一次进来一般会走到这里,把当前线程构建一个等待节点
        else if (q == null)
            q = new WaitNode();
        // 第二次循环尝试把节点入队
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 如果有超时时间
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        // 如果发现入队失败(已经入队过了),就挂起当前线程
        else
            LockSupport.park(this);
    }
}

4. 总结

可以看到,线程池实际上是一个生产-消费模型的实现,其支持普通任务提交和异步任务提交(ps.. 其实叫异步并不是很合适,对于用户来说线程池本来就是异步的)。

知道了核心数量以及等待队列还有最大数量这些功能的实现,相信对如何更好的使用线程池会更有帮助。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 线程的实现与分析

    线程是操作系统调度的最小单位,实现线程有三种方式,而 Java Thread 采用的是 内核线程实现

    itliusir
  • ThreadLocal 源码分析

    我们知道解决共享变量不安全的一种方式,就是利用每个线程的私有变量来操作,避免共享变量导致的线程不安全问题。

    itliusir
  • DelayQueue 源码分析

    我们先来看一下它的实现类图,它实现了 Delayed、BlockingQueue 接口和 AbstractQueue 基础类,从实现的功能上看,它首先是一个阻塞...

    itliusir
  • 源码分析—ThreadPoolExecutor线程池三大问题及改进方案

    在一次聚会中,我和一个腾讯大佬聊起了池化技术,提及到java的线程池实现问题,我说这个我懂啊,然后巴拉巴拉说了一大堆,然后腾讯大佬问我说,那你知道线程池有什么缺...

    luozhiyun
  • 通过ThreadPoolExecutor源码分析线程池实现原理

    线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度...

    GreizLiao
  • ReentranReadWriteLock源码浅析

    c指的AbstractQueuedSynchronizer的state。SHARED_UNIT为65536。

    小金狗子
  • 面经手册 · 第21篇《手写线程池,对照学习ThreadPoolExecutor线程池实现原理!》

    正好是2020年,看到这张图还是蛮有意思的。以前小时候总会看到一些科技电影,讲到机器人会怎样怎样,但没想到人似乎被娱乐化的东西,搞成了低头族、大肚子!

    小傅哥
  • Java源码之ThreadPoolExecutor

    “ ThreadPoolExecutor类是线程池的基础,线程池的目的是为了减少了每个任务调用的开销,在拥有大量异步任务时可以增强的性能,并且还可以提供绑定和管...

    每天学Java
  • ThreadPoolExecutor 几个疑惑与解答

    用户3148308
  • 如何才能够系统地学习Java并发技术?

    这里不仅仅是指使用简单的多线程编程,或者使用juc的某个类。当然这些都是并发编程的基本知识,除了使用这些工具以外,Java并发编程中涉及到的技术原理十分丰富。为...

    Java技术江湖

扫码关注云+社区

领取腾讯云代金券