前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >jvm源码分析(四)ThreadPoolExecutor

jvm源码分析(四)ThreadPoolExecutor

作者头像
用户6203048
发布2020-10-27 14:37:53
3550
发布2020-10-27 14:37:53
举报
文章被收录于专栏:JathonKatuJathonKatuJathonKatu

线程池是一种为了避免线程频繁的创建和销毁,带来性能消耗而建立的一种池化技术。

它是把已创建的线程放入“池”中,当有任务来临,就可以重用已有的线程,无需等待创建的过程,可以有效提高程序的响应速度。

阿里巴巴的java开发手册规定:

线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的读者更加明确线程池的运行规则,规避资源耗尽的风险。

说明:Executors的弊端如下

FixedThreadPool和SingleThreadPool允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量请求,导致OOM

CachedThreadPool和ScheduledThreadPool允许创建的线程数量为Integer.MAX_VALUE,可能会创建大量线程,导致OOM

但是,事实上这四个创建方法底层都是通过ThreadPoolExecutor创建。

一、ThreadPoolExecutor构建时传递的参数

int corePoolSize

线程池常驻核心线程数

解释:

如果为0,则表示没有线程时销毁线程池。

如果不为0,即使没有任务,也会保证线程的数量等于这个值。

注意:设置的比较小,会频繁的创建和销毁线程。设置的比较大,会浪费资源。

int maximumPoolSize

最多可以创建的线程数

官方规定:必须大于0,且必须大于corePoolSize

任务比较多,且不能放入任务队列,才会用到

long keepAliveTime

线程存活时间

当线程池空闲,且超过这个时间,多余的线程就会被销毁,直到线程数量 = corePoolSize

如果maximumPoolSize = corePoolSize,当线程池空闲时也不会销毁任何线程

TimeUnit unit

存活时间的单位

是个枚举方法,存放了从nm~day的单位,及互相的转换进制

BlockingQueue<Runnable> workQueue

线程池执行的任务队列

当线程池的所有线程都在处理任务,新来的任务就会在此队列中排队等待执行

ThreadFactory threadFactory

线程的创建工厂

用的较少,大部分用的默认线程创建工厂

也可以通过实现ThreadFactory,自定义线程名称和线程执行的优先级等属性。

RejectedExecutionHandler handler

线程池的拒绝策略,是一种限流保护

线程池任务已经在缓存队列workQueue已经满了,并且不能创建新的线程,就会用到拒绝策略

默认是ThreadPoolExecutor的静态内部类AbortPolicy,拒绝方法如下:

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

二、线程池状态

运行(RUNNING):该状态下的线程池接收新任务并处理队列中的任务;线程池创建完毕就处于该状态,也就是正常状态;

关机(SHUTDOWN):线程池不接受新任务,但处理队列中的任务;线程池调用shutdown()之后的池状态;

停止(STOP):线程池不接受新任务,也不处理队列中的任务,并中断正在执行的任务;线程池调用shutdownNow()之后的池状态;

清理(TIDYING):线程池所有任务已经终止,workCount(当前线程数)为0;过渡到清理状态的线程将运行terminated()钩子方法;

终止(TERMINATED):terminated()方法结束后的线程池状态;

三、线程池主要方法解析

ThreadPoolExecutor中有个int型变量(其实是AtomicInteger类型)ctl,

它的作用是存储线程池的状态和工作线程数量,

因为ThreadPoolExecutor中定义的状态有5种(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED),

用1位表示,则只有0、1两种情况,只能表示两种状态;

用2位表示,有00、01、10、11四种情况,只能表示四种状态;

用3位表示,有222=8种情况;

所以状态值至少要用3位,那么就可以用int的高3位来表示(最左边3个),剩下29个就可以表示线程数量,所以workerCount最大值为2^29-1

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

每当线程池中的线程数量或状态发生变化时,具体操作的便是ctl变量,如以下方法:

private boolean compareAndIncrementWorkerCount(int expect) {
    return ctl.compareAndSet(expect, expect + 1);
}

那么又是读取线程状态和数量的值呢:

读取状态利用以下方法:

private static int runStateOf(int c) { return c & ~CAPACITY; }

