前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池ThreadPoolExecuter使用详解

线程池ThreadPoolExecuter使用详解

作者头像
叔牙
发布2020-11-19 15:08:17
3560
发布2020-11-19 15:08:17
举报

使用过jdk自带线程池或者看过源码的都知道,jdk1.5版本引入了并发包,线程池就是其中一个比较重要的内容,所谓线程池和连接池以及其他池子一样,其核心概念在于“池”,抛开技术来说,“池”在我们日常生活中其实就是容器的概念,比如水池等等,水池的目的是为了下雨了可以蓄水,干旱了可以用来灌溉,那么应用编程中的“池”可以理解成缓存,用一张图来更直观的理解“池”的概念和作用:

图中圆角图分别是线程池和连接池,应用程序需要用到多线程的时候,优先去线程池取,然后处理业务逻辑,处理完了之后把线程归还到线程池,当应用程序访问数据库的时候,优先去连接池获取连接,然后操作数据库,操作完成之后把连接归还到连接池。

从上述描述中,对线程池有了大概的概念,接下来我们详细分析一下jdk自带线程池的用法和实现原理。

常见使用方式分析

线程池的常用使用方式是:

ExecutorService executorService = Executors.newXXXThreadPool();

java通过Executors提供了常用的四种线程池:

四种线程池分别是:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

然后我们分析一下每种线程池的用法和原理:

newCachedThreadPool线程池

创建一个线程池,根据需要创建新线程,但将在可用时重用先前构建的线程。这些池通常将提高执行许多短期异步任务的程序的性能。如果可用,对execute方法的调用将重用先前构造的线程。如果没有现有线程可用,则将创建新线程并将其添加到池中。未使用六十秒的线程被终止并从缓存中移除。因此,一个长期闲置的池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有相似属性但不同细节(例如,超时参数)的线程池。

newCachedThreadPool线程池的常用使用方式如下:

