前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java多线程探索(二):优秀的ThreadPoolExecutor到底是如何工作的?

Java多线程探索(二):优秀的ThreadPoolExecutor到底是如何工作的?

作者头像
闲宇非鱼
发布2022-02-08 11:18:01
3930
发布2022-02-08 11:18:01
举报

人生苦短,不如养狗

一、前言

  在上一篇Java多线程探索(一):为什么要使用ThreadPoolExecutor?中我们简单介绍了为什么推荐使用ThreadPoolExecutor的原因。今天我们就来具体分析一下ThreadPoolExecutor的工作原理。

二、ThreadPoolExecutor总览

  在探索具体工作流程之前,我们先来看一看ThreadPoolExecutor比较重要的成员变量、构造函数和几个重要的内部类。

(一)成员变量

代码语言:javascript
复制
// 主要的线程池控制状态变量,用32位二进制表示
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

// 阻塞队列,用于保存等待执行资源的任务,可以自行选择相应的阻塞队列
private final BlockingQueue<Runnable> workQueue;

private final ReentrantLock mainLock = new ReentrantLock();

// 用于保存线程池中所有的worker线程,只有持有mainLock才可以获取
private final HashSet<Worker> workers = new HashSet<Worker>();

// 用于支持awaitTermination()方法的等待条件
private final Condition termination = mainLock.newCondition();

// 用于追踪最大可获得的线程池的大小
private int largestPoolSize;

// 用于记录已经完成的任务数量
private long completedTaskCount;

// 线程池工厂,所有的线程都是通过addWorker()来创建
private volatile ThreadFactory threadFactory;

// 当线程池和阻塞队列满或者线程池被关闭时会启用handler
private volatile RejectedExecutionHandler handler;

// 空闲线程保持时间
private volatile long keepAliveTime;

// 核心线程池大小,即保持存活的最小worker的数量(不会超时,除非设置了allowCoreThreadTimeOut,此时最小值为0)
private volatile int corePoolSize;

// 最大线程池大小
private volatile int maximumPoolSize;

  以下只介绍其中需要理解的成员变量ctl,其余的大家可以参照注释看一下。 ctl:该变量是一个原子类型的整型变量,实际存放了32位的二进制数,主要用于表示两个概念性的属性——workerCount和runState,其中高3位用于表示runState,低29位用于表示workerCount。

线程池生命周期(状态流转)

  和线程一样,线程池也是有生命周期的。如上文所说,ctl中存放了两个属性,其中runState用于表示线程池的状态,一共有五种状态:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。以下是线程池五种状态的流转关系图,也就是线程池的生命周期:

RUNNING:此时线程池能够接受新的任务,并且能够处理阻塞队列中的任务。SHUTDOWN:此时线程池不接受新的任务,只处理阻塞队列中的任务。STOP:此时线程池既不接受新的任务,也不接受阻塞队列中的任务。TIDYING:此时所有的任务都被终止,workerCount等于0,正在转变成TIDYING状态的线程将会执行terminated()钩子方法。TERMINATED:此时terminated()执行完毕。

(二)构造函数

ThreadPoolExecutor中有许多构造函数,这里我们只看参数列表最全的那个:

代码语言:javascript
复制
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

  首先看下参数列表,前五个参数中规中矩,最后两个参数ThreadFactoryRejectedExecutionHandler都是接口类型的参数,这就意味着,我们可以根据项目中的实际情况来实现对应的接口,使用自己的实现类来进行初始化。   具体看下方法体。首先,进行了参数校验,corePoolSizemaximumPoolSizekeepAliveTime不能小于0,maximumPoolSize必须大于corePoolSize,否则抛出IllegalArgumentException异常。workQueuethreadFactoryhandler不能为null,否则抛出NullPointerException异常。完成参数校验之后就是初始化参数。

