前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入理解线程池底层原理

深入理解线程池底层原理

作者头像
xcbeyond
发布2020-03-25 14:58:29
3680
发布2020-03-25 14:58:29
举报
文章被收录于专栏:技术那些事技术那些事

如何理解线程池的工作机制和原理?

1、线程池是用来干嘛的,用它有什么好处,怎么能更好的去用线程池?

线程池是用来干嘛的?

如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。

那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?

使用线程池的好处

1)降低资源消耗。重复利用已创建线程,降低线程创建与销毁的资源消耗。

2)提高响应效率。任务到达时,不需等待创建线程就能立即执行。

3)提高线程可管理性。

4)防止服务器过载。内存溢出、CPU耗尽

如何去使用线程池呢?

这个可以直接使用JUC中提供的四个不同的构造器,每个构造器中都有不同的参数,每个参数代表什么样的含义下面我会给大家解释。

2、线程池的核心

先了解一下线程池中出现的核心参数和变量的意思,以便后续理解。

这个是图是主要的父子孙类的关系结构

  • Executor: 所有线程池的接口,只有一个方法。
  • ExecutorService: 增加Executor的行为,是Executor实现类的最直接接口。
  • Executors:提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService 接口。
  • ThreadPoolExecutor:线程池的具体实现类,一般用的各种线程池都是基于这个类实现的。

在ThreadPoolExecutor中定义了一个Volatile变量,另外定义了几个static final变量表示线程池的各个状态:

runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;

下面的几个static final变量表示runState可能的几个取值。

1)当创建线程池后,初始时,线程池处于RUNNING状态;

2)如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;

3)如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;

4)当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

代码语言:javascript
复制
    volatile int runState;
    static final int RUNNING    = 0;
    static final int SHUTDOWN   = 1;
    static final int STOP       = 2;
    static final int TERMINATED = 3;

最重要的ThreadPoolExecutor(参数)这个是构造器,一共有四个构造器,里面的参数也不同。

代码语言:javascript
复制
public class ThreadPoolExecutor extends AbstractExecutorService {
    //四个空构造器,要理解每一个参数的含义
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

1)第一个参数:int corePoolSIze

核心池大小(其实这个就相当于是球队的主力队员,一般情况先都是这几个主力队员上场,但是如果遇到主力队员人数不够或者受伤之后不足以满足比赛才会启用maximumPoolSize这个参数),也就是线程池中会维持不被释放的线程数量。我们可以看到FixedThreadPool中这个参数值就是设定的线程数量,而SingleThreadExcutor中就是1,newCachedThreadPool中就是0,不会维持,只会缓存60L。但需要注意的是,在线程池刚创建时,里面并没有建好的线程,只有当有任务来的时候才会创建(除非调用方法prestartAllCoreThreads()与prestartCoreThread()方法),在corePoolSize数量范围的线程在完成任务后不会被回收。

2)第二个参数:int maximumPoolSize

(可以把这个参数当成是球队后背球员,当主力不足时才会让后备队员上场救急)线程池的最大线程数,代表着线程池中能创建多少线程池。超出corePoolSize,小于maximumPoolSize的线程会在执行任务结束后被释放。此配置在CatchedThreadPool中有效。

3)第三个参数:long keepAliveTime

刚刚说到的会被释放的线程缓存的时间。我们可以看到,正如我们所说的,在CachedThreadPool()构造过程中,会被设置缓存时间为60s(时间单位由第四个参数控制)。

4)第四个参数:TimeUnit unit

设置第三个参数keepAliveTime的时间单位。

5)第五个参数:BlockingQueue<Runnable> workQueue

就是四种阻塞的队列,也就是当线程池满了之后,再进来的任务都会放到这个阻塞队列中等待。

存储等待执行任务的阻塞队列,有多种选择,分别为:

  • SynchronousQueue——直接提交策略,适用于CachedThreadPool。它将任务直接提交给线程而不保持它们。如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求最大的 maximumPoolSize 以避免拒绝新提交的任务(正如CachedThreadPool这个参数的值为Integer.MAX_VALUE)。当任务以超过队列所能处理的量、连续到达时,此策略允许线程具有增长的可能性。吞吐量较高。
  • LinkedBlockingQueue——无界队列,适用于FixedThreadPool与SingleThreadExcutor。基于链表的阻塞队列,创建的线程数不会超过corePoolSizes(maximumPoolSize值与其一致),当线程正忙时,任务进入队列等待。按照FIFO原则对元素进行排序,吞吐量高于ArrayBlockingQueue。
  • ArrayListBlockingQueue——有界队列,有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。

6)第六个参数:RejectedExecutionHandler handler

这个参数是当任务到队列中之后缓存中队列阻塞的也已经满了的时候,会去启动备用后备队员去进行补充球队,但是如果此时后备队员也不够的话(),这个参数就会起到他的作用,会启用无法执行任务的策略:

代码语言:javascript
复制
    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

7)第七个参数:threadFactory

这个参数就是一个线程工厂,主要的功能就是用来创建线程的

3、先理解一下线程池的大概几步主要的工作流程思路分析?

注意:这个是整个线程池的大体流程的代码,下面一步一步分解思路

代码语言:javascript
复制
        //有任务提交过来的话,会执行这个方法
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
         //这个是先做第一个判断当前线程是不是大于等于核心线程池(说明满了),如果大于会继续执行第二步把提交过来的任务添加到任务队列中去
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        //如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;

            if (runState == RUNNING && workQueue.offer(command)) {
              //如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,
              //则说明需要启用备用球员来上场(maximumPoolSize可以把这个看成是紧急预备队),来去处理这个提交的任务
                if (runState != RUNNING || poolSize == 0)
                //然后去处理任务
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }

第一大步:

第一步:当有任务提交过来的时候其实也就是当执行恶心excute()的时候,首先会先判断当前正在运行的线程是不是大于核心线程池(corePoolSIze)。

(1)如果大于说明核心线程池已经满了,然后把当前线程放到阻塞对列中:(2)如果小于说明核心线程池还有剩余,就直接创建一个线程执行任务,也就是调用addIfUnderCorePoolSize(command):

步骤一:

代码语言:javascript
复制
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        //先获取锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //这部分重复判断(poolSize < corePoolSize是为了防止新提交的任务也会走到这里,大佬考虑的比较全)
            if (poolSize < corePoolSize && runState == RUNNING)
             //创建线程去执行firstTask任务   
                t = addThread(firstTask);       
            } finally {
            //释放锁
            mainLock.unlock();
        }
        if (t == null)
            return false;
        //到这里说明当第一次当前线程池小于核心池时,提交的任务就已经执行了,结束。(但是这种情况比较少)
        t.start();
        return true;
    }

步骤二:

会走到这个方法中addThread(firstTask):

代码语言:javascript
复制
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        Thread t = threadFactory.newThread(w);  //创建一个线程,执行任务   
        if (t != null) {
            w.thread = t;            //将创建的线程的引用赋值为w的成员变量       
            workers.add(w);
            int nt = ++poolSize;     //当前线程数加1       
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

步骤三:

这里会去首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中

代码语言:javascript
复制
private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }

    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据
            //自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }

    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //当任务队列中没有任务时,进行清理工作       
        }
    }
}

步骤四:

然后继续走到关键的一步,走到run()的方法中去执行这个线程

代码语言:javascript
复制
public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

步骤五:

这里有个小的方法,就是getTask()方法,他是不断的从那个缓冲队列中去任务,然后执行。

代码语言:javascript
复制
    Runnable getTask() {
        for (;;) {
            try {
                int state = runState;
                if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
                    //则通过poll取任务,若等待一定的时间取不到任务,则返回null
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {    //如果没取到任务,即r为null,则判断当前的worker是否可以退出
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();   //中断处于空闲状态的worker
                    return null;
                }
                // Else retry
            } catch (InterruptedException ie) {
                // On interruption, re-check runState
            }
        }
    }

**注意:**

在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

步骤六:

这一步其实就是如果没有从缓冲队列中取到任务,就退出的方法workerCanExit()

代码语言:javascript
复制
private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //如果runState大于等于STOP,或者任务缓存队列为空了
    //或者  允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

步骤七:补充可能会出现的情况:

因为之前线程的状态有:

代码语言:javascript
复制
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;

这四种情况,这里讨论的是如果出现STOP的状态的情况下:

也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:

代码语言:javascript
复制
void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //实际上调用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中

代码语言:javascript
复制
void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的
                                //如果成功获取了锁,说明当前worker处于空闲状态
        try {
    if (thread != Thread.currentThread())  
    thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

第二大步:

第二步:就是沿着第一步的1)之后继续走下去,这一步关键的是当任务放入缓冲阻塞队列中是不是成功(也就是等待执行任务的队列是不是满了)。代码就是workQueue.offer(command),如果加入成功之后就是提交的任务就在这个队列中等待着执行。然后执行

这里有个执行的过程:

(这里我们先假设队列满了,才会执行到下面的第三步,否则就不会执行第三步了)

第三大步:

其实还是会重复第一大步的过程,只不过是这次是当前线程池的大小跟maximumPoolSize (应急队员的个数进行比较,看是不是应急队员足够来弥补缺少的队员)如果可以的话。其实就是重复第一大步骤的内容,只不过是这次调用的是addIfUnderMaximumPoolSize(Runnable firstTask)。其实后面的核心的思想和过程都是在重复第一大步的内容。

代码语言:javascript
复制
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

是不是跟第一大步中的第一个步骤的逻辑都是一样的!希望能够帮助大家理解到线程池有关的源码问题。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-12-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿技术大咖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档