专栏首页一个执拗的后端搬砖工线程池ThreadPoolExecuter使用详解

线程池ThreadPoolExecuter使用详解

使用过jdk自带线程池或者看过源码的都知道,jdk1.5版本引入了并发包,线程池就是其中一个比较重要的内容,所谓线程池和连接池以及其他池子一样,其核心概念在于“池”,抛开技术来说,“池”在我们日常生活中其实就是容器的概念,比如水池等等,水池的目的是为了下雨了可以蓄水,干旱了可以用来灌溉,那么应用编程中的“池”可以理解成缓存,用一张图来更直观的理解“池”的概念和作用:

图中圆角图分别是线程池和连接池,应用程序需要用到多线程的时候,优先去线程池取,然后处理业务逻辑,处理完了之后把线程归还到线程池,当应用程序访问数据库的时候,优先去连接池获取连接,然后操作数据库,操作完成之后把连接归还到连接池。

从上述描述中,对线程池有了大概的概念,接下来我们详细分析一下jdk自带线程池的用法和实现原理。

常见使用方式分析

线程池的常用使用方式是:

ExecutorService executorService = Executors.newXXXThreadPool();

java通过Executors提供了常用的四种线程池:

四种线程池分别是:

newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。

newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

然后我们分析一下每种线程池的用法和原理:

newCachedThreadPool线程池

创建一个线程池,根据需要创建新线程,但将在可用时重用先前构建的线程。这些池通常将提高执行许多短期异步任务的程序的性能。如果可用,对execute方法的调用将重用先前构造的线程。如果没有现有线程可用,则将创建新线程并将其添加到池中。未使用六十秒的线程被终止并从缓存中移除。因此,一个长期闲置的池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有相似属性但不同细节(例如,超时参数)的线程池。

newCachedThreadPool线程池的常用使用方式如下:

