专栏首页足球是圆的通过ThreadPoolExecutor源码分析线程池实现原理

通过ThreadPoolExecutor源码分析线程池实现原理

为什么要用线程池

线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度(当一个任务到达时,不需要重新创建线程来为之服务,重用已有线程),还可以通过线程池控制线程资源统一分配和监控等。

线程池工厂Executors

JDK 提供了创建线程池的工厂类 Executors,该类提供了创建线程池的静态方法:

  1. public static ExecutorService newFixedThreadPool(int nThreads)
  2. public static ExecutorService newSingleThreadExecutor()
  3. public static ExecutorService newCachedThreadPool()
  4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  5. public static ExecutorService newWorkStealingPool()

1-4 方式创建的线程池都是new ThreadPoolExecutor 对象,只是入参不同;第5种基于 ForkJoinPool 实现(以后会写一篇关于此实现的总结)。通过Executors创建线程池有很多局限性和隐患,在阿里巴巴《Java开发手册》中提出:

各个参数的作用

ThreadPoolExecutor 最终通过以下构造方法创建

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 大于核心线程数的线程存活时间 TimeUnit 时间单位,配合keepAliveTime使用 BlockingQueue 等待执行任务队列 ThreadFactory 创建线程的工厂 RejectedExecutionHandler 拒绝策略处理器

接下来通过源码分析一下各个参数的作用

