Java并发学习之玩转线程池

线程池的使用姿势

基本上实际的项目不可能离开线程池,只是看你有没有注意到罢了

作为以业务需求为驱动,最顺溜的是写if-else的码农我来说,线程池就比较高端了,真要说有什么地方是需要自己来维护一个线程池,还真没几个...

这个东西,真的是你不去实际的用一下,基本上很难理解这个东西可以怎么玩,可以怎么优雅的去玩,可以怎么装高大上的玩

为了让自己显得稍微有biger一点,本篇博文围绕下面两点进行说明

  • 线程池是什么鬼,为什么用它
  • 线程池可以怎么耍
    • 固定大小的线程池: java.util.concurrent.Executors#newFixedThreadPool(int)
    • 计划线程池: java.util.concurrent.Executors#newScheduledThreadPool(int)
    • 工作窃取线程池: java.util.concurrent.Executors#newWorkStealingPool(int)
    • 优先级线程池: java.util.concurrent.Executors#privilegedThreadFactory
    • 缓存线程池: java.util.concurrent.Executors#newCachedThreadPool()

I. 线程池介绍

本片博文的两个目标之一,就是本节的说清楚什么是线程池,接下来尝试下能否向一个文科的孩子来解释下什么是线程池,以及这个东西有什么用

1. 通俗说明

对比之前,有必要先说明下线程池的几个特点:

  • 重复利用
  • 有过期时间
  • 线程池为空时,提交任务时,新创建线程
  • 线程池满时,提交任务放在等候队列里
  • 等候队列也满了,可以采取扔掉/等待/抛异常等各种处理方式

看到这几点有什么想法?想象力不够,有一个可能不是非常恰当的类比场景

假设你现在有一个工厂,专门给人做衣服,现在你有十个工人,一个工人一个机器,来一个单,就让一个工人来做衣服;

那么这个和线程池有什么关系呢,我们来简单的类比下

  • 每个工人,将衣服做完之后,可以接着做下一单,继续做衣服 ==》 重复利用
  • 工人不可能24小时连轴转吧,得休息去了 ==》 有过期时间
  • 如果十个工人都在干活了,又来了一单,就只能等了 ==》 线程池满,排队
  • 单太多了,再排下去明年都做不完,只能不再接单 ==》 队列也是有长度的(注意线程池的阻塞队列可以不设置长度的)

2. 学术说明

来自百科的解释:

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

II. 玩转线程池

1. 创建线程池

通过ThreadPoolExecutor来创建一个线程池

new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, milliseconds,runnableTaskQueue, threadFactory,handler);

参数说明:

  • corePoolSize 线程池中任务的基本个数
    • 新提交一个任务时,若线程池中个数未达到基本个数,则新建一个线程
    • 到线程池中的线程数达到基本个数时,再提交任务,则看是否有空闲线程,有则只直接使用
    • 若无空闲线程,则新几条的任务放入排队
  • maximumPoolSize 线程chi池中任务的做多个数
    • 当线程池中个数达到 corePoolSize & 且队列排满了
    • 新创建线程来执行任务
    • 当线程池中任务达到maximumPoolSize,则不再创建
  • keepAliveTime 线程池的工作线程空闲后存活的时间
  • milliseconds 配合上个参数使用,表示时间的单位,如TimeUnit.SECONDS
  • runnableTaskQueue 排队队列
    • ArrayBlockingQueue基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序
    • LinkedBlockingQueue 基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue
    • SynchronousQueue 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue
    • PriorityBlockingQueue 一个具有优先级得无限阻塞队列
  • threadFactory 创建线程的工厂,通常会重新指定线程名,方便debug
  • handler 线程池饱和策略
    • 当线程数达到 maximumPoolsize 队列已满时,表示饱和
    • CallerRunsPolicy 只用调用者所在线程来运行任务
    • DiscardOldestPolicy 丢弃队列里最近的一个任务,并执行当前任务
    • DiscardPolicy 不处理,丢弃掉
    • AbortPolicy 抛异常
    • 也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略

线程池提交任务的处理流程

提交任务

1. execute(Runnable)

直接执行一个实现了Runnable的接口,即表示提交了一个异步任务给线程池

2. submit(Runnable)

相比较于上面的,区别是这个会返回一个 Future<V> 对象,通过调用future.get() 可以获取线程的返回值,其中这个方程是线程阻塞的,直到返回了结果之后,才会继续执行下去

关闭线程

线程池的shutdown或shutdownNow方法来关闭线程池

shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程

shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止

调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。

至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow


2. 利用Executors创建线程池

看到上面创建的方式,参数一大堆,有点可怕,然而jdk非常贴心的提供了一种创建方式,借助 java.util.concurrent.Executors

直接翻到源码,看到提供了一些常见线程池的静态方法,直接调用就可以了

// 创建一个固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

// jdk1.8
// 创建工作窃取线程池(fork/join的工作原理)
public static ExecutorService newWorkStealingPool(int parallelism) {
    return new ForkJoinPool
        (parallelism,
         ForkJoinPool.defaultForkJoinWorkerThreadFactory,
         null, true);
}


// 创建单线程池
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}


// 缓存线程池
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}


public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}


// 单定时任务线程池
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1, threadFactory));
}

// 定时任务线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}


public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

// 不可配置线程池 ?
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
    if (executor == null)
        throw new NullPointerException();
    return new DelegatedExecutorService(executor);
}

public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
    if (executor == null)
        throw new NullPointerException();
    return new DelegatedScheduledExecutorService(executor);
}

3. 几种线程池对比说明