public static void main(String[] args) {

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

for (int i = 0; i < 10; i++) {

final int index = i;

try {

Thread.sleep(index * 1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

cachedThreadPool.execute(() -> System.out.println(index));

}

}

由于是缓存线程池,线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程,如果任务比较耗时,第二个任务来的时候第一个还没执行完,会新建线程。

特别注意的是,如果使用newCachedThreadPool线程池,突发访问量特别大,有可能导致内存溢出,改造上述代码并运行:

public static void main(String[] args) {

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

for (int i = 1; i < 10000; i++)

cachedThreadPool.submit(() -> {

try {

Thread.sleep(5000);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

运行后发生om错误:

所以一定要慎用newCachedThreadPool线程池,或者使用的时候不适用其默认提供的方式,而是基于ThreadPoolExecutor自己构造。

newFixedThreadPool线程池

创建一个指定线程数量从无界队列共享任务的线程池,在任何时候,至多{nThreads}线程将是活动的处理任务。如果在所有线程都处于活动状态时提交额外的任务,则它们将在队列中等待,直到线程可用。如果任何线程在关闭之前由于执行过程中的失败而终止,那么如果需要执行后续任务,将替换一个新的线程。池中的线程将存在,直到显式调用shutdown关闭为止。

newFixedThreadPool线程池常用使用方式如下:

public static void main(String[] args) {

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

for (int i = 0; i < 10; i++) {

final int index = i;

fixedThreadPool.execute(() -> {

try {

System.out.println(index);

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。

需要注意的是,newFixedThreadPool线程池使用的队列也是无界队列LinkedBlockingQueue,源码如下:

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

所以使用的时候不会因为像newCachedThreadPool线程池创建过多的线程导致om错误,但是可能会因为突发访问量特别大,导致newFixedThreadPool线程池中活跃线程处理不过来,然后任务大量堆积在队列中,而LinkedBlockingQueue队列又是无界的,所以会因为任务大量推挤到LinkedBlockingQueue导致om错误。改造上述代码并运行:

public static void main(String[] args) {

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

for (int i = 0; i < 50000; i++) {

final int index = i;

fixedThreadPool.submit(() -> {

try {

System.out.println(index);

Thread.sleep(10000);

} catch (InterruptedException e) {

e.printStackTrace();

}

});

}

}

运行程序也出现了om错误:

在使用newFixedThreadPool线程池的时候,也要评估访问量峰值来设置活跃线程的大小,另外在互联网行业,流量变化的不可预知性太强,所以不建议使用默认的方式创建newFixedThreadPool线程池,而是使用原生的ThreadPoolExecutor构造方法创建线程池,根据业务需要指定活跃线程数和任务队列长度(不建议使用无界队列)。

newScheduledThreadPool线程池

创建一个在给定延迟之后执行命令或者定时执行命令的调度线程池,{corePoolSize}是保留在线程池中的活跃线程数量,即使是空闲的也不会回收。

newScheduledThreadPool线程池常用使用方式如下:

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

scheduledThreadPool.scheduleWithFixedDelay(() ->System.out.println("delay 3 seconds"),1,3,TimeUnit.SECONDS);

}

上述代码的作用是延迟1秒执行,并且每3秒执行一次,其实起到一个简易调度的功能。

使用调度线程池需要注意的是,使用Executors创建的调度线程池默认任务队列DelayedWorkQueue是无界队列:

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue());

}

所以使用调度线程池的时候也要注意,当并发调度任务比较大的时候,也可能出现om错误。改造测试代码:

public static void main(String[] args) {

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);

for(int i = 0;i < 10000;i ++) {

scheduledThreadPool.scheduleWithFixedDelay(() ->System.out.println("delay 3 seconds"),1,3,TimeUnit.SECONDS);

}

}

运行程序也出现了内存溢出:

这种场景我们遇到的比较少,一般不需要关心(调度大多由调度中间件完成),如果真的遇到比较特殊的场景,也强烈建议使用ThreadPoolExecutor原生的构造方法创建线程池,指定队列类型和长度。

newSingleThreadExecutor线程池

创建一个线程池(执行器),它使用一个从无界队列中操作的单个工作线程。(但是,需要注意的是,如果该单个线程在关闭之前由于执行过程中的失败而终止,那么如果需要执行后续任务,将替换一个新的线程。)任务被保证顺序执行,并且在任何给定时间都不会有多于一个任务是活动的。与其他等价的{@code newFixedThreadPool(1)}不同,返回的执行器保证不会重新配置来使用其他线程。

newSingleThreadExecutor线程池的使用方式如下:

public static void main(String[] args) {

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

for (int i = 0; i < 10; i++) {

final int index = i;

singleThreadExecutor.execute(() -> {

try {

System.out.println(index);

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}});

}

}

依次输出结果,也就是顺序的执行每个任务。像前边几种线程池一样,newSingleThreadExecutor线程池默认也是使用无界队列存储任务,源码如下:

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

当突发请求量比较大的时候,任务队列过大也会导致om错误。改造测试代码:

public static void main(String[] args) {

ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();

for (int i = 0; i < 100000; i++) {

final int index = i;

singleThreadExecutor.execute(() -> {

try {

System.out.println(index);

Thread.sleep(2000);

} catch (InterruptedException e) {

e.printStackTrace();

}});

}

}

运行测试代码,发现报了内存溢出:

单线程线程池的应用场景一般是需要顺序处理的业务,并发量不会太多,也不会有人创建单线程的线程池来处理高并发请求,但是使用的时候也需要注意。

从以上四种线程池的使用以及原理分析可以总结出比较重要的两点:

1) 慎用缓存线程池,如果并发量大会使线程创建过多导致jvm内存溢出。

2) 慎用无界队列,使用除缓存线程池之外的其他三种线程池,如果并发量比较大,都会遇到任务大量堆积到队列中导致om错误。

线程池原理分析

从上边四种线程池的使用和存在的问题分析,想必我们对jdk自带线程池有了比较深刻的理解,接着我们对线程池的实现原理做一下剖析。以newFixedThreadPool线程池为例,参照源码分析一下其实现原理。

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

newFixedThreadPool方法调用了5个参数的构造器,代码中有几个比较重要的参数:

1)corePoolSize: 是核心线程数,此处传入nThreads(线程池一直持有的线程数量,就算线程空闲)

2)maximumPoolSize: 是线程池中拥有的最大线程数量,此处传入 nThreads,核心线程和最大线程一样(避免了不停销毁线程和新建线程带来的开销)

3)keepAliveTime:当线程池中线程数量大于corePoolSize时,大于核心线程数量部分线程空闲时存活时间,此处传入0

4)unit:存活时间的单位,此处传入TimeUnit.MILLISECONDS毫秒

5) workQueue:任务队列,如果新提交的任务无法立即被线程处理,那么将会放入任务队列,此处传入LinkedBlockingQueue无界队列