(三)几个重要的内部类

  在ThreadPoolExecutor中定义了五个内部类,分别是AbortPolicyCallerRunsPolicyDiscardOldestPolicyDsicardPolicyWorker。   前四个内部类是ThreadPoolExecutor提供的拒绝策略,也就是当线程池和阻塞队列都已经满了之后,新到的线程应该如何处理的策略。当然,大家也可以根据自己的实际业务需求实现RejectedExecutionHandler定制化自己的拒绝策略。 Worker内部类可以说是ThreadPoolExecutor相当核心的一个内部类。它的主要作用是用于维护正在运行的任务的中断控制状态,并维护一些次要的信息。这句话是翻译自源码的注释,感觉其实还是没有解释清楚Worker到底是干什么的。其实Worker维护中断控制状态的目的是为了保证运行中的任务不被中断。是不是还有点迷糊,不要慌,让我们来看看Worker到底长什么样子:

代码语言:javascript
复制
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
	// 这里将state设置为-1是为了阻止中断,直到执行了runWorker方法
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
	// 可以看到这里和ReentrantLock的区别,只能获取锁一次,即不可重入
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

  可以看到Worker既继承了AQS,又实现了Runnable接口。通过继承AQSWorker定制化的实现了一个简化版的ReentrantLock,其中最大区别就是Worker不可重入的独占锁。而通过实现Runnable接口,Worker又能像线程一样进行工作,作为线程池中任务运行的基本单位。这也就是为什么在它的构造方法中,能够通过指定的线程工厂和this对象创建对应工作线程。   现在我们在回头来看下上面所说的维护了运行任务的线程的中断控制状态,Worker是如何去进行中断控制的呢?这里Worker继承AQS实现了一个不可重入的独占锁,通过这个锁,Worker可以判断当前线程是否可以进行中断,如果当前线程获取到了该锁,说明当前线程正在运行,不能进行中断。而如果当前线程是空闲状态,也就是无锁状态,那么就可以对其进行中断。   再来看下Worker不可重入,其实还是为了保证运行中的线程不会被中断。举个例子,当任务在调用像setCoreSize()这样的线程池控制方法时,会执行这样一个方法 interruptIdleWorkers()

代码语言:javascript
复制
public void setCorePoolSize(int corePoolSize) {
	、、、
		if (workerCountOf(ctl.get()) > corePoolSize)
    		interruptIdleWorkers();
	、、、
}

