前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java的ThreadPoolExecutor

Java的ThreadPoolExecutor

作者头像
用户3467126
发布2019-08-09 14:08:59
6310
发布2019-08-09 14:08:59
举报
文章被收录于专栏:爱编码

前言

Java多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上。

下面从以下几个方面学习一下线程池

代码语言:javascript
复制
1)Executors的创建
2)ThreadPoolExecutor的使用
3)FixedThreadPool固定线程数的线程池
4)SingleThreadExecutor单个worker线程池
5)CachedThreadPool
6)线程优雅地停止

整个线程池框架的类继承图如下

Executors是线程池框架提供给我们的创建线程池的工具类,它里面提供了以下创建几类线程池的方法。

代码语言:javascript
复制
    // 创建固定线程数量的线程池

    public static ExecutorService newFixedThreadPool();

    // 创建单个线程的线程池(本质上就是容量为1的FixedThreadPool)

    public static ExecutorService newSingleThreadExecutor();

    // 创建无数量限制可自动增减线程的线程池

    public static ExecutorService newCachedThreadPool();


    // 创建(可计划的)任务延时执行线程池

    public static ScheduledExecutorService newScheduledThreadPool();

    // 单线程版的任务计划执行的线程池

    public static ScheduledExecutorService newSingleThreadScheduledExecutor();

通过查看这几个方法的源码发现:前三个方法new了ThreadPoolExecutor对象,而后面两个方法new了ScheduledThreadPoolExecutor对象。ScheduledThreadPoolExecutor下次再讲。

ThreadPoolExecutor使用

Executor框架最核心的类就是ThreadPoolExecutor,它是线程池的实现类。

代码语言:javascript
复制
    public class ThreadPoolExecutor extends AbstractExecutorService {


        public ThreadPoolExecutor(int corePoolSize,//核心池的大小。

                                  int maximumPoolSize,//线程池允许的最大线程数。

                                  long keepAliveTime,//表示线程没有任务执行时最多保持多久时间会终止。

                                  TimeUnit unit,//参数keepAliveTime的时间单位

                                  BlockingQueue<Runnable> workQueue,//一个阻塞队列(BlockingQueue接口的实现类)

                                  ThreadFactory threadFactory,//线程工厂,主要用来创建线程;

                                  RejectedExecutionHandler handler//表示当拒绝处理任务时的策略) {

            //代码省略

        }

        ...

    }

构造器中各个参数的含义:

1、corePoolSize:核心池的大小。

核心池中的线程会一致保存在线程池中(即使线程空闲),除非调用allowCoreThreadTimeOut方法允许核心线程在空闲后一定时间内销毁,该时间由构造方法中的keepAliveTime和unit参数指定;

在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这两个方法的名字就可以看出,是“预创建线程”的意思,即在没有任务到来之前就创建corePoolSize个线程(prestartAllCoreThreads)或者一个线程(prestartCoreThread);

2、maximumPoolSize:线程池允许的最大线程数。

这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。

默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把新加入的任务放到缓存队列当中,缓存队列由构造方法中的workQueue参数指定,如果入队失败(队列已满)则尝试创建临时线程,但临时线程和核心线程的总数不能超过maximumPoolSize,当线程总数达到maximumPoolSize后会拒绝新任务;所以有两种方式可以让任务绝不被拒绝:

① 将maximumPoolSize设置为Integer.MAX_VALUE(线程数不可能达到这个值),CachedThreadPool就是这么做的;

使用无限容量的阻塞队列(比如LinkedBlockingQueue),所有处理不过来的任务全部排队去,FixedThreadPool就是这么做的。

3、keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。

默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用——当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会被销毁,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(true)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

4、unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

代码语言:javascript
复制
    TimeUnit.DAYS;              //天

    TimeUnit.HOURS;             //小时

    TimeUnit.MINUTES;           //分钟

    TimeUnit.SECONDS;           //秒

    TimeUnit.MILLISECONDS;      //毫秒

    TimeUnit.MICROSECONDS;      //微妙

    TimeUnit.NANOSECONDS;       //纳秒

并发库中所有时间表示方法都是以TimeUnit枚举类作为单位

5、workQueue:一个阻塞队列(BlockingQueue接口的实现类),用来存储等待执行的任务,一般来说,这里的阻塞队列有以下几种选择:

代码语言:javascript
复制
    ArrayBlockingQueue    // 数组实现的阻塞队列,数组不支持自动扩容。所以当阻塞队列已满

                          // 线程池会根据handler参数中指定的拒绝任务的策略决定如何处理后面加入的任务


    LinkedBlockingQueue   // 链表实现的阻塞队列,默认容量Integer.MAX_VALUE(不限容),

                          // 当然也可以通过构造方法限制容量


    SynchronousQueue      // 零容量的同步阻塞队列,添加任务直到有线程接受该任务才返回

                          // 用于实现生产者与消费者的同步,所以被叫做同步队列


    PriorityBlockingQueue // 二叉堆实现的优先级阻塞队列


    DelayQueue          // 延时阻塞队列,该队列中的元素需要实现Delayed接口

                        // 底层使用PriorityQueue的二叉堆对Delayed元素排序

                        // ScheduledThreadPoolExecutor底层就用了DelayQueue的变体"DelayWorkQueue"

                        // 队列中所有的任务都会封装成ScheduledFutureTask对象(该类已实现Delayed接口)

