专栏首页Java进阶架构师【原创】Java并发编程系列33 | 深入理解线程池(上)

【原创】Java并发编程系列33 | 深入理解线程池(上)

2020年Java原创面试题库连载中

【000期】Java最全面试题库思维导图

【001期】JavaSE面试题(一):面向对象

【002期】JavaSE面试题(二):基本数据类型与访问修饰符

【003期】JavaSE面试题(三):JavaSE语法(1)

【004期】JavaSE面试题(四):JavaSE语法(3)

【005期】JavaSE面试题(五):String类

【006期】JavaSE面试题(六):泛型

【007期】JavaSE面试题(七):异常

【008期】JavaSE面试题(八):集合之List

【009期】JavaSE面试题(九):集合之Set

【010期】JavaSE面试题(十):集合之Map

【011期】JavaSE面试题(十一):多线程(1)

【012期】JavaSE面试题(十二):多线程(2)

【013期】JavaSE面试题(十三):多线程(3)

【014期】JavaSE面试题(十四):基本IO流

【015期】JavaSE面试题(十五):网络IO流

【016期】JavaSE面试题(十六):反射

【017期】JavaSE面试题(十七):JVM之内存模型

【018期】JavaSE面试题(十八):JVM之垃圾回收

【020期】JavaSE系列面试题汇总(共18篇)

【019期】JavaWeb面试题(一):JDBC

【021期】JavaWeb面试题(二):HTTP协议

【022期】JavaWeb面试题(三):Cookie和Session

【023期】JavaWeb面试题(四):JSP

【024期】JavaWeb面试题(五):Filter和Listener

【025期】Java工具面试题(一):版本控制工具

【026期】Java工具面试题(二):项目管理工具

【027期】Java设计模式面试题

【028期】JavaWeb系列面试题汇总(共10篇)

【029期】JavaEE面试题(一)Web应用服务器

【030期】JavaEE面试题(二)SpringMVC

【031期】JavaEE面试题(三)Spring(1)

【032期】JavaEE面试题(四)Spring(2)

【033期】JaveEE面试题(五)MyBatis

【034期】JavaEE面试题(六)Hibernate

【035期】JavaEE面试题(七)SpringBoot(1)

更多内容,点击上面蓝字查看

并发编程必不可少的线程池,接下来分两篇文章介绍线程池,本文是第一篇。线程池将介绍如下内容:

  1. 线程池介绍
  2. Executor框架接口
  3. 线程池状态
  4. 线程池参数
  5. 线程池创建
  6. 执行过程
  7. 关闭线程池
  8. 其他问题
    • 任务拒绝策略
    • 线程池中的线程初始化
    • 线程池容量的动态调整
    • 线程池的监控

1. 介绍

1.1 使用场景

并发编程可以高效利用CPU资源,提升任务执行效率,但是多线程及线程间的切换也伴随着资源的消耗。当遇到单个任务处理时间比较短,但需要处理的任务数量很大时,线程会频繁的创建销毁,大量的时间和资源都会浪费在线程的创建和销毁上,效率很低。

这个时候就需要用的线程池了,线程作为一个工作者,线程执行完一个任务之后不销毁,而是继续执行其他的任务。

1.2 好处

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

1.3 一个简单示例

先通过一个简单的示例了解下线程池:

public class Test {
    public static void main(String[] args) {
        // 1. 创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
        
        for (int i = 0; i < 15; i++) {
            // 2. 创建任务
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    System.out.println("执行任务...");
                    try {
                        Thread.currentThread().sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.execute(task);// 3. 任务交给线程池执行
        }
        executor.shutdown();// 4. 关闭线程池
    }
}

2. Executor框架接口

Executor框架提供了一种“任务提交”与“任务如何运行”分离开来的机制,实现对异步任务的控制与执行。我们先大概了解下每个类的基本情况。

2.1 Executor接口

Executor接口只有一个execute方法,用于提交任务。

public interface Executor {
    void execute(Runnable command);
}
// 启动线程执行任务
new Thread(new Runnable() {
 public void run() {
  // TODO Auto-generated method stub
 }
}).start();

// 使用Executor提交任务
Executor executor = newExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());

