使用过jdk自带线程池或者看过源码的都知道,jdk1.5版本引入了并发包,线程池就是其中一个比较重要的内容,所谓线程池和连接池以及其他池子一样,其核心概念在于“池”,抛开技术来说,“池”在我们日常生活中其实就是容器的概念,比如水池等等,水池的目的是为了下雨了可以蓄水,干旱了可以用来灌溉,那么应用编程中的“池”可以理解成缓存,用一张图来更直观的理解“池”的概念和作用:
图中圆角图分别是线程池和连接池,应用程序需要用到多线程的时候,优先去线程池取,然后处理业务逻辑,处理完了之后把线程归还到线程池,当应用程序访问数据库的时候,优先去连接池获取连接,然后操作数据库,操作完成之后把连接归还到连接池。
从上述描述中,对线程池有了大概的概念,接下来我们详细分析一下jdk自带线程池的用法和实现原理。
常见使用方式分析
线程池的常用使用方式是:
ExecutorService executorService = Executors.newXXXThreadPool();
java通过Executors提供了常用的四种线程池:
四种线程池分别是:
①newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
②newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
③newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
④newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
然后我们分析一下每种线程池的用法和原理:
newCachedThreadPool线程池
创建一个线程池,根据需要创建新线程,但将在可用时重用先前构建的线程。这些池通常将提高执行许多短期异步任务的程序的性能。如果可用,对execute方法的调用将重用先前构造的线程。如果没有现有线程可用,则将创建新线程并将其添加到池中。未使用六十秒的线程被终止并从缓存中移除。因此,一个长期闲置的池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有相似属性但不同细节(例如,超时参数)的线程池。
newCachedThreadPool线程池的常用使用方式如下:
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int index = i;
try {
Thread.sleep(index * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachedThreadPool.execute(() -> System.out.println(index));
}
}
由于是缓存线程池,线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程,如果任务比较耗时,第二个任务来的时候第一个还没执行完,会新建线程。
特别注意的是,如果使用newCachedThreadPool线程池,突发访问量特别大,有可能导致内存溢出,改造上述代码并运行:
public static void main(String[] args) {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
for (int i = 1; i < 10000; i++)
cachedThreadPool.submit(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
运行后发生om错误:
所以一定要慎用newCachedThreadPool线程池,或者使用的时候不适用其默认提供的方式,而是基于ThreadPoolExecutor自己构造。
newFixedThreadPool线程池
创建一个指定线程数量从无界队列共享任务的线程池,在任何时候,至多{nThreads}线程将是活动的处理任务。如果在所有线程都处于活动状态时提交额外的任务,则它们将在队列中等待,直到线程可用。如果任何线程在关闭之前由于执行过程中的失败而终止,那么如果需要执行后续任务,将替换一个新的线程。池中的线程将存在,直到显式调用shutdown关闭为止。
newFixedThreadPool线程池常用使用方式如下:
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(() -> {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
因为线程池大小为3,每个任务输出index后sleep 2秒,所以每两秒打印3个数字。定长线程池的大小最好根据系统资源进行设置。如Runtime.getRuntime().availableProcessors()。
需要注意的是,newFixedThreadPool线程池使用的队列也是无界队列LinkedBlockingQueue,源码如下:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
所以使用的时候不会因为像newCachedThreadPool线程池创建过多的线程导致om错误,但是可能会因为突发访问量特别大,导致newFixedThreadPool线程池中活跃线程处理不过来,然后任务大量堆积在队列中,而LinkedBlockingQueue队列又是无界的,所以会因为任务大量推挤到LinkedBlockingQueue导致om错误。改造上述代码并运行:
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 50000; i++) {
final int index = i;
fixedThreadPool.submit(() -> {
try {
System.out.println(index);
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
运行程序也出现了om错误:
在使用newFixedThreadPool线程池的时候,也要评估访问量峰值来设置活跃线程的大小,另外在互联网行业,流量变化的不可预知性太强,所以不建议使用默认的方式创建newFixedThreadPool线程池,而是使用原生的ThreadPoolExecutor构造方法创建线程池,根据业务需要指定活跃线程数和任务队列长度(不建议使用无界队列)。
newScheduledThreadPool线程池
创建一个在给定延迟之后执行命令或者定时执行命令的调度线程池,{corePoolSize}是保留在线程池中的活跃线程数量,即使是空闲的也不会回收。
newScheduledThreadPool线程池常用使用方式如下:
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleWithFixedDelay(() ->System.out.println("delay 3 seconds"),1,3,TimeUnit.SECONDS);
}
上述代码的作用是延迟1秒执行,并且每3秒执行一次,其实起到一个简易调度的功能。
使用调度线程池需要注意的是,使用Executors创建的调度线程池默认任务队列DelayedWorkQueue是无界队列:
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
所以使用调度线程池的时候也要注意,当并发调度任务比较大的时候,也可能出现om错误。改造测试代码:
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
for(int i = 0;i < 10000;i ++) {
scheduledThreadPool.scheduleWithFixedDelay(() ->System.out.println("delay 3 seconds"),1,3,TimeUnit.SECONDS);
}
}
运行程序也出现了内存溢出:
这种场景我们遇到的比较少,一般不需要关心(调度大多由调度中间件完成),如果真的遇到比较特殊的场景,也强烈建议使用ThreadPoolExecutor原生的构造方法创建线程池,指定队列类型和长度。
newSingleThreadExecutor线程池
创建一个线程池(执行器),它使用一个从无界队列中操作的单个工作线程。(但是,需要注意的是,如果该单个线程在关闭之前由于执行过程中的失败而终止,那么如果需要执行后续任务,将替换一个新的线程。)任务被保证顺序执行,并且在任何给定时间都不会有多于一个任务是活动的。与其他等价的{@code newFixedThreadPool(1)}不同,返回的执行器保证不会重新配置来使用其他线程。
newSingleThreadExecutor线程池的使用方式如下:
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
final int index = i;
singleThreadExecutor.execute(() -> {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}});
}
}
依次输出结果,也就是顺序的执行每个任务。像前边几种线程池一样,newSingleThreadExecutor线程池默认也是使用无界队列存储任务,源码如下:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
当突发请求量比较大的时候,任务队列过大也会导致om错误。改造测试代码:
public static void main(String[] args) {
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100000; i++) {
final int index = i;
singleThreadExecutor.execute(() -> {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}});
}
}
运行测试代码,发现报了内存溢出:
单线程线程池的应用场景一般是需要顺序处理的业务,并发量不会太多,也不会有人创建单线程的线程池来处理高并发请求,但是使用的时候也需要注意。
从以上四种线程池的使用以及原理分析可以总结出比较重要的两点:
1) 慎用缓存线程池,如果并发量大会使线程创建过多导致jvm内存溢出。
2) 慎用无界队列,使用除缓存线程池之外的其他三种线程池,如果并发量比较大,都会遇到任务大量堆积到队列中导致om错误。
线程池原理分析
从上边四种线程池的使用和存在的问题分析,想必我们对jdk自带线程池有了比较深刻的理解,接着我们对线程池的实现原理做一下剖析。以newFixedThreadPool线程池为例,参照源码分析一下其实现原理。
newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newFixedThreadPool方法调用了5个参数的构造器,代码中有几个比较重要的参数:
1)corePoolSize: 是核心线程数,此处传入nThreads(线程池一直持有的线程数量,就算线程空闲)
2)maximumPoolSize: 是线程池中拥有的最大线程数量,此处传入 nThreads,核心线程和最大线程一样(避免了不停销毁线程和新建线程带来的开销)
3)keepAliveTime:当线程池中线程数量大于corePoolSize时,大于核心线程数量部分线程空闲时存活时间,此处传入0
4)unit:存活时间的单位,此处传入TimeUnit.MILLISECONDS毫秒
5) workQueue:任务队列,如果新提交的任务无法立即被线程处理,那么将会放入任务队列,此处传入LinkedBlockingQueue无界队列
接着看源码,又调用了7个参数的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
该构造方法比前边多了两个参数:
6) threadFactory:创建线程使用的线程工厂,此处使用的是默认线程工厂Executors.defaultThreadFactory()
7)handler:当线程池无法处理新提交的任务时的拒绝策略,此处使用默认的AbortPolicy
newFixedThreadPool线程池处理请求时的模型大致如下图:
当线程池接收到新的任务的时候处理流程如下:
核心思想就是,线程池新接收任务后,如果当前线程池的线程数量小于核心线程池数量,无论如何都会创建一个线程去处理任务,如果线程池中的线程到达核心线程数量但是有空闲,那么就把任务提交到任务队列,如果线程数到达核心线程数量,并且没有空闲,就尝试新建一个线程处理任务,如果当前线程数量到达最大线程数量,但是有线程空闲,那就提交到任务队列,如果线程数量达到最大线程数,并且任务队列已满,那么就使用拒绝策略拒绝任务。
本文分享自 PersistentCoder 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!