专栏首页IT笔记分享一篇搞懂线程池

一篇搞懂线程池

在上一篇文章《spring boot使用@Async异步任务》中我们了解了使用@Async的异步任务使用,在这篇文章中我们将学习使用线程池来创建异步任务的线程。

在《阿里巴巴Java开发手册中》对线程使用有如下要求:

接下来就让我们就好好了解一下线程池。

线程池简单介绍

在Java5中引入Executor框架。

ThreadPoolExecutor线程池解析

其类关系图如下:

下图是ThreadPoolExecutor的构造方法:

我们这里看构造参数最多的也是最全的方法:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                              long keepAliveTime, TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 || maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize || keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ? null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  1. corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程。
  2. maximumPoolSize(线程池最大大小):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。
  3. keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以如果任务很多,并且每个任务执行的时间比较短,可以调大这个时间,提高线程的利用率。
  4. TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  5. workQueue (任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。
    1. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
  1. LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列
  2. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  3. PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
  4. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程做些更有意义的事情,比如设置daemon和优先级等等
  5. RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。以下是JDK1.5提供的四种策略。
    1. AbortPolicy:直接抛出异常。
  1. CallerRunsPolicy:只用调用者所在线程来运行任务。
  2. DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  3. DiscardPolicy:不处理,丢弃掉。
  4. 也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化不能处理的任务。
线程池的工作方式
  1. 如果运行的线程少于 corePoolSize,则 Executor 始终创建新的线程,而不添加到queue中。
  2. 如果运行的线程等于或多于 corePoolSize,则 Executor 始终将请求加入队列,而不创建新的线程。
  3. 如果无法将请求加入队列(队列已满),则创建新的线程,除非创建此线程超出 maximumPoolSize,如果超过,在这种情况下,新的任务将被拒绝。

Executors提供的线程池方法

  1. newFixedThreadPool固定线程数的线程。
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

从上面源代码可以看出新创建的newFixedThreadPool的corePoolSize和maximumPoolSize都被设置为nThreads。

说明:

  1. 如果当前运行的线程数小于corePoolSize,则创建新的线程来执行任务;
  2. 当前运行的线程数等于corePoolSize后,将任务加入LinkedBlockingQueue;
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue中获取任务来执行。

FixedThreadPool使用无界队列 LinkedBlockingQueue(队列的容量为Intger.MAX_VALUE)作为线程池的工作队列会对线程池带来如下影响:

  1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize;
  2. 使用无界队列时maximumPoolSize和keepAliveTime将是无效参数;
  3. 运行中的newFixedThreadPool(未执行shutdown()或shutdownNow()方法)不会拒绝任务。
  4. newSingleThreadExecutor单个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

从上面源代码可以看出新创建的SingleThreadExecutor的corePoolSize和maximumPoolSize都被设置为1.其他参数和newFixedThreadPool相同。也是用的是无界队列存放。

说明:

  1. 如果当前运行的线程数少于corePoolSize,则创建一个新的线程执行任务;
  2. 当前线程池中有一个运行的线程后,将任务加入LinkedBlockingQueue
  3. 线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue中获取任务来执行。
  4. newCachedThreadPool, 先看一下源码:
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

newCachedThreadPool的corePoolSize是0,maximumPoolSize被设置为Integer.MAX.VALUE。

说明:

  1. SynchronousQueue是无界的,在某次添加元素后必须等待其他线程取走后才能继续添加。
  2. 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出maximumPoolSize,在这种情况下,任务将被拒绝。
  3. ScheduledExecutorService,此线程池支持定时以及周期性执行任务的需求。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

我们再看一下ScheduledThreadPoolExecutor中的源码:

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

调用schedule方法,delay是延迟多少时间执行。

以上就是Executors提供的几种常见的线程池的解析。

ThreadPoolTaskExecutor

下面我们来看一下spring为我们提供的线程池ThreadPoolTaskExecutor。下图是其类关系图:

private final Object poolSizeMonitor = new Object();

    private int corePoolSize = 1;

    private int maxPoolSize = Integer.MAX_VALUE;

    private int keepAliveSeconds = 60;

    private int queueCapacity = Integer.MAX_VALUE;

    private boolean allowCoreThreadTimeOut = false;

    @Nullable
    private TaskDecorator taskDecorator;

    @Nullable
    private ThreadPoolExecutor threadPoolExecutor;

这是ThreadPoolTaskExecutor类中的属性,可以看出依然需要ThreadPoolExecutor类来支持。默认使用无界队列。

在初始化方法中调用了ThreadPoolExecutor的构造器:

@Override
    protected ExecutorService initializeExecutor(
            ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {

        BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);

        ThreadPoolExecutor executor;
        if (this.taskDecorator != null) {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler) {
                @Override
                public void execute(Runnable command) {
                    Runnable decorated = taskDecorator.decorate(command);
                    if (decorated != command) {
                        decoratedTaskMap.put(decorated, command);
                    }
                    super.execute(decorated);
                }
            };
        }
        else {
            executor = new ThreadPoolExecutor(
                    this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
                    queue, threadFactory, rejectedExecutionHandler);

        }

        if (this.allowCoreThreadTimeOut) {
            executor.allowCoreThreadTimeOut(true);
        }

        this.threadPoolExecutor = executor;
        return executor;
    }