2.2 ExecutorService接口

ExecutorService接口继承自Executor接口,提供了线程池主要功能,提交任务、异步任务执行、关闭线程池等。

public interface ExecutorService extends Executor {
    // 关闭线程池,已提交的任务继续执行,不接受继续提交新任务
    void shutdown();

    // 关闭线程池,尝试停止正在执行的所有任务,不接受继续提交新任务
    List<Runnable> shutdownNow();

    // 线程池是否已关闭
    boolean isShutdown();

    // 如果调用了 shutdown() 或 shutdownNow() 方法后,所有任务结束了,那么返回true
    boolean isTerminated();

    // 提交一个 Callable 任务
    <T> Future<T> submit(Callable<T> task);

    // 提交一个 Runnable 任务,第二个参数将会放到 Future中,作为返回值,
    <T> Future<T> submit(Runnable task, T result);

    // 提交一个 Runnable 任务
    Future<?> submit(Runnable task);

    // 执行所有任务,返回 Future 类型的一个 list
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
}

2.3 AbstractExecutorService类

AbstractExecutorService实现了ExecutorService接口,并在其基础上实现了几个实用的方法提供给子类进行调用。

public abstract class AbstractExecutorService implements ExecutorService {

    /**
     * newTaskFor 方法用于将我们的任务包装成 FutureTask 提交到线程池中执行
     * RunnableFuture 是用于获取执行结果的,我们常用它的子类 FutureTask
     */
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

    /**
     * 提交任务
     */
    public Future<?> submit(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        // 1. 将任务包装成 FutureTask
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        // 2. 交给执行器执行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        // 1. 将任务包装成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task, result);
        // 2. 交给执行器执行
        execute(ftask);
        return ftask;
    }

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null)
            throw new NullPointerException();
        // 1. 将任务包装成 FutureTask
        RunnableFuture<T> ftask = newTaskFor(task);
        // 2. 交给执行器执行
        execute(ftask);
        return ftask;
    }
    
    // 将 tasks 集合中的任务提交到线程池执行,任意一个线程执行完后就可以结束了
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

    // 执行所有的任务,返回任务结果。
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {}

}

2.4 ThreadPoolExecutor

ThreadPoolExecutor就是线程池了,继承自AbstractExecutorService。

3. 线程池状态

  • RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
  • SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用- shutdown()方法进入该状态);
  • STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
  • TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
  • TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。进入TERMINATED的条件如下:
    1. 线程池不是RUNNING状态;
    2. 线程池状态不是TIDYING状态或TERMINATED状态;
    3. 如果线程池状态是SHUTDOWN并且workerQueue为空;
    4. workerCount为0;
    5. 设置TIDYING状态成功。

线程状态如何保存呢?

ThreadPoolExecutor采用一个 32 位的整数(int变量ctl)来存放线程池的状态和当前池中的线程数,其中高 3 位用于存放线程池状态,低 29 位表示线程数。

// 高 3 位用于存放线程池状态,低 29 位表示线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

/** 后29位用于存放线程数 */
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 最大线程数:这里得到的是 29 个 1,也就是说线程池的最大线程数是 2^29-1=536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

/** 高 3 位表示线程池的状态 */
// 111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;

// 将整数 c 的低 29 位修改为 0,获取线程池的状态
   return c & ~CAPACITY;
}

// 将整数 c 的高 3 为修改为 0,获取线程池中的线程数
private static int workerCountOf(int c) {
 return c & CAPACITY;
}

4. 线程池参数

线程池ThreadPoolExecutor类有四个构造方法,我们通过这个参数最全的构造方法来看下线程池参数:

public ThreadPoolExecutor(int corePoolSize,
        int maximumPoolSize,
        long keepAliveTime,
        TimeUnit unit,
        BlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory,
        RejectedExecutionHandler handler) {
 if (corePoolSize < 0 ||
  maximumPoolSize <= 0 ||
  maximumPoolSize < corePoolSize ||
  keepAliveTime < 0)
  throw new IllegalArgumentException();
 if (workQueue == null || threadFactory == null || handler == null)
  throw new NullPointerException();
 this.corePoolSize = corePoolSize;
 this.maximumPoolSize = maximumPoolSize;
 this.workQueue = workQueue;
 this.keepAliveTime = unit.toNanos(keepAliveTime);
 this.threadFactory = threadFactory;
 this.handler = handler;
}
  • corePoolSize: 核心线程数量;
  • maximumPoolSize: 最大线程数量;
  • workQueue: 等待队列,当线程池中的线程数量大于等于corePoolSize时,把该任务放入等待队列;
  • keepAliveTime: 线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时,核心线程外的线程空闲时间超过keepAliveTime就会销毁;
  • unit: keepAliveTime的时间单位;
  • threadFactory: 用于创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。Executors.defaultThreadFactory() 创建的线程优先级都是NORM_PRIORITY;
  • handler: RejectedExecutionHandler类型的变量,表示线程池的拒绝策略。当阻塞队列满了并且没有空闲的线程时,如果继续提交任务,就需要采取一种策略处理该任务。线程池提供以下拒绝策略:
    1. AbortPolicy:直接抛出异常,默认策略;
    2. CallerRunsPolicy:用调用者所在的线程来执行任务;
    3. DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4. DiscardPolicy:直接丢弃任务;
    5. 实现自己的拒绝策略,实现RejectedExecutionHandler接口重写rejectedExecution方法即可。

线程池任务提交过程:

任务提交的顺序为 corePoolSize –> workQueue –> maximumPoolSize -> handler。

  1. 如果运行的线程数少于 corePoolSize,则创建新线程来处理任务,即使线程池中的其他线程是空闲的
  2. 如果运行的线程数大于等于 corePoolSize,则将任务放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理;
  3. 当workQueue已经满时,如果运行的线程数小于maximumPoolSize,则创建新的线程去处理提交的任务;
  4. 当workQueue已经满时,如果运行的线程数大于等于maximumPoolSize且没有空闲线程,则通过handler所指定的拒绝策略来处理任务。

线程池中的线程执行完当前任务后,会循环到任务队列中取任务继续执行;线程获取队列中任务时会阻塞,直到获取到任务返回;当线程数大于corePoolSize且线程阻塞时间超时,线程就会被销毁。

5. 线程池创建

介绍四种创建线程池的方式:通过 ThreadPoolExecutor 的方式创建线程池及Executors工具类提供的三种创建方式。

5.1 ThreadPoolExecutor方式

直接调用 ThreadPoolExecutor 的构造方法,自己手动设置每一个参数,这是阿里推荐的方法。

【强制】线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。 ——《阿里巴巴Java开发手册》

5.2 FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
 return new ThreadPoolExecutor(nThreads, nThreads,
          0L, TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<Runnable>());
}
  • corePoolSize 和 maximumPoolSize都设置为指定nThreads,表示核心线程数等于最大线程数,当达到核心线程数且阻塞队列也已经满时,如果继续提交任务,则会直接走拒绝策略。
  • FixedThreadPool使用的是默认的拒绝策略,即AbortPolicy,则直接抛出异常。
  • keepAliveTime 表示线程数量大于corePoolSize时空闲的线程的存活时间,而FixedThreadPool的corePoolSize 和 maximumPoolSize相等,不可能有多余corePoolSize的线程,所以这里的keepAliveTime本来就无效。
  • workQueue使用LinkedBlockingQueue,没有设置范围,默认是最大值(Integer.MAX_VALUE),相当于一个无界队列。当线程池中的线程数量等于corePoolSize 时,如果继续提交任务,该任务会被添加到阻塞队列workQueue中,因为workQueue是无界队列,所以maximumPoolSize和参数都无效。

5.3 SingleThreadExecutor

newSingleThreadExecutor与FixedThreadPool类似,不过是将线程数设置为1。

