前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ThreadPoolExecutor源码分析(一):重要的成员变量

ThreadPoolExecutor源码分析(一):重要的成员变量

作者头像
100000860378
发布2018-12-26 16:12:20
3820
发布2018-12-26 16:12:20
举报

ThreadPoolExecutor部分重要成员变量: 1、AtomicInteger ctl 2、workQueue 3、corePoolSize 4、maximumPoolSize 5、keepAliveTime 6、handler

一、AtomicInteger ctl
代码语言:javascript
复制
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

AtomicInteger类型的ctl代表了ThreadPoolExecutor中的控制状态,它是一个复核类型的成员变量,是一个原子整数,借助高低位包装了两个概念:

  1. workerCount:线程池中当前活动的线程数量,占据ctl的低29位;
  2. runState:线程池运行状态,占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。

workerCount:

COUNT_BITS的定义代表了workerCount所占位数:

代码语言:javascript
复制
private static final int COUNT_BITS = Integer.SIZE - 3;
//在Java中,一个int占据32位,而COUNT_BITS的结果不言而喻,Integer大小32减去3,就是29。

CAPACITY的定义限制了workerCount的理论最大活跃线程数:

代码语言:javascript
复制
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//运算过程为1左移29位,也就是00000000 00000000 00000000 00000001 --> 001 0000 00000000 00000000 00000000,
//再减去1,就是 000 11111 11111111 11111111 11111111,
//前三位代表线程池运行状态runState,
//所以这里workerCount的理论最大值就应该是29个1,即536870911。

workerCount作为其中一个概念复合在AtomicInteger ctl中,ThreadPoolExecutor也提供了从AtomicInteger ctl中解析出workerCount的方法,如下:

代码语言:javascript
复制
private static int workerCountOf(int c)  { 
    return c & CAPACITY; 
}
//传入的c代表的是ctl的值,即高3位为线程池运行状态runState,低29位为线程池中当前活动的线程数量workerCount.
//将其与CAPACITY进行与操作&,也就是与000 11111 11111111 11111111 11111111进行与操作,
//c的前三位通过与000进行与操作,无论c前三位为何值,最终都会变成000,也就是舍弃前三位的值,
//而c的低29位与29个1进行与操作,c的低29位还是会保持原值,
//这样就从AtomicInteger ctl中解析出了workerCount的值。

runState:

线程池运行状态,它占据ctl的高3位,有RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五种状态。我们先分别解释下这五种状态:

  1. RUNNING:接受新任务,并处理队列任务
代码语言:javascript
复制
//Accept new tasks and process queued tasks
private static final int RUNNING    = -1 << COUNT_BITS;

-1在Java底层是由32个1表示的,左移29位的话,即111 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位全部为1的话,表示RUNNING状态,即-536870912;

  1. SHUTDOWN:不接受新任务,但会处理队列任务
代码语言:javascript
复制
//Don't accept new tasks, but process queued tasks
private static final int SHUTDOWN   =  0 << COUNT_BITS;

0在Java底层是由32个0表示的,无论左移多少位,还是32个0,即000 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位全部为0的话,表示SHUTDOWN状态,即0。

  1. STOP:不接受新任务,不会处理队列任务,而且会中断正在处理过程中的任务
代码语言:javascript
复制
//Don't accept new tasks, don't process queued tasks,
//and interrupt in-progress tasks
private static final int STOP       =  1 << COUNT_BITS;

1在Java底层是由前面的31个0和1个1组成的,左移29位的话,即001 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为001的话,表示STOP状态,即536870912;

  1. TIDYING:所有的任务已结束,workerCount为0,线程过渡到TIDYING状态,将会执行terminated()钩子方法。
代码语言:javascript
复制
/*
All tasks have terminated, workerCount 
is zero,the thread transitioning to 
state TIDYING will run the terminated() hook method
*/
private static final int TIDYING    =  2 << COUNT_BITS;

2在Java底层是由前面的30个0和1个10组成的,左移29位的话,即010 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为010的话,表示TIDYING状态,即1073741824。

  1. TERMINATED:terminated()方法已经完成
代码语言:javascript
复制
//terminated() has completed
private static final int TERMINATED =  3 << COUNT_BITS;

3在Java底层是由前面的30个0和1个11组成的,左移29位的话,即011 00000 00000000 00000000 00000000,也就是低29位全部为0,高3位为011的话,表示TERMINATED状态,即1610612736。

由上面我们可以得知,运行状态的值按照RUNNING-->SHUTDOWN-->STOP-->TIDYING-->TERMINATED顺序值是递增的,这些值之间的数值顺序很重要。随着时间的推移,运行状态单调增加,但是不需要经过每个状态。那么,可能存在的线程池状态的转换是什么呢?如下:

代码语言:javascript
复制
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
  1. RUNNING -> SHUTDOWN:调用shutdown()方法后,或者线程池实现了finalize方法,在里面调用了shutdown方法,即隐式调用。
  2. (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()方法后。
  3. SHUTDOWN -> TIDYING:线程池和队列均为空时。
  4. STOP -> TIDYING:线程池为空时。
  5. TIDYING -> TERMINATED:terminated()钩子方法完成时。

接下来看看如何从ctl中提取runState:

代码语言:javascript
复制
private static int runStateOf(int c)     { return c & ~CAPACITY; }

~ 是按位取反的意思,CAPACITY表示的是高位的3个0,和低位的29个1,而~CAPACITY则表示高位的3个1,2低位的9个0,然后再与入参c执行按位与操作,即高3位保持原样,低29位全部设置为0,也就获取了线程池的运行状态runState。

最后看一下这个方法:

代码语言:javascript
复制
private static int ctlOf(int rs, int wc) { return rs | wc; }

将runState和workerCount做或操作|处理,即用runState的高3位,workerCount的低29位填充的数字,而默认传入的runState、workerCount分别为RUNNING和0。

接下来看一下其他几个重要的成员变量:

代码语言:javascript
复制
    /**
     * The queue used for holding tasks and handing off to worker
     * threads.  We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING).  This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet interrupted. It also simplifies some of the
     * associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker).  All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads.  Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue.
     *
     * We go further and preserve pool invariants even in the face of
     * errors such as OutOfMemoryError, that might be thrown while
     * trying to create threads.  Such errors are rather common due to
     * the need to allocate a native stack in Thread.start, and users
     * will want to perform clean pool shutdown to clean up.  There
     * will likely be enough memory available for the cleanup code to
     * complete without encountering yet another OutOfMemoryError.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();
代码语言:javascript
复制
private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小
                                                              //、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
 
private volatile long  keepAliveTime;    //线程存货时间   
private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
 
private volatile int   poolSize;       //线程池中当前的线程数
 
private volatile RejectedExecutionHandler handler; //任务拒绝策略
 
private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
 
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
 
private long completedTaskCount;   //用来记录已经执行完毕的任务个数
二、workQueue:

阻塞队列,用来存储等待执行的任务,这里的阻塞队列有以下几种选择:

代码语言:javascript
复制
ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
三、corePoolSize:

核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中。

四、maximumPoolSize:

线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

五、keepAliveTime:

表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

六、handler:

表示当拒绝处理任务时的策略,有以下四种取值:

代码语言:javascript
复制
//当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018.12.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、AtomicInteger ctl
  • 二、workQueue:
  • 三、corePoolSize:
  • 四、maximumPoolSize:
  • 五、keepAliveTime:
  • 六、handler:
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档