Executor线程池只看这一篇就够了

预计阅读时间:12分钟

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。

线程实现方式

Thread、Runnable、Callable

//实现Runnable接口的类将被Thread执行,表示一个基本任务
public interface Runnable {
    //run方法就是它所有内容,就是实际执行的任务
    public abstract void run();
}
//Callable同样是任务,与Runnable接口的区别在于它接口泛型,同时它执行任务候带有返回值;
//Callable的使用通过外层封装成Future来使用
public interface Callable<V> {
    //相对于run方法,call方法带有返回值
    V call() throws Exception;
}

注意:启动Thread线程只能用start(JNI方法)来启动,start方法通知虚拟机,虚拟机通过调用器映射到底层操作系统,通过操作系统来创建线程来执行当前任务的run方法

Executor框架

Executor接口是线程池框架中最基础的部分,定义了一个用于执行Runnable的execute方法。从图中可以看出Exectuor下有一个重要的子接口ExecutorService,其中定义了线程池的具体行为:

  • execute(Runnable runnable):执行Runnable类型的任务
  • submit(task):用来提交Callable或者Runnable任务,并返回代表此任务的Future对象
  • shutdown():在完成已经提交的任务后封闭办事,不在接管新的任务
  • shutdownNow():停止所有正在履行的任务并封闭办事
  • isTerminated():是一个钩子函数,测试是否所有任务都履行完毕了
  • isShutdown():是一个钩子函数,测试是否该ExecutorService是否被关闭

ExecutorService中的重点属性:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

ctl:对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段,它包含两部分信息:线程池的运行状态(runState)和线程池内有效线程的数量(workerCount)。

这里可以看到,使用Integer类型来保存,高3位保存runState,低29位保存workerCount。COUNT_BITS 就是29,CAPACITY 就是1左移29位减1(29个1),这个常量表示workerCount的上限值,大约是5亿。

ctl相关方法:

//获取运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//获取活动线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
//获取运行状态和活动线程数的值
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态:

RUNNING = -1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011

1、RUNNING

  • 状态说明:线程池处于RUNNING状态,能够接收新任务,以及对已添加的任务进行处理。
  • 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。

2、SHUTDOWN

  • 状态说明:线程池处于SHUTDOWN状态,不接收新任务,能够处理已经添加的任务。
  • 状态切换:调用shutdown()方法时,线程池由RUNNING -> SHUTDOWN。

3、STOP

  • 状态说明:线程池处于STOP状态,不接收新任务,不处理已提交的任务,并且会中断正在处理的任务。
  • 状态切换:调用线程池中的shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN) -> STOP。

4、TIDYING

  • 状态说明:当所有的任务已经停止,ctl记录“任务数量”为0,线程池会变为TIDYING状态。当线程池处于TIDYING状态时,会执行钩子函数 terminated()。terminated()在ThreadPoolExecutor类中是空, 的,若用户想在线程池变为TIDYING时,进行相应处理,可以通过重载 terminated()函数来实现。
  • 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行任务也为空时,就会由SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP-> TIDYING。

5、TERMINATED

  • 状态说明:线程池线程池彻底停止,线程池处于TERMINATED状态,
  • 状态切换:线程池处于TIDYING状态时,执行完terminated()之后, 就会由TIDYING->TERMINATED。

线程池使用

RunTask类:

public class RunTask implements Runnable {
    public void run() {
        System.out.println("Thread name:"+Thread.currentThread().getName());
    }
}

ExecutorSample类:

public class ExecutorSample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (int i=0;i<20;i++){
            //提交任务无返回值
            executor.execute(new RunTask());
            //任务执行完成后有返回值
            Future<Object> future = executor.submit(new RunTask());
        }
    }
}

线程池的具体使用:

  • ThreadPoolExecutor 默认线程池
  • ScheduledThreadPoolExecutor 定时线程池

ThreadPoolExecutor

线程池的创建:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}
  • corePoolSize:线程池中的核心线程数。当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
  • maximumPoolSize:线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。
  • keepAliveTime:线程池维护线程所允许的空闲时间。当线程池中的线程数量大于corePoolSize时候,如果这时候没有新的任务提交,核心线程外的线程不会立即被销毁,而是会等待,直到等待的时间超过了keepAliveTime unit:keepAliveTime的单位时间
  • workQueue:用于保存等待被执行的任务的阻塞队列,且任务必须实现Runnable接口,在JDK中提供了如下阻塞队列:ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务。LinkedBlockingQueue:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue。SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常高于LinkedBlockingQueue。
  • PriorityBlockingQueue:具有优先级的无界阻塞队列。
  • threadFactory:ThreadFactory 类型的变量,用来创建新线程。默认使用ThreadFactory.defaultThreadFactory来创建线程, 会使新创建线程具有相同的NORM_PRIORITY优先级并且都是非守护线程,同时也设置了线程名称。
  • handler:线程池的饱和策略。当阻塞队列满了,且没有空闲的工作队列,如果继续提交任务,必须采用一种策略处理该任务.

线程池的监控:

public long getTaskCount() //线程池已执行与未执行的任务总数
public long getCompletedTaskCount() //已完成的任务数
public int getPoolSize() //线程池当前的线程数
public int getActiveCount() //线程池中正在执行任务的线程数量

线程池的原理

  • 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意这一个步骤需要获取全局锁)。
  • 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
  • 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意这一个步骤需要获取全局锁)。
  • 如果创建的新线程将使当前运行的线程超出maximumPoolSize,任务将被执行饱和策略。ThreadPoolExecutor 采用上述的设计思路,是为执行execute()方法时,尽可能避免获取全局锁(一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后,几乎所有的execute()方法调用都是在执行步骤2,而步骤2不需要获取全局锁。

End

关注【小强的进阶之路】技术公众号:

  • 回复“1”进大型内推群!
  • 回复“2”免费获取海量开发学习资料!
  • 回复“3”加入知识星球,和大牛一起学习!

原文发布于微信公众号 - 小强的进阶之路(xiaoqiang_code)

原文发表时间:2019-08-19

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券