首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池

线程池

作者头像
胖虎
发布2020-12-08 14:48:26
4760
发布2020-12-08 14:48:26
举报
文章被收录于专栏:晏霖晏霖

2.8 线程池

在实际开发中,我们的项目里是杜绝在某些业务中直接继承Thread类或者实现Runnalbe接口等方式创建线程的,因为这样创建的每一个线程都会经历创建、运行直至销毁,缺乏统一管理,并且这样会导致无限制创建新线程,线程互相竞争,严重时会占用过多的系统资源或内存溢出(OOM)。在JDK1.5推出的java.util.concurrent(简称JUC)并发工具包中又一并发利器就是线程池,需要做异步或并发执行任务都可以使用线程池。使用线程池可以带来以下好处。

l 重用存在的线程,减少对象新建、消亡的开销。

l 线程总数可控,提高资源的利用率。

l 避免过多资源竞争,避免阻塞。

l 提供额外功能,定时执行、定期执行,监控等。

2.8.1线程池的原理

Java中的线程池顶层接⼝是 Executor接⼝, ThreadPoolExecutor 是这个接⼝的实现类。首先我们进入源码,从前向后阅读我们发现,该源码在前面声明来很多参数和属性,我们贴在来下面,并已写好注释,如代码2-33所示。

代码清单2-33 ThreadPoolExecutor.java

public class ThreadPoolExecutor extends AbstractExecutorService {
    // 线程池的控制状态(用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位))
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 29位的偏移量
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大容量(2^29 - 1)
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    // 线程运行状态,总共有5个状态,需要3位来表示(所以偏移量的29 = 32 - 3)
  /**
    * RUNNING    :    接受新任务并且处理已经进入阻塞队列的任务
    * SHUTDOWN    :    不接受新任务,但是处理已经进入阻塞队列的任务
    * STOP        :    不接受新任务,不处理已经进入阻塞队列的任务并且中断正在运行的任务
    * TIDYING    :    所有的任务都已经终止,workerCount为0, 线程转化为TIDYING状态并且调用terminated钩子函数
    * TERMINATED:    terminated钩子函数已经运行完成
    **/
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    // 可重入锁
    private final ReentrantLock mainLock = new ReentrantLock();
    // 存放工作线程集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    // 终止条件
    private final Condition termination = mainLock.newCondition();
    // 最大线程池容量
    private int largestPoolSize;
    // 已完成任务数量
    private long completedTaskCount;
    // 线程工厂
    private volatile ThreadFactory threadFactory;
    // 拒绝执行处理器
    private volatile RejectedExecutionHandler handler;
    // 线程等待运行时间
    private volatile long keepAliveTime;
    // 是否运行核心线程超时
    private volatile boolean allowCoreThreadTimeOut;
    // 核心池的大小
    private volatile int corePoolSize;
    // 最大线程池大小
    private volatile int maximumPoolSize;
    // 默认拒绝执行处理器
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
    //
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");
}  

这些属性希望大家跟着注释的含义,在心里大概有个概念,我们会在线程池的使用中详细讲解。这里我们需要知道AtomicInteger类型的ctl属性,ctl为线程池的控制状态,用来表示线程池的运行状态(整形的高3位)和运行的worker数量(低29位)),其中,线程池的运行状态有如下五种,这个状态是线程池的状态,和线程的状态要分别开,不要混淆。由于有5种状态,最少需要3位表示,所以采用的AtomicInteger的高3位来表示,低29位用来表示worker的数量,即最多表示2^29 - 1。这里提前让大家知道,在使用线程池时可以调用execute()和submit()两个方法,根据这两个方法把任务提交给线程池,任务才会真正执行,也是我们探究线程池原理的入口。execute是ExecutorService接口定义的,submit有三种方法重载都在AbstractExecutorService中定义,都是将要执行的任务包装为FutureTask来提交,使用者可以通过FutureTask来拿到任务的执行状态和执行最终的结果,最终调用的都是execute方法,其实对于线程池来说,它并不关心你是哪种方式提交的,因为任务的状态是由FutureTask自己维护的,对线程池透明。

代码清单2-34 AbstractExecutorService.java

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

这是AbstractExecutorService部分源码,其中newTaskFor有两个重载的方法,根据参数不同实例化不同的FutureTask对象。然而这些都不是重点,最重要的是提交了一个任务到线程池最终都会执行execute()方法,下面我们来看看execute()方法,如代码2-35所示。