corePoolSize 和 maximumPoolSize都指定为1,表示该线程池中最多有一个线程,其他同FixedThreadPool。

public static ExecutorService newSingleThreadExecutor() {
 return new FinalizableDelegatedExecutorService
  (new ThreadPoolExecutor(1, 1,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()));
}

5.4 CachedThreadPool

public static ExecutorService newCachedThreadPool() {
 return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
          60L, TimeUnit.SECONDS,
          new SynchronousQueue<Runnable>());
}
  • CachedThreadPool的corePool为0,maximumPoolSize为Integer.MAX_VALUE,线程池中的所有线程都不是核心线程。
  • keepAliveTime为60L,unit设置为TimeUnit.SECONDS,空闲线程超过60秒后将会被终止。
  • 阻塞队列采用的SynchronousQueue

  • SynchronousQueue 不存储元素,数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
  • SynchronousQueue 执行put/take操作时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致(如当前操作是 put 操作,而队列中的元素也都是写线程),则将当前线程加入到等待队列。如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然),则匹配等待队列的队头,出队,返回相应数据。

理解CachedThreadPool提交任务的过程:

  1. 第一次提交任务,因为SynchronousQueue需要有读操作与写操作匹配才能写入数据,所以任务不能进入SynchronousQueue队列,而是直接创建一个线程执行任务;
  2. 之后提交任务,如果线程池里因为空闲线程超时被销毁而没有线程,同样不能进入SynchronousQueue队列,需要创建一个线程执行任务;
  3. 之后提交任务,如果线程池里的线程都正在执行任务,同样不能进入SynchronousQueue队列,需要创建一个线程执行任务;
  4. 之后提交任务,如果线程池有线程处于空闲状态(处于空闲状态的线程都会在SynchronousQueue的take()方法上阻塞),那么SynchronousQueue通过offer()方法将任务交给take()执行,不需要创建线程;

CachedThreadPool的问题:如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池时,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。

并发系列文章汇总

【原创】01|开篇获奖感言 【原创】02|并发编程三大核心问题 【原创】03|重排序-可见性和有序性问题根源 【原创】04|Java 内存模型详解 【原创】05|深入理解 volatile 【原创】06|你不知道的 final 【原创】07|synchronized 原理 【原创】08|synchronized 锁优化 【原创】09|基础干货 【原创】10|线程状态 【原创】11|线程调度 【原创】12|揭秘 CAS 【原创】13|LockSupport 【原创】14|AQS 源码分析 【原创】15|重入锁 ReentrantLock 【原创】16|公平锁与非公平锁 【原创】17|读写锁八讲(上) 【原创】18|读写锁八讲(下) 【原创】19|JDK8新增锁StampedLock 【原创】20|StampedLock源码解析 【原创】21|Condition-Lock的等待通知 【原创】22|倒计时器CountDownLatch 【原创】22|倒计时器CountDownLatch 【原创】23|循环屏障CyclicBarrier 【原创】24|信号量Semaphore 【原创】25|交换器Exchangere 【原创】26|ConcurrentHashMap(上) 【原创】27|ConcurrentHashMap(下) 【原创】28|Copy-On-Write容器 【原创】29|ConcurrentLinkedQueue 【原创】30 | ThreadLocal 【原创】31 | 阻塞队列(上)