private void interruptIdleWorkers() {
    interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
	    // 如果可以重入,则会导致进入到这个代码块
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

  可以看到,如果可以重入的话,那么if (!t.isInterrupted() && w.tryLock())判断语句就会为真,此时就会进入到代码块中执行中断方法,最终会导致自己将自己中断。   到这里,应该对Worker有了一个大致的了解,下面我们结合一个小例子来具体看一下Worker作为ThreadPoolExecutor的基本执行单位到底是如何工作的。

三、还是那个简单的小例子

  再来回顾一下上一篇文章中我们使用的小例子:

  通过这个例子我们来分析一下ThreadPoolExecutor是如何工作的。

(一)ThreadPoolExecutor的创建过程

  在上面这个例子中,创建线程池实际上调用的就是上文中提到的构造方法,这里闲鱼自己实现了一个简答的线程工厂TestThreadFactory:

代码语言:javascript
复制
public class TestThreadFactory implements ThreadFactory {

    /**
     * 姓名前缀
     */
    private final String namePrefix;

    private final AtomicInteger nextId = new AtomicInteger(0);

    public TestThreadFactory(String whatFeatureOfGroup){
        this.namePrefix = "From TestThreadFactory's " + whatFeatureOfGroup + "-Worker-";
    }

    @Override
    public Thread newThread(Runnable task) {
        String name = namePrefix
                + nextId.getAndIncrement();
        Thread thread = new Thread(null, task, name, 0);
        System.out.println(thread.getName());
        return thread;
    }
}

  在实际工作中,建议大家也可以类似去实现一个线程工厂,对于创建的每一个线程都能有一个明确的名字,这样在进行问题排查时会较为方便。其他参数都是比较基础的设置,这里就不再赘述。

(二)ThreadPoolExecutor的执行过程

  下面开始最重要的部分,线程池到底是如何执行的?   可以看到,代码中调用了这样一个方法execute(Runnable task),这个方法就是线程池任务执行方法,下面我们来具体看一下:

代码语言:javascript
复制
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

execute()方法的执行流程,一共分为三步:

  1. 首先判断当前正在运行的线程数是否少于corePoolSize,如果小于,则根据给出的command创建一个新的线程,并将其作为它的第一个任务。(此时可以认为创建的是核心线程)
  2. 如果当前运行的线程数不少于corePoolSize,那么就尝试将任务添加到阻塞队列中。如果添加成功,还会再进行一次检查是否需要创建一个新的线程。如果检查通过,同时根据workerCountOf(recheck)获得值为0,则开启一个新的线程。
  3. 当入队失败后,此时会再次尝试添加一个新的线程,如果添加失败,则根据任务给出的拒绝策略来执行reject(command)方法。

  上面主要是用于进行任务创建的控制,具体任务的创建则是通过addWorker()方法,具体方法如下:

代码语言:javascript
复制
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
	// 在这里进行线程池状态的检查和队列是否为空的检查
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker方法可以分成两部分,一部分是进行worker数量检查和增加操作,一部分是实际进行worker创建。在这两个过程中都进行了多次的线程池状态检查和线程池已运行数量检查,具体代码这里就不进行过多的解释,这里我们主要来看一下方法参数列表中的core变量。   通过代码wc >= (core ? corePoolSize : maximumPoolSize)可以发现,core变量是用于判断当前准备创建的线程是属于核心线程还是非核心线程,也就是超过corePoolSize部分的线程。   看完了线程创建部分,下面就到了线程运行。在上面的代码中,调用了t.start()方法。再往上看,我们可以发现实际上这个线程是Worker中的线程,所以最终线程运行会调用Worker中的run()方法。

代码语言:javascript
复制
public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
	// If pool is stopping, ensure thread is interrupted;
	// if not, ensure thread is not interrupted.  This
	// requires a recheck in second case to deal with
	// shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
					   // 启动任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
		  // 完成任务数加一
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

  通过上面的代码可以看到,最终任务的执行是在runWorker()方法中完成的。整体来看,runWorker()方法首先释放了锁来保证允许中断,然后在循环中进行实际的任务启动(自旋以保证任务执行)。在循环中进行任务执行时还提供了beforeExecute(wt, task)afterExecute(task, thrown)两个可自行扩展的方法,也就意味着,我们可以自行设计线程运行前和运行后需要执行的操作。在方法的最后会进行worker的退出操作。   这里需要注意的是一开始进行的解锁和后续的加锁操作。当使用Worker的构造方法进行对象创建时,此时Worker对象中同步状态state为-1。而当使用了unlock()方法之后,则会将同步状态state更改为0,此时后续的加锁操作才能继续进行。除此以外,这里进行解锁的另一个目的是为了保证其他线程在调用ThreadPoolExecutorinterruptIdleWorkers()interruptWorkers()方法能够进行中断Worker操作。而加锁操作则是为了保障任务执行的完整性。   最后会调用processWorkerExit(w, completedAbruptly)方法进行Worker退出操作。

总结

  通过上面的分析,相信大家对于线程池ThreadPoolExecutor的使用和线程池中线程的运行应该有了基本的了解。其中内部类Worker大家一定要自行对照源码和编写案例进行深入理解。   以上只是讲解了线程池运行大致的原理和闲鱼自己的理解,要想更好的使用还需要大家在项目中具体去实践。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-05-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Brucebat的伪技术鱼塘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、前言
  • 二、ThreadPoolExecutor总览
    • (一)成员变量
      • 线程池生命周期(状态流转)
    • (二)构造函数
      • (三)几个重要的内部类
      • 三、还是那个简单的小例子
        • (一)ThreadPoolExecutor的创建过程
          • (二)ThreadPoolExecutor的执行过程
          • 总结
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档