6、threadFactory:线程工厂,主要用来创建线程;默认情况都会使用Executors工具类中定义的默认工厂类DefaultThreadFactory。可以实现ThreadFactory接口来自己控制创建线程池的过程(比如设置创建线程的名字、优先级或者是否为Deamon守护线程)

7、handler:表示当拒绝处理任务时的策略,有以下四种取值(默认为AbortPolicy):

ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

可通过实现RejectedExecutionHandler接口来自定义任务拒绝后的处理策略。

以上参数解析,除此之外Java提供了3种常用的ThreadPoolExecutor:

1、FixedThreadPool

2、SingleThreadExecutor

3、CacheThreadPool

FixedThreadPool被称为可重用固定线程数的线程池。

FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。

代码语言:javascript
复制
        public static ExecutorService newFixedThreadPool(int nThreads) {

            return new ThreadPoolExecutor(nThreads, nThreads,

                                          0L, TimeUnit.MILLISECONDS,

                                          new LinkedBlockingQueue<Runnable>());

        }

当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。

FixedThreadPool的execute()方法运行如下图:

说明:1)如果当前运行的线程数小于corePoolSize,则创建新线程来执行任务。2)在线程池完成预热之后(当前运行的线程等于corePoolSize),将任务加入LinkedBlockingQueue。3)线程执行完1)中的任务后,会循环中反复从LinkedBlockingQueue获取任务来执行。

SingleThreadExecutor是单个worker线程的Executor

SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。

代码语言:javascript
复制
        public static ExecutorService newSingleThreadExecutor() {

            return new FinalizableDelegatedExecutorService

                (new ThreadPoolExecutor(1, 1,

                                        0L, TimeUnit.MILLISECONDS,

                                        new LinkedBlockingQueue<Runnable>()));

        }

SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同。

说明:1)如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。2)在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue。3)线程执行完1中的任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池。

代码语言:javascript
复制
          public static ExecutorService newCachedThreadPool() {

            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                          60L, TimeUnit.SECONDS,

                                          new SynchronousQueue<Runnable>());

        }

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着CachedThreadPool中空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

FixedThreadPool和SingleThreadExecutor使用无界队列LinkBlockingQueue作为线程池的工作队列。CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这就意味着如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

线程的停止

1 单线程停止

Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,因为在发生InterruptedException异常的时候,会清除中断标记。如果不加处理,那么下一次循环开始的时候,就无法捕获这个异常。故在异常处理中,再次设置中断标记位

代码语言:javascript
复制
    public class ThreadStopSafeInterrupted {

        public static void main(String[] args) throws InterruptedException {

            Thread thread = new Thread() {

                @Override

                public void run() {

                    while (true) {

                        // 使用中断机制,来终止线程

                        if (Thread.currentThread().isInterrupted()) {

                            System.out.println("Interrupted ...");

                            break;

                        }


                        try {

                            Thread.sleep(3000);

                        } catch (InterruptedException e) {

                            System.out.println("Interrupted When Sleep ...");

                            // Thread.sleep()方法由于中断抛出异常。

                            // Java虚拟机会先将该线程的中断标识位清除,然后抛出InterruptedException,

                            // 因为在发生InterruptedException异常的时候,会清除中断标记

                            // 如果不加处理,那么下一次循环开始的时候,就无法捕获这个异常。

                            // 故在异常处理中,再次设置中断标记位

                            Thread.currentThread().interrupt();

                        }


                    }

                }

            };


            // 开启线程

            thread.start();

            Thread.sleep(2000);

            thread.interrupt();//主线程发起中断


        }


    }
2 线程池停止
代码语言:javascript
复制
       /**

         * 停止线程池中的所有线程

         */

        private void stopDownloadThreadTask() {

            try {

                this.fixedThreadPool.shutdown();//尝试停止所有线程

                if(!this.fixedThreadPool.awaitTermination(5 * 1000, TimeUnit.MILLISECONDS)){

                    this.fixedThreadPool.shutdownNow();//规定时间内还未停止,再次请求停止

                }

            } catch (InterruptedException e) {

                logger.error("awaitTermination interrupted: " + e);

                this.fixedThreadPool.shutdownNow();//停不了就再停止一次。

            }


        }

参考文章

《Java并发编程艺术》

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-08-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 爱编码 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • ThreadPoolExecutor使用
    • FixedThreadPool被称为可重用固定线程数的线程池。
      • SingleThreadExecutor是单个worker线程的Executor
        • CachedThreadPool
        • 线程的停止
          • 1 单线程停止
            • 2 线程池停止
            • 参考文章
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档