CAPACITY是个常量00011111 11111111 11111111 11111111,通过 &(按位与)运算,

可以保留高3位,把低29位全部变为0;

读取数量利用以下方法:

private static int workerCountOf(int c) { return c & CAPACITY; }

可以把高3位变为0,低29位保留。

addWorker

addWorker(Runnable firstTask, boolean core)

firstTask

线程应首先运行的任务,如果没有则可以置为null

core

判断是否可以创建线程的阈值(最大值)

如果等于true则表示用corePoolSize做为阈值(false则用maximumPoolSize)

execute方法

public void execute(Runnable command) {// 校验传进来的线程是否为空
    if (command == null)
        throw new NullPointerException();
    // 线程池里的线程 + 1 这个数就 + 1,第一个线程是-536870912
    // c + CAPACITY + 2 = 线程数
    int c = ctl.get(); 
    // workerCountOf(C) = c & (1 << Integer.SIZE - 3) - 1 = c & (1 << 29) - 1
    // 池内线程数量 < 线程池常驻核心线程数
    if (workerCountOf(c) < corePoolSize) { 
        // 当常驻线程数 < 设定常驻线程上限时,往workerSet中添加新线程,并返回(后面是另一种场景的判断)
        if (addWorker(command, true))
            return;
        // 重新获取当前线程数
        c = ctl.get();
    }
    // c < 0,即线程池还处于running状态则尝试将线程放入阻塞队列,后者则成功返回true
    if (isRunning(c) && workQueue.offer(command)) {
        // 获取当前线程数
        int recheck = ctl.get();
        // 再判断线程池是否是running状态,如果否,则尝试将线程从线程池中删除
        if (! isRunning(recheck) && remove(command))
        // 拒绝策略,抛出异常
            reject(command);
        // 如果发现没有worker,则补充一个空任务的worker
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // !c < 0 或 c < 0 && 尝试放入阻塞队列失败
    else if (!addWorker(command, false))
        // 拒绝策略,抛出异常
        reject(command);
}

上文中22行reject调用

handler.rejectedExecution(command, this);

这里的handler我们在debugger的时候可以看到是ThreadPoolExecutor的内部类,如下图1。也可以从明明和注释中看出,默认是调用内部类AbortPolicy的方法,如图2

这个方法没有什么作用,就是抛出一个异常

public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                         " rejected from " +
                                         e.toString());
}

四、常见面试题

ThreadPoolExecutor的执行方法有几种,他们有什么区别

execute()和submit

他们都是用来执行线程池任务的

submit可以接收线程池的返回值(Callable<V>&Future<V>),是ExecutorService的方法

execute不能接受返回值,是Executor的方法

什么是线程的拒绝策略

当提交任务数大于 corePoolSize 的时候,会优先将任务放到 workQueue 阻塞队列中。当阻塞队列饱和后,会扩充线程池中线程数,直到达到 maximumPoolSize 最大线程数配置。此时,再多余的任务,则会触发线程池的拒绝策略了。

拒绝策略提供顶级接口 RejectedExecutionHandler ,其中方法 rejectedExecution 即定制具体的拒绝策略的执行逻辑。

拒绝策略的分类有哪些

AbortPolicy,默认,线程池会抛出异常,并终止执行

CallerRunsPolicy,把任务交给当前线程来执行

DiscardPolicy,忽略此任务(新任务)

DiscardOldestPolicy ,如果线程池没有关闭,则丢弃阻塞队列中最老的一个

如何自定义拒绝策略

实现RejectedExecutionHandler接口,重写rejectedExecution 方法。

ThreadPoolExecutor能不能实现扩展,如何实现扩展

重写beforeExecute和afterExecute方法(添加日志,实现数据统计,执行时间统计)

题外话

为什么不推荐使用ThreadGroup,我们可以通过Thread.currentThread()获取当前线程,再通过Thread.getThreadGroup返回线程所属的线程组,然后调用ThreadGroup的destroy、interrupt、stop、resume等方法影响线程组中的其他线程,导致线程不安全等问题。

疑问

既然已经弃用,为什么Executors中还是有ThreadGroup的身影?

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-06-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JathonKatu 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档