代码清单2-35 ThreadPoolExecutor.java

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
//检查当前线程数是否达到核心线程数的限制
if (workerCountOf(c) < corePoolSize) {
//如果未达到核心线程数,会将任务添加到线程池
//提交任务后,会优先启动核心线程处理
        if (addWorker(command, true))
            return;
//如果添加任务失败,刷新ctl
        c = ctl.get();
    }
//检查线程池是否是运行状态,并且任务成功添加到等待队列,注意offer返回布尔
    if (isRunning(c) && workQueue.offer(command)) {
//任务添加成功,再次获取线程池状态
        int recheck = ctl.get();
//再次检查线程池是否运行,如果不是运行状态,就要把当前任务移除队列,
        if (! isRunning(recheck) && remove(command))
//两者都成功,执行相应拒绝策略
            reject(command);
//否则判断当前线程数量是否为0,
        else if (workerCountOf(recheck) == 0)
//否则如果线程数量为0则添加一个非核心线程,并且不指定首次执行任务
            addWorker(null, false);
    }
//否则如果添加非核心线程,指定首次执行任务,如果添加失败,执行异常策略
    else if (!addWorker(command, false))
        reject(command);
}

到这里并没有结束,我们知道了,提交的任务加入到线程池实际上是加入到Worker中,这个Worker是内部类,可以理解为线程池的工作队列,继承了AQS。下面我们来了解,这个任务是如何添加到Worker中的,这就要看一下addWorker是如何实现的,篇幅原因不贴出代码,读者可以跟着上述源码分析步骤自行阅读。读源码是一个很好的习惯,我们要勇敢的迈出第一步,任何人都不能手把手教大家一行一行读,我们要有自己学习和阅读源码的方式,才能应万变。addWorker首先经过来一些判断使线程数增加,如果首次添加失败还会CAS的方式继续增加,然后再去真正的生成Worker,这个步骤使用ReentrankLock进行同步手段,也就是我们无需担心并发场景下,任务加入线程池会出现线程安全问题,以上步骤都执行完毕,任务才会真正加入到Worker中,在线程池中这个Worker是HashSet维护的。当一个任务成功加入到了Worker中,是如何执行的呢?这就需要我们去看线程池中runWorker(Worker w)方法,这个方法最重要的操作是执行getTake()方法从阻塞队列里去任务,也就是说如果某一个线程试图从这个队列中取数据,而这个队列里没有数据的时候,线程就会进入休眠了,当然这也是可控的操作,取任务是根据timed选择是有时间期限的等待还是无时间期限的等待,这个就要看我们设定线程池的keepAliveTime属性是什么了,线程池中的所有线程都会在keepAliveTime时间超时后还未取到任务而退出。或者线程池已经STOP,那么所有线程都会被中断,然后退出。到这里就分析结束了,虽然用了简单的语言概括了线程池的原理,但是里面的细节还是很多的,读者有必要亲自查看源码,例如线程池的线程是如何复用等等。下面一图可以概括线程池执行流程,如图2-22所示。

图2-22 线程池执行流程

我们来结合图2-22线程池的执行流程来总结一下。

1.线程池刚创建时,里面没有一个线程。任务是作为参数传进来的。不过就算队列里面有任务,线程池也不会马上执行它们。

2.当调用 execute() 或submit()方法提交一个任务时,线程池会做如下判断:

① 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;

② 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;

③ 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

④ 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常 RejectedExecutionException。

3.当一个线程完成任务时,它会从队列中取下一个任务来执行。

4.当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

2.8.2线程池的使用

在ThreadPoolExecutor中提供了4种构造方法帮助我们创建线程池.

代码清单2-36 ThreadPoolExecutor.java

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) 

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) 

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) 

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

这4个构造方法永远不变的是前5个参数,这5个参数很重要,需要大家完全了解他们在线程池表达的含义,剩下2个参数我们在创建线程池很少设置,但是很重要的参数。以下我们介绍这7个参数。

1.int corePoolSize:该线程池中核心线程数最⼤值 核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会⼀直存在于线程池中,而非核心线程如果长时间的闲置,就会被销毁,因为非核心线程是非线程池自己的,是向系统申请借用一下,所以用完要归还资源。

2.int maximumPoolSize:该线程池中线程所能创建的最大值。该值等于核心线程数量+非核心线程数量。

3.long keepAliveTime:非核心线程闲置超时时长。 ⾮核⼼线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。

4.TimeUnit unit:keepAliveTime的单位。TimeUnit是⼀个枚举类型,包括以下属性:

a. NANOSECONDS:1微毫秒=1微秒/1000

