前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >通过ThreadPoolExecutor源码分析线程池实现原理

通过ThreadPoolExecutor源码分析线程池实现原理

作者头像
GreizLiao
发布2020-03-17 17:53:59
3100
发布2020-03-17 17:53:59
举报

为什么要用线程池

线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性。使用线程池可以重复利用已创建的线程降低线程创建和销毁带来的消耗,随之即可提高响应速度(当一个任务到达时,不需要重新创建线程来为之服务,重用已有线程),还可以通过线程池控制线程资源统一分配和监控等。

线程池工厂Executors

JDK 提供了创建线程池的工厂类 Executors,该类提供了创建线程池的静态方法:

  1. public static ExecutorService newFixedThreadPool(int nThreads)
  2. public static ExecutorService newSingleThreadExecutor()
  3. public static ExecutorService newCachedThreadPool()
  4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
  5. public static ExecutorService newWorkStealingPool()

1-4 方式创建的线程池都是new ThreadPoolExecutor 对象,只是入参不同;第5种基于 ForkJoinPool 实现(以后会写一篇关于此实现的总结)。通过Executors创建线程池有很多局限性和隐患,在阿里巴巴《Java开发手册》中提出:

各个参数的作用

ThreadPoolExecutor 最终通过以下构造方法创建

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

corePoolSize 核心线程数 maximumPoolSize 最大线程数 keepAliveTime 大于核心线程数的线程存活时间 TimeUnit 时间单位,配合keepAliveTime使用 BlockingQueue 等待执行任务队列 ThreadFactory 创建线程的工厂 RejectedExecutionHandler 拒绝策略处理器

接下来通过源码分析一下各个参数的作用

private static final ExecutorService executor =
        new ThreadPoolExecutor(2, 3, 1, TimeUnit.SECONDS,
                new ArrayBlockingQueue(2), new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) {
    for (int i = 0; i < 6; i++) {
        executor.execute(() -> {
            System.out.println("--------" + Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

创建核心线程数为2、最大线程数为3、大于核心线程数的线程空闲存活时间为1秒、存放执行任务的队列容器大小为2、线程池拒绝线程后由执行线程处理改任务的线程池。

线程池创建后,通过execute方法提交任务到线程池,看一下execute方法具体的实现:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // ctl是线程安全的原子操作AtomicInteger类型,控制着线程池worker数量、运行状态以及关闭状态等。
    int c = ctl.get();
   // 1. 池内线程数小于核心线程数
    if (workerCountOf(c) < corePoolSize) {
        // 添加一个worker,即新创建一个线程,添加成功则 return 
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 运行中并且尝试往等待任务队列添加任务
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 重复检查运行状态,如果不是运行中,把刚加到队列的任务移除并且会重置ctl的值
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略处理器
            reject(command);
        // 检查当前线程数是否为0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 队列添加任务失败(队列满的情况),尝试直接添加worker(最大线程数控制)
    else if (!addWorker(command, false))
        // 如果已经达到最大线程数,走拒绝策略处理器
        reject(command);
}

拿上面我的例子分析一下这个execute执行逻辑

i=0 => 1处 workerCountOf(c) = 0,判断为true,直接添加Worker

i=1 => 1处 workerCountOf(c) = 1,判断为true,直接添加Worker

i=2 => 1处 workerCountOf(c) = 2,判断为false;执行到2处,往workQueue添加一个任务,添加成功

i=3 => 1处 workerCountOf(c) = 2,判断为false;执行到2处,往workQueue添加一个任务,添加成功

i=4 => 1处 workerCountOf(c) = 2,判断为false;执行到2处,往workQueue添加一个任务,此时workQueue队列中已经有两个任务,容量已经满了,添加失败,判断为false;执行到3处,尝试直接添加一个Worker(新创建一个线程),当前已有的Worker数为2(i=0和i=1时创建的),没有达到最大线程数限制,所以创建成功

i=5 => 1处 workerCountOf(c) = 3,判断为false;执行到2处,往workQueue添加一个任务,此时workQueue队列中已经有两个任务,容量已经满了,添加失败,判断为false;执行到3处,尝试直接添加一个Worker(新创建一个线程),当前已有的Worker数为3(i=0、i=1、i4时创建的),没有达到最大线程数限制,所以创建成功

i=6 => 1处 workerCountOf(c) = 4,判断为false;执行到2处,往workQueue添加一个任务,此时workQueue队列中已经有两个任务,容量已经满了,添加失败,判断为false;执行到3处,尝试直接添加一个Worker(新创建一个线程),当前已有的Worker数为3(i=0、i=1、i=4、i=5时创建的),此时达到最大线程数限制,所以添加Worker失败,执行拒绝策略处理器(这里用到的是调用线程执行策略)

运行结果:

--------pool-1-thread-2 --------main --------pool-1-thread-3 --------pool-1-thread-1 --------pool-1-thread-2 --------pool-1-thread-3

创建了三个线程,i=6时交给了main主线程执行任务。

通过上面的分析,如果队列是无界的,最大核心线程数这个参数就不起作用。

接下来看看添加的Worker是什么玩意

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
  
Worker(Runnable firstTask) {
    setState(-1); 
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

Worker是一个可执行的任务,每个Worker拥有一个线程,并且在初始化时把自己当做第一个任务放到了这个线程中。

添加Worker过程

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 判断当前Worker数量(线程数)是否大于核心线程数或者最大线程数(根据入参core决定)
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 通过AQS的cas先占一个线程数,占位成功跳出最外层循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       // 创建Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁,workers成员变量是线程不安全的
            mainLock.lock();
            try {
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 创建Worker成功,并且添加到workers中,然后通过启动Worker中的线程执行任务
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

添加Worker这个方法维护着workers,添加Worker成功后马上启动其拥有的线程来执行任务。

怎样控制核心线程数

创建Worker后启动了其属性中的线程,正常情况下线程执行完任务会关闭销毁,那Worker对线程做了什么让其循环使用呢?t.start之后交给了JVM,当JVM拿到资源高兴时就会回调run方法。

public void run() {
    runWorker(this);
}
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // task不为空或者从等待队列中获取到task执行 task的run方法 
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

当前Worker的task!=null执行任务中的run方法,然后把task置为null,此时执行getTask获取任务,如此循环。从这里得知,核心线程数量和最大核心线程数量两个参数其实是控制Worker数量,通过Worker的数量控制线程的数量,每创建一个Worker调用到该方法就开始长轮询,这样就避免了线程处理任务结束后的关闭和销毁了。

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                // 返回null使 runWorker 方法结束while循环
                return null;
            continue;
        }

        try {
            // 从等待队列中获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 设置超时
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

该方法从队列中获取任务,大于核心线程数的线程空闲时间的参数就是用在这个获取等待队列等待超时时间,如果超时并且线程数量大于核心线程数量时就会返回null,此时会触发runWorker 方法结束while循环,最后回收改Worker

拒绝策略

jdk提供了四种拒绝策略

AbortPolicy 直接抛出异常,默认用的是这种

DiscardPolicy 丢弃该任务不处理

DiscardOldestPolicy 丢弃等待队列中放入最久的一个任务,然后把该任务放到execute中执行

CallerRunsPolicy 直接调用改任务的run方法,相当于在调用线程中执行

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-03-15 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 为什么要用线程池
  • 线程池工厂Executors
  • 各个参数的作用
  • 怎样控制核心线程数
  • 拒绝策略
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档