前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >知道CountDownLatch是做什么的,那你知道它的底层是如何实现的吗?

知道CountDownLatch是做什么的,那你知道它的底层是如何实现的吗?

原创
作者头像
爪哇缪斯
发布2023-08-29 10:13:51
1620
发布2023-08-29 10:13:51
举报
文章被收录于专栏:爪哇缪斯

一、概述

CountDownLatch是一个多线程控制工具,用来控制线程的等待。设置需要countDown的数量num,然后每一个线程执行完毕后,调用countDown()方法,而主线程调用await()方法执行等待,直到num个子线程执行了countDown()方法 ,则主线程解除阻塞,开始继续执行。

其具体操作流程类似火箭发射,我们通过倒数三二一(3个子线程分别调用countDown()),那么火箭就发射升空了(主线程await()方法处就释放了阻塞,可以继续向下执行):

代码上的使用方法如下所示:

首先创建CountDownLatch实例对象,并传入需要倒数的count值; 【其次】在主线程处通过调用await()方法进行阻塞操作; 【最后】当子线程执行完某个任务之后,调用countDown()方法执行倒计时减1操作;当倒计时为0的时候,主线程解除阻塞,继续执行await()方法下面的代码逻辑;

我们以实例CountDownLatchDemo为例,看一下具体的代码实现:

二、构造函数解析

在CountDownLatch的构造函数中,我们通过指定入参count的值,来设置需要调用多少次countDown()方法才会释放对当前线程的阻塞。构造方法逻辑比较简单,如果我们设置的count值小于0,则说明是一个违规值,会随之抛出IllegalArgumentException异常;代码如下所示:

代码语言:javascript
复制
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

如果设置的count值是合法值,那么则通过setState(count)方法,将count赋值给AQS中的state变量。那么这个state值,就可以用来做倒计时的计数用了,如果为0,则表示倒计时结束,否则,则依然无法解除主线程的阻塞状态。

三、await()方法源码解析

从上面的演示示例中,我们已经看到,通过在主线程中调用countDownLatch.await()方法,使得主线程进入阻塞状态,那么其内部是如何实现的呢?我们把视角转移到await()方法中。在其方法内,只有一行代码,即,调用sync的acquireSharedInterruptibly(1)方法,此处需要额外说明一下,这个sync其实是继承了AQS类的实例对象,所以,它同时也具备了AQS的所有功能,那么从这里大家也能得出一个结论,就是CountDownLatch所具备的能力其实底层都是通过AQS实现的。代码如下所示:

代码语言:javascript
复制
public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

acquireSharedInterruptibly(...)方法中,如果发现发生过interrupt,则抛出InterruptedException异常;如果没发生过interrupt,则通过调用tryAcquireShared(arg)方法来判断是否倒计时已经结束了,如果state等于0,则表示倒计时结束了,那么该方法返回1,否则,返回-1;如果倒计时没有结束(即:tryAcquireShared(arg)返回-1),则继续执行doAcquireSharedInterruptibly(arg)方法,代码如下所示:

此处展示了tryAcquireShared(arg)方法的内部处理逻辑,即:如果state等于0,则表示倒计时结束了,那么该方法返回1,否则,返回-1;代码如下所述:

代码语言:javascript
复制
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; // 1表示倒计时结束;-1表示倒计时进行中;
}

如果倒计时没有结束,则会执行doAcquireSharedInterruptibly(-1)方法,在该方法内部主要由四部分逻辑组成,下面我们也会依次针对这些部分进行详细解析:

步骤1】创建AQS队列,将Node节点放到队列中。 【步骤2】如果倒计时未完成,则执行阻塞操作。 【步骤3】如果倒计时完成,解除阻塞操作。 【步骤4】如果存在异常发生,则对失败进行收尾工作。

3.1> 创建AQS队列

因为在上面已经说过——CountDownLatch所具备的能力其实底层都是通过AQS实现的。而AQS底层就是通过维护节点链表实现的抢锁行为,那么对于CountDownLatch我们也需要创建这样一个链表数据结构,这部分逻辑就在addWaiter(Node.SHARED)方法中。此处需要额外说明一下的就是,对于入参值Node.SHARED,仅仅是一个空属性值的Node节点。

addWaiter(...)方法内部,主要针对两种情况进行了处理:

情况1】如果链表已经创建过了,那么直接讲node放置到链表末尾即可,返回node; 【情况2】如果没创建,则创建链表,并将node插入到链表末尾,返回node;

针对enq(node)方法的内部逻辑,下图以节点数据结构进行了进一步的解释,请见下图所示:

3.2> 执行阻塞操作

当我们执行完上面的addWaiter(Node.SHARED)方法,创建了AQS队列之后,我们就开始了下面的无限for循环逻辑。在for(;;)无限循环中,会尝试获得r值,其含义如下所示:

r==1】表示state等于0,倒计时完毕。 【r==-1】表示state不等于0,倒计时还在进行中。

那么,此处我们的前提条件就是——倒计时还在进行中;所以r等于-1,无法满足下面一行的if(r>=0)的判断条件,所以,不执行该if逻辑。而是直接跳转到“步骤3:执行阻塞操作”这部分红框代码中了,具体请见下图所示:

在“步骤3:执行阻塞操作”这步骤中,主要执行了两个方法:shouldParkAfterFailedAcquire(p, node)parkAndCheckInterrupt(),下面我们就分别来分析这两个方法的具体执行过程。

shouldParkAfterFailedAcquire(p, node)方法中,会执行如下逻辑:

代码语言:javascript
复制
static final int CANCELLED =  1; /** waitStatus value to indicate thread has cancelled */
static final int SIGNAL    = -1; /** waitStatus value to indicate successor's thread needs unparking */
static final int CONDITION = -2; /** waitStatus value to indicate thread is waiting on condition */
static final int PROPAGATE = -3; /** waitStatus value to indicate the next acquireShared should unconditionally propagate */

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; 
    if (ws == Node.SIGNAL) // SIGNAL值,表示后继线程需要unparking
        return true;
    if (ws > 0) { // ws大于0,说明是CANCELLED节点,清理该节点
        do { 
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 否则,将当前节点赋值为SIGNAL
    }
    return false;
}

如果节点的waitStatus == -1】则直接返回true; 【如果节点的waitStatus > 0】说明是CANCELLED节点,那么清理该节点及所有相邻前置的CANCELLED节点,并返回false; 【如果节点的waitStatus是其他值】通过CAS将节点的waitStatus值变为-1(Node.SIGNAL),并返回false

那么由于head指针指向的Node节点waitStatus等于0,所以,第一次执行shouldParkAfterFailedAcquire(...)方法之后,head节点的waitStatus从0变为-1;那么当再次执行shouldParkAfterFailedAcquire(...)方法的时候,则满足:waitStatus == -1,直接返回true了,请见下图所示:

shouldParkAfterFailedAcquire(p, node)方法在执行第二遍之后返回了true,那么就轮到触发parkAndCheckInterrupt()方法的时刻了,它内部逻辑非常简单,就是执行了两个步骤:步骤1,调用LockSupport.park方法对当前线程进行阻塞;步骤2,解除阻塞后,如果发生了interrupt,则返回true;否则返回false;代码如下所示:

代码语言:javascript
复制
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 线程在此处被阻塞
    return Thread.interrupted(); // 如果发生了interrupt,则返回true;否则返回false
}

3.3> 解除阻塞操作

当“倒计时”结束,即:执行了足够次数的countDown()方法(此步骤会在“四、countDown()方法源码解析”章节进行介绍);则会触发解除阻塞的操作了,即:下图红框内的代码。

那么在上述红框代码中,关键的代码逻辑就是setHeadAndPropagate(node, r),其中:node为存储了当前线程的节点(即:node.thread=主线程),r等于1

setHeadAndPropagate(node, r)方法的作用是用于,请见如下源码所示:

代码语言:javascript
复制
static final Node SHARED = new Node(); // 空值节点

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // 暂存旧的头节点
    setHead(node); // 设置新的头节点
    if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;  // 获取node的下一个节点
        if (s == null || s.isShared()) // 如果node就是尾节点或者s.nextWaiter等于SHARED
            doReleaseShared();
    }
}

/**
 * 设置头节点
 */
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
 * 如果node在共享模式下等待,返回true。
 */
final boolean isShared() {
    return nextWaiter == SHARED;
}

对于CountDownLatch来说,doReleaseShared()方法其实没有什么作用,因为原本链表就两个节点,一个虚拟头结点(head指针),一个是当下主线程节点(tail指针);当head指针指向下一个节点时,则head==tail,那么就会直接break跳出无限for循环(for(;;)

代码语言:javascript
复制
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 由于此时h等于tail,所以不满足if判断
        if (h != null && h != tail) { 
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) 
                    continue;         
                unparkSuccessor(h);
            }
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;               
        }

        // 由于h等于tail(可参照3.2画的链表图),跳出该方法
        if (h == head) break;
    }
}