上面借助 Executors 可以非常方便的创建线程池,从方法命名也可以看出,上面定义了几种不同类型的线程池,那么这些有什么区别呢?什么场景,选择什么样的线程池呢?

由于这一块技术储备比较少,所以对上面的几种线程池的说明,将借助网上的博文,然后以自己的理解加以说明,如有缺陷,欢迎指出

a. 固定大小的线程池

通过调用 java.util.concurrent.Executors#newFixedThreadPool(int) 可以创建一个固定大小的线程池,使用姿势也很简单 :

// 创建一个大小为2的线程池
ExecutorService executorService = Executors.newFixedThreadPoll(2);

// 上面的这一行,实际上等同于
// 也就是说,创建的线程池中,基本任务个数为2,最多个数也是2
// 线程空余之后,直接干掉;Queue采用的是单链表阻塞队列
ExecutorService executorService = new ThreadPoolExecutor(2, 2,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

这个定长的意思是什么呢?假设现在线程池是空闲的,提交一个A任务,可以直接执行;然后又提交一个B任务,还没满,依然可以执行;接着又来任务C,这些超过线程池的长度了,怎么办?

这个时候新的任务C,会挂在阻塞队列中,知道线程池中某个任务执行完毕,释放资源之后,任务C才会执行

简单来说,这个固定大小的线程池,就是线程池中的任务恒定为指定个数(即便空闲,也会有这么几个线程在);其他的任务都放在阻塞队列中执行

那么这儿有什么用

固定大小的线程池,保证同一时刻,最多只有n个任务在执行,所以可以有效的控制并发数

java.util.concurrent.Executors#newSingleThreadExecutor() 是一个特殊的固定大小线程池(大小为1)

b. 缓存线程池

主要接口:java.util.concurrent.Executors#newCachedThreadPool()

ExecutorService executorService = Executors.newCachedThreadPool();

// 等同于下面直接创建的方式
// sleepTime 有个60s
ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
          60L, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>());

线程池大小没有限制,一个线程用完之后,有60s再次复活的权利,如果60s内还没有任务来拯救他,那么这个线程就可以领便当了;如果运气好,又来了一个任务,那么就可以直接用这个线程来执行这个任务

所以这个缓存的意思,就是说我这里的线程,能复用的话,尽量复用;然后应用场景也比较明确了,对于容易连续调度的任务来将,就比较合适了

一个case就是异常报警:

实际项目环境中,通常应用报警,这个时候常常是短暂的大量的报警,这个时候就可以考虑用缓存线程池,实现线程的复用

c. 定时(计划)线程池

定时任务线程池: ScheduledExecutorService

这个与上面两个的区别就非常明显了,可以指定某个时间执行,也可以指定按照某个频率执行

这一块可以结合我之前的一篇定时任务的博文: Java并发学习之定时任务的几种玩法

d. 工作窃取线程池

这个就有意思了,Fork/Join的思想,我们都知道就是利用的工作窃取,在jdk1.8里面就提供了这样的一个线程池,简直友好得不像话,关于这一个,有必要单独的说明,这里就不展开,先挖个坑。

e. unconfigurableExecutorService

返回一个将所有已定义的 ExecutorService 方法委托给指定执行程序的对象,但是使用强制转换可能无法访问其他方法。

这个基本没用到过,也不太清楚有嘛用,直接从javadoc里面捞了下注释...


III. 小结

线程池,简单来讲主要是为了实现线程的复用,控制线程任务的调度的一个手段

线程池的创建方式

// 1. 基本创建方式
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
  keepAliveTime, milliseconds,
  runnableTaskQueue, threadFactory,handler);
  
// 2. 借助Executors创建
Executors.newScheduledThreadPool(1);

IV. 其他

声明

尽信书则不如,已上内容,纯属一家之言,因本人能力一般,见识有限,如有问题,请不吝指正,感激

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏行者常至

016.多线程-线程池的四种创建方式

版权声明:本文为博主原创文章,允许转载,请标明出处。 https://blog.csdn.net/qwdafedv/article/deta...

1613
来自专栏JavaEdge

长文慎入-探索Java并发编程与高并发解决方案(更新中)1 基本概念2 CPU3 项目准备4线程安全性5发布对象7 AQS9 线程池10 死锁

4968
来自专栏Linyb极客之路

并发编程之CyclicBarrier

一、CyclicBarrier简介 CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏...

2923
来自专栏移动开发面面观

Android线程池的详细说明(二)

2425
来自专栏向前进

vue-cli脚手架npm相关文件解读(9)config/index.js

系列文章传送门: 1、build/webpack.base.conf.js 2、build/webpack.prod.conf.js 3、build/webp...

3916
来自专栏代码散人

Vapor奇幻之旅(04Routing)

Vapor的Routing提供了RouteBuilder和RouteCollection

802
来自专栏xdecode

Java高并发之线程池详解

例如线程, jdbc连接等等, 在高并发场景中, 如果可以复用之前销毁的对象, 那么系统效率将大大提升.

1522
来自专栏Android相关

Java线程池---getTask方法解析

/** * Performs blocking or timed wait for a task, depending on * current confi...

3252
来自专栏吴伟祥

线程池总结 原

New Thread的弊端如下:        a、每次New Thread新建对象性能差。        b、线程缺乏统一的管理,可能无限制的新建线程,相...

1002
来自专栏calvin

【nodejs】让nodejs像后端mvc框架(asp.net mvc)一orm篇【如EF般丝滑】typeorm介绍(8/8)

在使用nodejs开发过程中,刚好碰到需要做一个小工具,需要用到数据库存储功能。而我又比较懒,一个小功能不想搞一个nodejs项目,又搞一个后端项目。不如直接在...

3742

扫码关注云+社区

领取腾讯云代金券