b. MICROSECONDS:1微秒=1毫秒/1000

c. MILLISECONDS:1毫秒=1秒/1000

d. SECONDS:秒

e. MINUTES:分

f. HOURS:小时

g. DAYS:天

5.BlockingQueue workQueue:阻塞队列,在这里称任务队列,维护着等待执行的Runnable任务对象。常用的几个阻塞队列:

a. LinkedBlockingQueue:链式阻塞队列,先进先出原则,由于吞吐量比ArrayBlockingQueue高,所以在线程池作为常用的任务队列,默认大小是 Integer.MAX_VALUE,也可以指定大小。

b. ArrayBlockingQueue:数组阻塞队列,也是一个先进先出原则的队列,底层数据结构是数组,需要指定队列的大小。

c. SynchronousQueue:同步队列,内部容量为0,每个put操作必须等待⼀个take操作,否则将处于阻塞,吞吐量高于LinkedBlockingQueue。

d. PriorityBlockingQueue:具有优先级的无限阻塞队列。

6.ThreadFactory threadFactory:用于创建线程的工厂,一般使用默认的。

7.RejectedExecutionHandler handler:拒绝策略,也成饱和策略、失败策略。当线程数量大于最大线程数就会采用拒绝处理策略,默认AbortPolicy:默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。当然也可以指定策略,有以下几种可以选择:

a. AbortPolicy:默认拒绝处理策略,抛出异常。

b. DiscardPolicy:丢弃新来的任务,但是不抛出异常。

c. DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,也就是即将被执行的,然后重新尝试执行程序(如果再次失败,重复此过程)。

d. CallerRunsPolicy:由调用者的线程处理该任务。

以上把ThreadPoolExecutor提供的四种构造方法分析后大家应该对怎么样使用线程池有了一定了解,也会觉得创建一个线程池要设置这么多参数,真的很麻烦,所以JDK为我们提供了一个非常好用的线程池工厂类Executors,可用于大多数场景下的线程池,这个工厂提供的方法帮我们设定一些初始值,然后实例化一个ThreadPoolExecutor对象,最终线程池还是靠ThreadPoolExecutor去完成任务的,如果对上面参数有足够了解,那么在看这个工厂类就会很容易看懂。

1.newFixedThreadPool:固定大小线程池。需要传入一个线程数量的参数nThreads,线程池的核心线程数和最大线程数相等,都是nThreads, 而它的等待队列是一个LinkedBlockingQueue,它的容量限制是Integer.MAX_VALUE, 可以认为是没有边界的。核心线程keepAlive时间0,allowCoreThreadTimeOut默认false。所以这个方法创建的线程池适合能估算出需要多少核心线程数量的场景。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

2.newSingleThreadExecutor:单线程。仅有一个核心线程数,即corePoolSize == maximumPoolSize=1。适用在顺序执行场景下,因为同一时间只有一个线程活动。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>(),
                                threadFactory));
}
3.newCachedThreadPool:无界线程池,可以进行自动线程回收,也叫缓存线程池。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
4.newScheduledThreadPool:调度线程池。以延迟或定时的方式执行任务。类似于定时器,实际开发如果涉及到定时操作也会选择其他更为成熟的框架或中间件。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
        int corePoolSize, ThreadFactory threadFactory) {
    return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}

以上就是线程池的使用,我们可以用最方便的方式Executors提供的工厂方法创建线程池,也可以通过ThreadPoolExecutor类的构造方法创建线程池,这种方式也是《阿里巴巴开发手册》建议的方式,通过构造方法创建线程池可以使得每个参数都可自定义,对线程池和资源都是可控的。下面来了解一下线程池的参数应该如何配置才是较优的。

想要合理设置线程池的参数,就要分析任务的特性,可以从以下角度。

l 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。

l 任务的优先级:高,中和低。

l 任务的执行时间:长,中和短。

l 任务的依赖性:是否依赖其他系统资源,如数据库连接。

如果是CPU密集型任务,就需要尽量压榨CPU,线程数量可以设为 NCPU+1。

如果是IO密集型任务,线程数量可以设置为2*NCPU。

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

如果是有优先级的任务,可以用PriorityBlockingQueue队列来处理,如果任务较多时,可能会出现低优先级任务永远不会执行。

有任务长短时间的区分可以让短任务先执行,可以避免任务积压。

如果有依赖数据库数据的任务可以使线程数设置大一些。

如果可以手动选择任务队列的话,最好选择有界队列,同样是避免任务过多造成内存资源占用过大。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 TDMQ
消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档