本文分享自微信公众号 - java进阶架构师(java_jiagoushi),作者:何适

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-08-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Java并发编程系列34 | 深入理解线程池(下)

    公众号改版后文章乱序推荐,希望你可以点击上方“Java进阶架构师”,点击右上角,将我们设为★“星标”!这样才不会错过每日进阶架构文章呀。

    java进阶架构师
  • 【原创】Java并发编程系列26 | ConcurrentHashMap(上)

    终于轮到ConcurrentHashMap了,并发编程必备,也是面试必备。先说明两点:

    java进阶架构师
  • 【原创】Java并发编程系列11 | 线程调度

    之前发过,但是因为之前忘记标记原创,没办法收录在【并发编程专题】里面,作为强迫症的我,必须要重发一次。本文为第 11 篇,前面几篇没看过的,可以在文末找到前几篇...

    java进阶架构师
  • 【原创】Java并发编程系列31 | 阻塞队列(上)

    阻塞队列在并发编程非常常用,被广泛使用在“生产者-消费者”问题中。接下来两篇文章就来详细介绍阻塞队列。本文是阻塞队列上篇。

    java进阶架构师
  • 【原创】Java并发编程系列36 | FutureTask

    线程池源码中出现了很多Callable、Future、FutureTask等以前没介绍过的接口,尤其是线程池提交任务时总是把任务封装成FutureTask,今天...

    java进阶架构师
  • 【原创】Java并发编程系列13 | LookSupport

    java.util.concurrent 中源码频繁使用的 LockSupport 来阻塞线程和唤醒线程,如 AQS 的底层实现用到 LockSupport.p...

    java进阶架构师
  • 【原创】Java并发编程系列30 | ThreadLocal

    线程安全问题的核心在于多个线程会对同一个临界区共享资源进行操作,而大多数解决线程安全问题的方法都是通过加锁的方式,让同一时间只有一个线程能过访问到共享资源。这篇...

    java进阶架构师
  • 【原创】Java并发编程系列29 | ConcurrentLinkedQueue

    J.U.C 为常用的集合提供了并发安全的版本,前面讲解了 map 的并发安全集合 ConcurrentHashMap,List 并发安全集合 CopyOnWri...

    java进阶架构师
  • java并发系列第5天-深入理解进程和线程

    进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。程序是指令、数据及其组织形式的描...

    路人甲Java
  • 【原创】Java并发编程系列15 | 重入锁ReentrantLock

    AQS是java.util.concurrent包的核心基础组件,是实现Lock的基础。那么AQS是如何实现Lock的呢?

    java进阶架构师
  • 【原创】Java并发编程系列1:大纲

    随着现今互联网行业的迅猛发展,其业务复杂度、并发量也在不断增加,对程序的要求变得越来越高,传统的线性模型也越来越不适用。 同时,计算机软硬件技术的发展,也为多...

    王金龙
  • 【原创】Java并发编程系列27 | ConcurrentHashMap(下)

    上一篇详细分析了HashMap源码,介绍了HashMap的数据结构以及并发编程中HashMap的问题,这篇就来看下ConcurrentHashMap。因为Con...

    java进阶架构师
  • 由浅入深理解Java四种线程池及Java并发库

    线程,程序执行流的最小执行单位,是行程中的实际运作单位,经常容易和进程这个概念混淆。那么,线程和进程究竟有什么区别呢?首先,进程是一个动态的过程,是一个活动的实...

    葆宁
  • 【原创】Java并发编程系列17 | 读写锁八讲(上)

    通过以下几部分来分析Java提供的读写锁ReentrantReadWriteLock:

    java进阶架构师
  • 【原创】Java并发编程系列12 | 揭秘CAS

    并发编程,为了保证数据的安全,需要满足三个特性:原子性、可见性、有序性。Java 中可以通过锁和 CAS 的方式来实现原子操作。

    java进阶架构师
  • 【原创】Java并发编程系列2:线程概念与基础操作

    本篇为【Dali王的技术博客】Java并发编程系列第二篇,讲讲有关线程的那些事儿。主要内容是如下这些:

    王金龙
  • Java并发编程实战系列8之线程池的使用

    ThreadPoolExecutor UML图: ? image ? image ? 8.1 在任务和执行策略之间隐形耦合 避免Thread starvati...

    JavaEdge
  • 死磕 java线程系列之线程池深入解析——体系结构

    Java的线程池是块硬骨头,对线程池的源码做深入研究不仅能提高对Java整个并发编程的理解,也能提高自己在面试中的表现,增加被录取的可能性。

    彤哥
  • 【原创】Java并发编程系列32 | 阻塞队列(下)

    阻塞队列在并发编程非常常用,被广泛使用在“生产者-消费者”问题中。本文是阻塞队列下篇。

    java进阶架构师

扫码关注云+社区

领取腾讯云代金券