public static void main(String[] args) {

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {

final int index = i;

try {

Thread.sleep(index * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

cachedThreadPool.execute(() -> System.out.println(index));

}

}

由于是缓存线程池,线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程,如果任务比较耗时,第二个任务来的时候第一个还没执行完,会新建线程。

特别注意的是,如果使用newCachedThreadPool线程池,突发访问量特别大,有可能导致内存溢出,改造上述代码并运行:

public static void main(String[] args) {

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

for (int i = 1; i < 10000; i++)

cachedThreadPool.submit(() -> {

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

运行后发生om错误:

所以一定要慎用newCachedThreadPool线程池,或者使用的时候不适用其默认提供的方式,而是基于ThreadPoolExecutor自己构造。

newFixedThreadPool线程池

创建一个指定线程数量从无界队列共享任务的线程池,在任何时候,至多{nThreads}线程将是活动的处理任务。如果在所有线程都处于活动状态时提交额外的任务,则它们将在队列中等待,直到线程可用。如果任何线程在关闭之前由于执行过程中的失败而终止,那么如果需要执行后续任务,将替换一个新的线程。池中的线程将存在,直到显式调用shutdown关闭为止。

newFixedThreadPool线程池常用使用方式如下:

public static void main(String[] args) {

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

for (int i = 0; i < 10; i++) {

final int index = i;

fixedThreadPool.execute(() -> {

try {

System.out.println(index);

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。

需要注意的是,newFixedThreadPool线程池使用的队列也是无界队列LinkedBlockingQueue,源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

所以使用的时候不会因为像newCachedThreadPool线程池创建过多的线程导致om错误,但是可能会因为突发访问量特别大,导致newFixedThreadPool线程池中活跃线程处理不过来,然后任务大量堆积在队列中,而LinkedBlockingQueue队列又是无界的,所以会因为任务大量推挤到LinkedBlockingQueue导致om错误。改造上述代码并运行:

public static void main(String[] args) {

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

for (int i = 0; i < 50000; i++) {

final int index = i;

fixedThreadPool.submit(() -> {

try {

System.out.println(index);

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

运行程序也出现了om错误:

在使用newFixedThreadPool线程池的时候,也要评估访问量峰值来设置活跃线程的大小,另外在互联网行业,流量变化的不可预知性太强,所以不建议使用默认的方式创建newFixedThreadPool线程池,而是使用原生的ThreadPoolExecutor构造方法创建线程池,根据业务需要指定活跃线程数和任务队列长度(不建议使用无界队列)。

newScheduledThreadPool线程池

创建一个在给定延迟之后执行命令或者定时执行命令的调度线程池,{corePoolSize}是保留在线程池中的活跃线程数量,即使是空闲的也不会回收。

newScheduledThreadPool线程池常用使用方式如下:

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

scheduledThreadPool.scheduleWithFixedDelay(() ->System.out.println("delay 3 seconds"),1,3,TimeUnit.SECONDS);

}

上述代码的作用是延迟1秒执行,并且每3秒执行一次,其实起到一个简易调度的功能。

使用调度线程池需要注意的是,使用Executors创建的调度线程池默认任务队列DelayedWorkQueue是无界队列:

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue());

}

所以使用调度线程池的时候也要注意,当并发调度任务比较大的时候,也可能出现om错误。改造测试代码:

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

for(int i = 0;i < 10000;i ++) {

scheduledThreadPool.scheduleWithFixedDelay(() ->System.out.println("delay 3 seconds"),1,3,TimeUnit.SECONDS);

}

}

运行程序也出现了内存溢出:

这种场景我们遇到的比较少,一般不需要关心(调度大多由调度中间件完成),如果真的遇到比较特殊的场景,也强烈建议使用ThreadPoolExecutor原生的构造方法创建线程池,指定队列类型和长度。

newSingleThreadExecutor线程池

创建一个线程池(执行器),它使用一个从无界队列中操作的单个工作线程。(但是,需要注意的是,如果该单个线程在关闭之前由于执行过程中的失败而终止,那么如果需要执行后续任务,将替换一个新的线程。)任务被保证顺序执行,并且在任何给定时间都不会有多于一个任务是活动的。与其他等价的{@code newFixedThreadPool(1)}不同,返回的执行器保证不会重新配置来使用其他线程。

newSingleThreadExecutor线程池的使用方式如下:

public static void main(String[] args) {

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

for (int i = 0; i < 10; i++) {

final int index = i;

singleThreadExecutor.execute(() -> {

try {

System.out.println(index);

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}});

}

}

依次输出结果,也就是顺序的执行每个任务。像前边几种线程池一样,newSingleThreadExecutor线程池默认也是使用无界队列存储任务,源码如下:

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

当突发请求量比较大的时候,任务队列过大也会导致om错误。改造测试代码:

public static void main(String[] args) {

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

for (int i = 0; i < 100000; i++) {

final int index = i;

singleThreadExecutor.execute(() -> {

try {

System.out.println(index);

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}});

}

}

运行测试代码,发现报了内存溢出:

单线程线程池的应用场景一般是需要顺序处理的业务,并发量不会太多,也不会有人创建单线程的线程池来处理高并发请求,但是使用的时候也需要注意。

从以上四种线程池的使用以及原理分析可以总结出比较重要的两点:

1) 慎用缓存线程池,如果并发量大会使线程创建过多导致jvm内存溢出。

2) 慎用无界队列,使用除缓存线程池之外的其他三种线程池,如果并发量比较大,都会遇到任务大量堆积到队列中导致om错误。

线程池原理分析

从上边四种线程池的使用和存在的问题分析,想必我们对jdk自带线程池有了比较深刻的理解,接着我们对线程池的实现原理做一下剖析。以newFixedThreadPool线程池为例,参照源码分析一下其实现原理。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

newFixedThreadPool方法调用了5个参数的构造器,代码中有几个比较重要的参数:

1)corePoolSize: 是核心线程数,此处传入nThreads(线程池一直持有的线程数量,就算线程空闲)

2)maximumPoolSize: 是线程池中拥有的最大线程数量,此处传入 nThreads,核心线程和最大线程一样(避免了不停销毁线程和新建线程带来的开销)

3)keepAliveTime:当线程池中线程数量大于corePoolSize时,大于核心线程数量部分线程空闲时存活时间,此处传入0

4)unit:存活时间的单位,此处传入TimeUnit.MILLISECONDS毫秒

5) workQueue:任务队列,如果新提交的任务无法立即被线程处理,那么将会放入任务队列,此处传入LinkedBlockingQueue无界队列

接着看源码,又调用了7个参数的构造方法:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors.defaultThreadFactory(), defaultHandler);

}

该构造方法比前边多了两个参数:

6) threadFactory:创建线程使用的线程工厂,此处使用的是默认线程工厂Executors.defaultThreadFactory()

7)handler:当线程池无法处理新提交的任务时的拒绝策略,此处使用默认的AbortPolicy

newFixedThreadPool线程池处理请求时的模型大致如下图:

当线程池接收到新的任务的时候处理流程如下:

核心思想就是,线程池新接收任务后,如果当前线程池的线程数量小于核心线程池数量,无论如何都会创建一个线程去处理任务,如果线程池中的线程到达核心线程数量但是有空闲,那么就把任务提交到任务队列,如果线程数到达核心线程数量,并且没有空闲,就尝试新建一个线程处理任务,如果当前线程数量到达最大线程数量,但是有线程空闲,那就提交到任务队列,如果线程数量达到最大线程数,并且任务队列已满,那么就使用拒绝策略拒绝任务。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-09-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 PersistentCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档