说明:

  1. spring 配置的线城池(threadPoolTaskExecutor)由于是spring创建注入的,在首次使用之后,会一直保持corePoolSize个空闲线程,它只会把多余的空闲线程在keepAliveSeconds 时间之后释放,而且线城池不能调用shutdown()方法,否则再次调用,由于线程池已经关闭,会报错。
  2. threadPoolTaskExecutor也可以在配置文件配置多个线城池,防止多有任务之间竞争,或者由于不同任务使用的线城池大小不同等情况。

总结

以上简单的介绍了java自带的四种线程池和spring提供的线程池,他们各有利弊,实际项目中可以根据需求选择。

本文分享自微信公众号 - IT笔记分享(xiaosen_javashare),作者:xiaosen L

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-02-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java多线程学习(二)——Thread类的方法使用

    停止线程意味着在线程处理完任务之前停掉正在做的操作,也就是放弃当前操作。有以下三种方法终止正在运行中的线程:

    小森啦啦啦
  • Java多线程学习(五)——等待通知机制

    方法wait()的作用是使当前线程进行等待,wait()方法是Object类的方法,该方法用来将当前线程放到“预执行队列”,并在wait()所在的代码处停止执行...

    小森啦啦啦
  • Java多线程学习(三)——synchronized(上)

    在前两节的《Java多线程学习(一)——多线程基础》和《Java多线程学习(二)——Thread类的方法介绍》中我们接触了线程安全和非线程安全的概念,这节就来学...

    小森啦啦啦
  • 初学者第66节生产者消费者(八)

    上一节讲解了生产者与消费者模式的基本理论以及简单实现,并且遗留下来一个消费商品是null,库存为:-1的问题 ,看下代码。

    用户5224393
  • 多线程协作打印ABC之ReentrantLock版本

    我们介绍了在Java里面使用synchronized + wait/notifyAll实现的多线程轮流打印特定的字符串,输出的结果如下:

    我是攻城师
  • Java并发编程实战系列11之性能与可伸缩性Performance and Scalability

    线程可以充分发挥系统的处理能力,提高资源利用率。同时现有的线程可以提升系统响应性。 但是在安全性与极限性能上,我们首先需要保证的是安全性。 11.1 对性能的...

    JavaEdge
  • [Java] CountDownLatch 与 CyclicBarrier

    A synchronization aid that allows one or more threads to wait until a set of ope...

    wOw
  • 打通 Java 任督二脉 —— 并发数据结构的基石

    每一个 Java 的高级程序员在体验过多线程程序开发之后,都需要问自己一个问题,Java 内置的锁是如何实现的?最常用的最简单的锁要数 ReentrantLoc...

    老钱
  • 【转】Java并发的AQS原理详解

    每一个 Java 的高级程序员在体验过多线程程序开发之后,都需要问自己一个问题,Java 内置的锁是如何实现的?最常用的最简单的锁要数 ReentrantL...

    一枝花算不算浪漫
  • IOCP异步优化

    2. IO操作: CPU会把内存中的程序委托给其他的网络、磁盘等驱动程序,让这些外部的驱动程序来进行具体的处理,处理完成以后再返回给内存程序。对于这两类操作的优...

    小蜜蜂

扫码关注云+社区

领取腾讯云代金券