执行完上面所说的setHeadAndPropagate(node, r)方法之后,基本就可以结束await()方法的逻辑,继续执行主线程剩下的逻辑代码了。

3.4> 针对执行失败后的收尾工作

如果顺利的解除阻塞的话,failed变量会被赋值为false,那么在finally中的cancelAcquire(node)方法则不会被调用。反之,如果failed等于true,则说明阻塞并未按照正常的unpark方式解除阻塞,即,通过异常的方式解除的阻塞,那么我们就需要执行cancelAcquire(node)方法进行失败后的收尾工作了,具体代码如下所示:

cancelAcquire方法中,尝试在AQS的队列链表中断开node节点的,即,剔除掉node节点。由于此处并非主流程,所以具体的代码和注释如下所示,就不再赘述了。

代码语言:javascript
复制
private void cancelAcquire(Node node) {
    if (node == null) // node是保存了主线程的节点,不为空
        return;

    node.thread = null; // 将node保存的线程置为空,即,丢弃之前保存的主线程
    Node pred = node.prev;
    while (pred.waitStatus > 0) // pred的waitStatus等于-1,不满足
        node.prev = pred = pred.prev;

    Node predNext = pred.next; // predNext等于null
    node.waitStatus = Node.CANCELLED;
    // node等于tail,将tail指针指向pred节点
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null); // 将p
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

四、countDown()方法源码解析

子线程通过调用countDown()方法来实现“倒计时”操作,所以,下面我们就来着重分析一下这个方法的具体执行过程,代码如下所示:

代码语言:javascript
复制
public void countDown() {
    sync.releaseShared(1);
}

releaseShared(1)方法中,首先通过tryReleaseShared(arg)进行判断,只有倒计时最后一次countDown调用才会返回true,其他情况都会返回false;而如果返回的是true,才会继续执行if方法内的逻辑,即:doReleaseShared()方法。

4.1> tryReleaseShared(arg)

在该方法内部,首先开启了无限for循环,那么首先获取了当前的倒计时总数state的值,如果等于0,则说明在本次调用countDown()方法之前,倒计时就已经结束了,则此时直接返回false

如果倒计是没有结束,则继续往下执行,先将倒计时总数减1,如果等于0,则说明本次调用countDown()方法是倒计时的最后一次,那么应该可以触发后续的解除主线程阻塞的操作了,那么此时直接返回true;但是,如果不等于0,则表示倒计时仍在继续中,则此时直接返回false;具体代码如下所示:

代码语言:javascript
复制
protected boolean tryReleaseShared(int releases) {
    for (;;) {
        int c = getState(); // 获得倒计时总数state
        if (c == 0) return false; // 如果等于0,则表示倒计时结束,返回false
        int nextc = c - 1; // 否则,倒计时总数减1
        if (compareAndSetState(c, nextc)) // 然后将最新的倒计时数,更新到state的值
            return nextc == 0; // 如果等于0,返回true;否则,返回false
    }
}

4.2> doReleaseShared()

doReleaseShared()方法中,我们要开始真正的执行解除阻塞的操作了。方法首先开启了无限for循环,然后进行了一系列的判断,对于当前AQS队列的情况,上面已经通过图的方式表现了,为了便于大家回忆,我又把它粘贴到了doReleaseShared()方法源码的下面,此时h不等于null,并且h不等于tail,并且h的waitStatus等于-1(Node.SIGNAL),所以是可以顺利执行unparkSuccessor(h)这行代码的;当解除阻塞后,此时head指针向后移动一个节点,那么在第二次循环时,由于无法满足h!=tail,则执行第14行——if(h==head) break;跳出无限循环,结束本方法了。具体代码如下所示:

unparkSuccessor(h)方法中,我们获得了head头节点的下一个节点s,即:也就是保存主线程的节点,然后针对s节点存储的thread(即:主线程)执行unpark操作。因此,主线程的阻塞被解除了。具体代码如下所示:

代码语言:javascript
复制
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus; // 此时ws等于0
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next; // s就是head节点的next节点,也就是保存主线程的节点

    // s不等于null,并且s.waitStatus等于0
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    // 针对s中存储的thread(即:主线程)执行unpark操作
    if (s != null) LockSupport.unpark(s.thread);
}

今天的文章内容就这些了:

写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享

更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、概述
  • 二、构造函数解析
    • 3.1> 创建AQS队列
      • 3.2> 执行阻塞操作
        • 3.3> 解除阻塞操作
          • 3.4> 针对执行失败后的收尾工作
          • 四、countDown()方法源码解析
            • 4.1> tryReleaseShared(arg)
              • 4.2> doReleaseShared()
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档