private static final ExecutorService executor =
        new ThreadPoolExecutor(2, 3, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue(2), new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) {
    for (int i = 0; i < 6; i++) {
        executor.execute(() -> {
            System.out.println("--------" + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

创建核心线程数为2、最大线程数为3、大于核心线程数的线程空闲存活时间为1秒、存放执行任务的队列容器大小为2、线程池拒绝线程后由执行线程处理改任务的线程池。

线程池创建后,通过execute方法提交任务到线程池,看一下execute方法具体的实现:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl是线程安全的原子操作AtomicInteger类型,控制着线程池worker数量、运行状态以及关闭状态等。
    int c = ctl.get();
   // 1. 池内线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 添加一个worker,即新创建一个线程,添加成功则 return 
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 运行中并且尝试往等待任务队列添加任务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 重复检查运行状态,如果不是运行中,把刚加到队列的任务移除并且会重置ctl的值
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略处理器
            reject(command);
        // 检查当前线程数是否为0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 队列添加任务失败(队列满的情况),尝试直接添加worker(最大线程数控制)
    else if (!addWorker(command, false))
        // 如果已经达到最大线程数,走拒绝策略处理器
        reject(command);
}

拿上面我的例子分析一下这个execute执行逻辑

i=0 => 1处 workerCountOf(c) = 0,判断为true,直接添加Worker

i=1 => 1处 workerCountOf(c) = 1,判断为true,直接添加Worker

i=2 => 1处 workerCountOf(c) = 2,判断为false;执行到2处,往workQueue添加一个任务,添加成功

i=3 => 1处 workerCountOf(c) = 2,判断为false;执行到2处,往workQueue添加一个任务,添加成功

i=4 => 1处 workerCountOf(c) = 2,判断为false;执行到2处,往workQueue添加一个任务,此时workQueue队列中已经有两个任务,容量已经满了,添加失败,判断为false;执行到3处,尝试直接添加一个Worker(新创建一个线程),当前已有的Worker数为2(i=0和i=1时创建的),没有达到最大线程数限制,所以创建成功

i=5 => 1处 workerCountOf(c) = 3,判断为false;执行到2处,往workQueue添加一个任务,此时workQueue队列中已经有两个任务,容量已经满了,添加失败,判断为false;执行到3处,尝试直接添加一个Worker(新创建一个线程),当前已有的Worker数为3(i=0、i=1、i4时创建的),没有达到最大线程数限制,所以创建成功

i=6 => 1处 workerCountOf(c) = 4,判断为false;执行到2处,往workQueue添加一个任务,此时workQueue队列中已经有两个任务,容量已经满了,添加失败,判断为false;执行到3处,尝试直接添加一个Worker(新创建一个线程),当前已有的Worker数为3(i=0、i=1、i=4、i=5时创建的),此时达到最大线程数限制,所以添加Worker失败,执行拒绝策略处理器(这里用到的是调用线程执行策略)

运行结果:

--------pool-1-thread-2 --------main --------pool-1-thread-3 --------pool-1-thread-1 --------pool-1-thread-2 --------pool-1-thread-3

创建了三个线程,i=6时交给了main主线程执行任务。

通过上面的分析,如果队列是无界的,最大核心线程数这个参数就不起作用。

接下来看看添加的Worker是什么玩意

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
  
Worker(Runnable firstTask) {
    setState(-1); 
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

Worker是一个可执行的任务,每个Worker拥有一个线程,并且在初始化时把自己当做第一个任务放到了这个线程中。

添加Worker过程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 判断当前Worker数量(线程数)是否大于核心线程数或者最大线程数(根据入参core决定)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 通过AQS的cas先占一个线程数,占位成功跳出最外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       // 创建Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁,workers成员变量是线程不安全的
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 创建Worker成功,并且添加到workers中,然后通过启动Worker中的线程执行任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

添加Worker这个方法维护着workers,添加Worker成功后马上启动其拥有的线程来执行任务。

怎样控制核心线程数

创建Worker后启动了其属性中的线程,正常情况下线程执行完任务会关闭销毁,那Worker对线程做了什么让其循环使用呢?t.start之后交给了JVM,当JVM拿到资源高兴时就会回调run方法。

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task不为空或者从等待队列中获取到task执行 task的run方法 
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

当前Worker的task!=null执行任务中的run方法,然后把task置为null,此时执行getTask获取任务,如此循环。从这里得知,核心线程数量和最大核心线程数量两个参数其实是控制Worker数量,通过Worker的数量控制线程的数量,每创建一个Worker调用到该方法就开始长轮询,这样就避免了线程处理任务结束后的关闭和销毁了。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                // 返回null使 runWorker 方法结束while循环
                return null;
            continue;
        }

        try {
            // 从等待队列中获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 设置超时
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

该方法从队列中获取任务,大于核心线程数的线程空闲时间的参数就是用在这个获取等待队列等待超时时间,如果超时并且线程数量大于核心线程数量时就会返回null,此时会触发runWorker 方法结束while循环,最后回收改Worker

拒绝策略

jdk提供了四种拒绝策略

AbortPolicy 直接抛出异常,默认用的是这种

DiscardPolicy 丢弃该任务不处理

DiscardOldestPolicy 丢弃等待队列中放入最久的一个任务,然后把该任务放到execute中执行

CallerRunsPolicy 直接调用改任务的run方法,相当于在调用线程中执行

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • CountDownLatch、CyclicBarrier和Semaphore使用

    CountDownLatch是用来线程计数的。等待一组线程全部执行完后再本线程继续执行。如:A线程需要等待B、C和D(由初始化CountDownLatch参数觉...

    GreizLiao
  • Queue-PriorityQueue源码解析

    Queue队列通常是先进先出(FIFO),但也有特殊的非FIFO,如本文也分析的PriorityQueue。

    GreizLiao
  • Arthas查看Spring配置

    在开发过程经常有同学问:“我这个配置更新提交了,怎么样知道项目中是否已经更新使用新值?” 常用的方法是添加日志打印该值判断是否更新。今天我们用Arthas来实现...

    GreizLiao
  • Java源码之ThreadPoolExecutor

    “ ThreadPoolExecutor类是线程池的基础,线程池的目的是为了减少了每个任务调用的开销,在拥有大量异步任务时可以增强的性能,并且还可以提供绑定和管...

    每天学Java
  • 面经手册 · 第21篇《手写线程池,对照学习ThreadPoolExecutor线程池实现原理!》

    正好是2020年,看到这张图还是蛮有意思的。以前小时候总会看到一些科技电影,讲到机器人会怎样怎样,但没想到人似乎被娱乐化的东西,搞成了低头族、大肚子!

    小傅哥
  • Java的重入锁ReentrantLock

    ReentrantLock重入锁,是实现Lock接口的一个类,也是在实际编程中使用频率很高的一个锁,支持重入性,表示能够对共享资源能够重复加锁,即当前线程获取该...

    用户3467126
  • 源码分析—ThreadPoolExecutor线程池三大问题及改进方案

    在一次聚会中,我和一个腾讯大佬聊起了池化技术,提及到java的线程池实现问题,我说这个我懂啊,然后巴拉巴拉说了一大堆,然后腾讯大佬问我说,那你知道线程池有什么缺...

    luozhiyun
  • 线程池-线程池源码详解

    在ThreadPoolExecutor的属性定义中频繁地用位移运算来表示线程池状态,位移运算是改变当前值的一种高效手段,包括左移和右移。下面从属性定义开始阅读T...

    DougWang
  • 死磕 java同步系列之开篇

    同步系列,这是彤哥想了好久的名字,本来是准备写锁相关的内容,但是java中的CountDownLatch、Semaphore、CyclicBarrier这些类又...

    彤哥
  • iOS多线程编程之一——NSThread线程管理

    NSTread是iOS中进行多线程开发的一个类,其结构逻辑清晰,使用十分方便,但其封装度和性能不高,线程周期,加锁等需要手动处理。

    珲少

扫码关注云+社区

领取腾讯云代金券