谈谈线程池:ThreadPoolExecutor

蹲厕所的熊 转载请注明原创出处,谢谢!

1、线程池介绍

在web开发中,服务器需要接受并处理请求,所以会为一个请求来分配一个线程来进行处理。如果每次请求都新创建一个线程的话实现起来非常简便,但是存在一个问题:

如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁线程,如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多。

所以线程池就出现了。线程池为线程生命周期的开销和资源不足问题提供了解决方案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

使用线程池的好处:

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

Java中的线程池是用ThreadPoolExecutor类来实现的. 本文就结合JDK 1.8的源码来分析一下这个类内部对于线程的创建, 管理以及后台任务的调度等方面的执行原理。

2、继承关系

我们首先来看一下线程池的类图

Executor接口

ExecutorService接口

AbstractExecutorService接口

3、ThreadPoolExecutor分析

想要深入理解ThreadPoolExecutor,就要先理解其中最重要的几个参数:

核心变量与方法(状态转换)

可能很多人看到上面的写法都蒙圈了。我其实基础也不太好,所以我看到这里的时候索性写了个工具类去测试他们的输出结果,如下:

输出结果为:

通过上面的注释以及测试用例可以发现,源码的作者巧妙的运用一个值代表了2种意思(前3bit位是状态,后29bit是工作数),下面我们来看看线程池最重要的5种状态:

RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;

SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);

STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;

TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。

TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。

下图为线程池的状态转换过程

构造方法

对于参数handler:线程池提供了4种策略:

AbortPolicy:直接抛出异常,这是默认策略。

CallerRunsPolicy:用调用者所在的线程来执行任务。

DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务。

DiscardPolicy:直接丢弃任务。

核心方法

execute方法

线程池最核心的方法莫过于execute了,execute()方法用来提交任务,下面我们顺着这个方法看看其实现原理:

在执行execute()方法时如果状态一直是RUNNING时,的执行过程如下:

如果workerCount

如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。

如果workerCount >= corePoolSize && workerCount

如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

addWorker方法

addWorker方法的主要工作是在线程池中创建一个新的线程并执行,firstTask参数 用于指定新增的线程执行的第一个任务。core为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize,代码如下:

注意一下这里的t.start()这个语句,启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。

Worker类

线程池中的每一个线程被封装成一个Worker对象,而Worker对象继承自AQS,自己实现了锁定的逻辑(AQS相关的内容本文不讲),ThreadPool维护的其实就是一组Worker对象。看一下Worker的定义:

Worker继承了AQS,使用AQS来实现独占锁的功能。为什么不使用ReentrantLock来实现呢?可以看到tryAcquire方法,它是不允许重入的,而ReentrantLock是允许重入的:

lock方法一旦获取了独占锁,表示当前线程正在执行任务中。

如果正在执行任务,则不应该中断线程。

如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

线程池在执行shutdown方法或tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否是空闲状态。

之所以设置为不可重入,是因为我们不希望任务在调用像setCorePoolSize这样的线程池控制方法时重新获取锁。如果使用ReentrantLock,它是可重入的,这样如果在任务中调用了如setCorePoolSize这类线程池控制的方法,会中断正在运行的线程。

所以,Worker用于判断线程是否空闲以及是否可以被中断。

runWorker方法

在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:

getTask重要的地方是第二个if判断,目的是控制线程池的有效线程数量。由上文中的分析可以知道,在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加工作线程,但这时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。

processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入processWorkerExit方法,整个线程结束。

下面是从execute到线程销毁的整个流程图

其他外部调用方法

下面的方法都是用户可以自己进行调用的:

内部方法以及空方法

下面的方法都是用户自己调用不了的方法,这里也做一下说明:

4、线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

getTaskCount:线程池已经执行的和未执行的任务总数。

getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount。

getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize。

getPoolSize:线程池当前的线程数量。

getActiveCount:当前线程池中正在执行任务的线程数量。

通过这些方法,可以对线程池进行监控,在ThreadPoolExecutor类中提供了几个空方法,如beforeExecute方法,afterExecute方法和terminated方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自ThreadPoolExecutor来进行扩展。

5、合理的配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

任务的性质:CPU密集型任务,IO密集型任务和混合型任务。

任务的优先级:高,中和低。

任务的执行时间:长,中和短。

任务的依赖性:是否依赖其他系统资源,如数据库连接。

任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过 方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。

我参考了:http://ifeve.com/how-to-calculate-threadpool-size/ 这篇文章里的使用程序评估线程池大小。

6、结论

本文比较详细的分析了线程池的工作流程,总体来说有如下几个内容:

分析了线程的创建,任务的提交,状态的转换以及线程池的关闭。

这里通过execute方法来展开线程池的工作流程,execute方法通过corePoolSize,maximumPoolSize以及阻塞队列的大小来判断决定传入的任务应该被立即执行,还是应该添加到阻塞队列中,还是应该拒绝任务。

介绍了线程池关闭时的过程,也分析了shutdown方法与getTask方法存在竞态条件。

在获取任务时,要通过线程池的状态来判断应该结束工作线程还是阻塞线程等待新的任务,也解释了为什么关闭线程池时要中断工作线程以及为什么每一个worker都需要lock。

在向线程池提交任务时,除了execute方法,还有一个submit方法,submit方法会返回一个Future对象用于获取返回值,有关Future和Callable请自行了解一下相关的文章,这里就不介绍了。

7、扩展

一般开发中core线程数量是很难确定的,可以参考上面提到的如何合理的估算线程池的大小,但是一般都是开发者自己经过压测后得到的数据,之后到真正的线程环境验证,得出一个合理的core数字。假设是5,但是为了预防某些瞬时大流量(我们也无法预知到底流量会有多大),通常会再设置一个比core线程数要大的max线程,假设是10。那么当这种瞬时流量真的发生了,如果希望服务器能尽快的提高处理速度,当然是需要让MAX线程尽快启动起来,帮着处理任务。这时候我们就可以自己扩展线程池,可以参考Tomcat的线程池实现。

  • 发表于:
  • 原文链接http://kuaibao.qq.com/s/20180501G0XJOO00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。

扫码关注云+社区

领取腾讯云代金券