前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >线程池实现原理-1

线程池实现原理-1

作者头像
Java识堂
发布2019-08-13 14:14:50
6640
发布2019-08-13 14:14:50
举报
文章被收录于专栏:Java识堂Java识堂

前言

设计到一部分AQS和阻塞队列的内容,可以看一下如下分享

深入理解AbstractQueuedSynchronizer

深入理解阻塞队列

作用

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗
  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

使用

通过Executors类,提供四种线程池

public class Task extends Thread{

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " is running");
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
public class TestCachedThreadPool {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 5; i++) {
            Task task = new Task();
            executorService.execute(task);
        }
        //pool-1-thread-1 is running
        //pool-1-thread-5 is running
        //pool-1-thread-2 is running
        //pool-1-thread-4 is running
        //pool-1-thread-3 is running
        //必须显式结束,不然程序永远不会结束
        executorService.shutdown();
    }
}

这个看起来好像没有用到线程池,其实是因为没有可复用的线程,所以就一直创建新的线程了

public class TestFixedThreadPool {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 5; i++) {
            Task task = new Task();
            executorService.execute(task);
        }
        //pool-1-thread-1 is running
        //pool-1-thread-2 is running
        //pool-1-thread-1 is running
        //pool-1-thread-2 is running
        //pool-1-thread-1 is running
        executorService.shutdown();
    }
}
public class TestScheduledThreadPool {

    public static void main(String[] args) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        //任务,第1次任务延迟的时间,2次任务间隔时间,时间单位
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("task 1 " + System.currentTimeMillis());
            }
        }, 1, 5, TimeUnit.SECONDS);
        //两者互不影响
        executorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("task 2 " + System.currentTimeMillis());
            }
        }, 1, 2,TimeUnit.SECONDS);
        //task 1 1521949174111
        //task 2 1521949174112
        //task 2 1521949176106
        //task 2 1521949178122
        //task 1 1521949179104
        //task 2 1521949180114
    }
}
public class TestSingleThreadExecutor {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            Task task = new Task();
            executorService.execute(task);
        }
        //pool-1-thread-1 is running
        //pool-1-thread-1 is running
        //pool-1-thread-1 is running
        //pool-1-thread-1 is running
        //pool-1-thread-1 is running
        executorService.shutdown();
    }
}

源码

基于jdk1.8.0_20 ,先说一下我的理解,以便对线程池的工作方式有个大概了解,以前我们运行线程的时候new Thread().start()即可,如果线程数多了,频繁的创建线程和销毁线程很费时间,于是Doug Lea将实现了Runnable接口的任务放到一个容器中,然后启动一个线程执行完自己的任务后,还能从容器中拿出任务,调用Runnable接口的run方法,这样一个Thread类就能执行多个任务了,当然可以启动多个线程同时消费容器中的任务,线程池就这样实现了

状态

先了解一下线程池的状态及线程数量的表示方式

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

AtomicInteger类型的ctl代表了ThreadPoolExecutor的控制状态,是一个复合类型的变量,借助高低位包装了2个概念

  1. runState 线程池运行状态,占据ctrl的高三位
  2. workerCount 线程池中当前活动的线程数量,占据ctl的低29位

COUNT_BITS代表了workerCount所占的位数,即29,而CAPACITY表示线程池理论的最大活动线程数量,即536870911

0在Java底层是由32个0表示的,无论左移多少位,还是32个0,即SHUTDOWN的值为0,TIDYING则是高三位为010,低29为0

private static int runStateOf(int c)     { return c & ~CAPACITY; }

~是按位取反的意思,CAPACITY表示的是高位的3个0,和低位的29个1,而~CAPACITY则表示高位的3个1,和低位的29个0,然后再与入参c执行按位与操作,即高3位保持原样,低29位全部设置为0,也就获取了线程池的运行状态runState

private static int ctlOf(int rs, int wc) { return rs | wc; }

传入的rs表示线程池运行状态runState,其是高3位有值,低29位全部为0的int,而wc则代表线程池中有效线程的数量workerCount,其为高3位全部为0,而低29位有值的int,将runState和workerCount做按位或,即用runState的高3位,workerCount的低29位填充的数字

构造函数

从上面例子的代码开始看起

ExecutorService executorService = Executors.newCachedThreadPool();
// Executors
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
// ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

这里简单说一下corePoolSize和maximumPoolSize,可以进行如下类比学习,corePoolSize=公司的基本人员,maximumPoolSize=公司的基本人员+外包人员。corePoolSize,保持存活的工作线程的最小数目,当小于corePoolSize时,会直接启动新的一个线程来处理任务,而不管线程池中是否有空闲线程

keepAliveTime是空闲线程的存活时间,默认用于非核心线程,但是当allowCoreThreadTimeOut=true时(这个值默认是false),同样用于核心线程

ThreadFactory是一个工厂类接口,我们可以实现这个接口,来自定义产生线程的方式

public interface ThreadFactory {

    /**
     * Constructs a new {@code Thread}.  Implementations may also initialize
     * priority, name, daemon status, {@code ThreadGroup}, etc.
     */
    Thread newThread(Runnable r);
}