接着看源码,又调用了7个参数的构造方法:

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue<Runnable> workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

Executors.defaultThreadFactory(), defaultHandler);

}

该构造方法比前边多了两个参数:

6) threadFactory:创建线程使用的线程工厂,此处使用的是默认线程工厂Executors.defaultThreadFactory()

7)handler:当线程池无法处理新提交的任务时的拒绝策略,此处使用默认的AbortPolicy

newFixedThreadPool线程池处理请求时的模型大致如下图:

当线程池接收到新的任务的时候处理流程如下:

核心思想就是,线程池新接收任务后,如果当前线程池的线程数量小于核心线程池数量,无论如何都会创建一个线程去处理任务,如果线程池中的线程到达核心线程数量但是有空闲,那么就把任务提交到任务队列,如果线程数到达核心线程数量,并且没有空闲,就尝试新建一个线程处理任务,如果当前线程数量到达最大线程数量,但是有线程空闲,那就提交到任务队列,如果线程数量达到最大线程数,并且任务队列已满,那么就使用拒绝策略拒绝任务。

本文分享自微信公众号 - PersistentCoder(TyphoonChan),作者:叔牙

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2018-09-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • jdk源码分析之HashMap--并发情况下remove失败

    了解过jdk源码的都知道,Hash底层是使用数组+链表的方式实现的,大概如下图:

    叔牙
  • jdk源码分析之原子变量AtomicInteger--乐观锁与CAS以及问题

    熟悉jdk的人都知道,从jdk1.5开始引入了并发包,旨在对于java内置锁synchronized解决不了或者不适用的场景提供一种更优的解决方案。

    叔牙
  • jdk源码分析之HashMap--为什么初始容量是2的n次幂

    熟悉HashMap的人都知道,其底层是数组+链表结构实现,也就是说我们常用的get和put操作中,key要和底层的结构关联对应起来,先看一下HashMa...

    叔牙
  • 40 个Java多线程问题总结

    原文地址:http://www.cnblogs.com/xrq730/p/5060921.htm

    一个优秀的废人
  • java中线程池的几种实现方式

    多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力.

    海仔
  • 100道Java并发和多线程基础面试题大集合(含解答),这波面试稳了~

    这些多线程的问题来源于各大网站,可能有些问题网上有、可能有些问题对应的答案也有、也可能有些各位网友也都看过,但是本文写作的重心就是所有的问题都会按照自己的理解回...

    程序员白楠楠
  • 线程的创建

    1. 定义Thread类的子类,并重写该类的run()方法,该run()方法的方法体就代表了线程需要完成的任务。

    黑洞代码
  • 面试必考——线程池原理概述

    线程池的源码解析较为繁琐。各位同学必须先大体上理解线程池的核心原理后,方可进入线程池的源码分析过程。

    黑洞代码
  • 图文介绍进程和线程的区别

    先了解一下操作系统的一些相关概念,大部分操作系统(如Windows、Linux)的任务调度是采用时间片轮转的抢占式调度方式,也就是说一个任务执行一小段时间后强制...

    趣学程序-shaofeer
  • 笔记09 - 线程池刨根问底

    我们知道CPU运行的最小单位是线程,Java中实现并发是通过多线程来完成的,利用多线程提高了对CPU资源的利用率,但是线程的创建和销毁是很消耗性能的。线程的创建...

    码农帮派

扫码关注云+社区

领取腾讯云代金券