来看一下上面的例子使用的默认的工厂类,这个默认的工厂类是Executors类的一个静态内部类

// Executors
static class DefaultThreadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    DefaultThreadFactory() {
        // 可以定义代码能访问那些资源的管理器,一般不做设置
        SecurityManager s = System.getSecurityManager();
        group = (s != null) ? s.getThreadGroup() :
                              Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" +
                      poolNumber.getAndIncrement() +
                     "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r,
                              namePrefix + threadNumber.getAndIncrement(),
                              0);
        // 如果是守护进程,设置成非守护进程
        if (t.isDaemon())
            t.setDaemon(false);
        // 设置成一般的优先级
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

在看看上面例子打印的Thread.currentThread().getName(),你是不是知道因为啥了?

RejectedExecutionHandler是一个接口,有4个实现类,对应4种处理策略,这4个实现类是ThreadPoolExecutor的静态内部类

饱和策略接口,当队列和线程池都满了,说明线程处于饱和状态,必须采取策略处理提交的任务,我们可以实现这个接口来自定义处理策略

public interface RejectedExecutionHandler {

    // 用executor执行r
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

来看上面例子中使用的饱和策略的实现方式,其他策略方式实现也挺简单,不再介绍

// 丢弃任务,抛运行时异常
// ThreadPoolExecutor
public static class AbortPolicy implements RejectedExecutionHandler {

    public AbortPolicy() { }


    // RejectedExecutionException继承自RuntimeException
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                             " rejected from " +
                                             e.toString());
    }
}

ThreadPoolExecutor的各种属性,前面基本上介绍的差不多了

public class ThreadPoolExecutor extends AbstractExecutorService {

    // 任务队列,用于保存等待执行的任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;

    // Lock held on access to workers set and related bookkeeping.
    private final ReentrantLock mainLock = new ReentrantLock();

    // 存放线程池中的worker线程,mainLock加锁时才能访问
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // Wait condition to support awaitTermination
    private final Condition termination = mainLock.newCondition();

    // 记录线程池达到的最大数量,mainLock加锁时才能访问
    private int largestPoolSize;

    // 完成任务的计数器,仅在工作线程终止时更新,mainLock加锁才能访问
    private long completedTaskCount;

    // 用于设置线程的工厂
    private volatile ThreadFactory threadFactory;

    // 在线程池饱和或者线程池处于shutdown 状态时被调用
    private volatile RejectedExecutionHandler handler;

    // 当allowCoreThreadTimeOut = true || 当前线程数量 > corePoolSize
    // 空闲线程超时等待的时间,否则一直阻塞等待
    private volatile long keepAliveTime;

    // 为false(默认),核心线程仍然活着,即使在空闲
    // 为true,核心线程使用keepAliveTime超时等待工作
    // 如果超过keepAliveTime则销毁
    private volatile boolean allowCoreThreadTimeOut;

    // 核心线程池数量
    private volatile int corePoolSize;

    // 线程池最大数量
    private volatile int maximumPoolSize;

    // 默认策略是 丢弃任务,抛运行时异常
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /*
     * 检查是否有modifyThread权限
     *  Modification of threads, e.g., via calls to Thread
     * interrupt, stop, suspend,
     * resume, setDaemon, setPriority,
     * setName and setUncaughtExceptionHandler
     * methods
     * This allows an attacker to modify the behaviour of
     * any thread in the system.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

}

execute实现

将线程放入线程池有2种方式,一种是execute,一种是submit,这里我们先说一下execute执行流程

  1. 首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务。满了,则进入下个流程。
  2. 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里。满了,则进入下个流程。
  3. 最后线程池判断整个线程池是否已满(即线程数是否小于线程池最大容量)?没满,则创建一个新的工作线程来执行任务,满了,则交给饱和策略来处理这个任务。

简单类比一下创建过程

  1. 来项目了,如果公司有比较闲的基本人员,则让这些闲的基本人员干活
  2. 公司的基本人员都有活干,则看看以后的日程排满没?没有,则把这些项目排到日程后面,以后干就行
  3. 项目太多,以后也没时间干,看看公司还有剩余工位不,有工位,来一个项目招一个外包,工位满了,该经理做决定了,是不接这个项目还是其他做法
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    // 当前线程数<corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 线程池处于RUNNING状态,并且任务成功放入阻塞队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 线程池处于RUNNING状态,但是没有线程,创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

执行到addWorker(null, false)这个方法说明,任务刚来的时候核心线程都在工作,结果它就被放到阻塞队列中了,然后核心线程都执行完了(并且都被销毁了),如果不调用一下这个方法,则放到阻塞队列中的任务就不会被执行

核心线程不是一直都存在的吗?为什么会被销毁了,这还得从一个属性allowCoreThreadTimeOut说起,这个属性默认是false,意思是核心线程不会被销毁,如果设置为ture,则在workQueue为空的时候,核心线程有可能全被销毁了

reject其实就是根据我们设置的策略,来处理无法运行的任务

final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
}
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-06-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Java识堂 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 使用
    • execute实现
    相关产品与服务
    容器服务
    腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档