CountDownLatch是一个多线程控制工具,用来控制线程的等待。设置需要countDown的数量num
,然后每一个线程执行完毕后,调用countDown()
方法,而主线程调用await()
方法执行等待,直到num个子线程执行了countDown()方法 ,则主线程解除阻塞,开始继续执行。
其具体操作流程类似火箭发射,我们通过倒数三二一(3个子线程分别调用countDown()
),那么火箭就发射升空了(主线程await()方法处就释放了阻塞,可以继续向下执行
):
代码上的使用方法如下所示:
【首先】
创建CountDownLatch实例对象
,并传入需要倒数的count
值; 【其次】在主线程处通过调用await()
方法进行阻塞操作; 【最后】当子线程执行完某个任务之后,调用countDown()
方法执行倒计时减1操作;当倒计时为0的时候,主线程解除阻塞
,继续执行await()方法下面的代码逻辑;
我们以实例CountDownLatchDemo为例,看一下具体的代码实现:
在CountDownLatch的构造函数中,我们通过指定入参count
的值,来设置需要调用多少次countDown()
方法才会释放对当前线程的阻塞。构造方法逻辑比较简单,如果我们设置的count值小于0,则说明是一个违规值,会随之抛出IllegalArgumentException
异常;代码如下所示:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
如果设置的count值是合法值,那么则通过setState(count)
方法,将count赋值给AQS中的state变量。那么这个state值,就可以用来做倒计时的计数用了,如果为0,则表示倒计时结束,否则,则依然无法解除主线程的阻塞状态。
三、await()方法源码解析
从上面的演示示例中,我们已经看到,通过在主线程中调用countDownLatch.await()
方法,使得主线程进入阻塞状态,那么其内部是如何实现的呢?我们把视角转移到await()方法中。在其方法内,只有一行代码,即,调用sync的acquireSharedInterruptibly(1)
方法,此处需要额外说明一下,这个sync其实是继承了AQS类的实例对象,所以,它同时也具备了AQS的所有功能,那么从这里大家也能得出一个结论,就是CountDownLatch所具备的能力其实底层都是通过AQS实现的。代码如下所示:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
在acquireSharedInterruptibly(...)
方法中,如果发现发生过interrupt,则抛出InterruptedException
异常;如果没发生过interrupt,则通过调用tryAcquireShared(arg)方法来判断是否倒计时已经结束了,如果state等于0,则表示倒计时结束了,那么该方法返回1,否则,返回-1;如果倒计时没有结束(即:tryAcquireShared(arg)返回-1),则继续执行doAcquireSharedInterruptibly(arg)
方法,代码如下所示:
此处展示了tryAcquireShared(arg)
方法的内部处理逻辑,即:如果state
等于0,则表示倒计时结束了,那么该方法返回1
,否则,返回-1
;代码如下所述:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; // 1表示倒计时结束;-1表示倒计时进行中;
}
如果倒计时没有结束,则会执行doAcquireSharedInterruptibly(-1)
方法,在该方法内部主要由四部分逻辑组成,下面我们也会依次针对这些部分进行详细解析:
【步骤1】创建AQS队列,将Node节点放到队列中。 【步骤2】如果倒计时未完成,则执行阻塞操作。 【步骤3】如果倒计时完成,解除阻塞操作。 【步骤4】如果存在异常发生,则对失败进行收尾工作。
因为在上面已经说过——CountDownLatch所具备的能力其实底层都是通过AQS实现的。而AQS底层就是通过维护节点链表实现的抢锁行为,那么对于CountDownLatch我们也需要创建这样一个链表数据结构,这部分逻辑就在addWaiter(Node.SHARED)
方法中。此处需要额外说明一下的就是,对于入参值Node.SHARED,仅仅是一个空属性值的Node节点。
在addWaiter(...)
方法内部,主要针对两种情况进行了处理:
【情况1】如果链表已经创建过了,那么直接讲node放置到链表末尾即可,返回node; 【情况2】如果没创建,则创建链表,并将node插入到链表末尾,返回node;
针对enq(node)
方法的内部逻辑,下图以节点数据结构进行了进一步的解释,请见下图所示:
当我们执行完上面的addWaiter(Node.SHARED)
方法,创建了AQS队列之后,我们就开始了下面的无限for循环逻辑。在for(;;)
无限循环中,会尝试获得r
值,其含义如下所示:
【r==1】表示
state
等于0,倒计时完毕。 【r==-1】表示state
不等于0,倒计时还在进行中。
那么,此处我们的前提条件就是——倒计时还在进行中
;所以r等于-1,无法满足下面一行的if(r>=0)
的判断条件,所以,不执行该if逻辑。而是直接跳转到“步骤3:执行阻塞操作”这部分红框代码中了,具体请见下图所示:
在“步骤3:执行阻塞操作”这步骤中,主要执行了两个方法:shouldParkAfterFailedAcquire(p, node)
和parkAndCheckInterrupt()
,下面我们就分别来分析这两个方法的具体执行过程。
在shouldParkAfterFailedAcquire(p, node)
方法中,会执行如下逻辑:
static final int CANCELLED = 1; /** waitStatus value to indicate thread has cancelled */
static final int SIGNAL = -1; /** waitStatus value to indicate successor's thread needs unparking */
static final int CONDITION = -2; /** waitStatus value to indicate thread is waiting on condition */
static final int PROPAGATE = -3; /** waitStatus value to indicate the next acquireShared should unconditionally propagate */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // SIGNAL值,表示后继线程需要unparking
return true;
if (ws > 0) { // ws大于0,说明是CANCELLED节点,清理该节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 否则,将当前节点赋值为SIGNAL
}
return false;
}
【如果节点的waitStatus == -1】则直接返回
true
; 【如果节点的waitStatus > 0】说明是CANCELLED节点,那么清理该节点及所有相邻前置的CANCELLED节点,并返回false
; 【如果节点的waitStatus是其他值】通过CAS将节点的waitStatus值变为-1(Node.SIGNAL
),并返回false
;
那么由于head指针指向的Node节点waitStatus
等于0,所以,第一次执行shouldParkAfterFailedAcquire(...)
方法之后,head节点的waitStatus
从0变为-1;那么当再次执行shouldParkAfterFailedAcquire(...)
方法的时候,则满足:waitStatus == -1,直接返回true了,请见下图所示:
当shouldParkAfterFailedAcquire(p, node)
方法在执行第二遍之后返回了true,那么就轮到触发parkAndCheckInterrupt()
方法的时刻了,它内部逻辑非常简单,就是执行了两个步骤:步骤1,调用LockSupport.park
方法对当前线程进行阻塞;步骤2,解除阻塞后,如果发生了interrupt,则返回true;否则返回false;代码如下所示:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 线程在此处被阻塞
return Thread.interrupted(); // 如果发生了interrupt,则返回true;否则返回false
}
当“倒计时”结束,即:执行了足够次数的countDown()方法(此步骤会在“四、countDown()方法源码解析
”章节进行介绍);则会触发解除阻塞的操作了,即:下图红框内的代码。
那么在上述红框代码中,关键的代码逻辑就是setHeadAndPropagate(node, r)
,其中:node为存储了当前线程的节点(即:node.thread=主线程
),r等于1
;
setHeadAndPropagate(node, r)
方法的作用是用于,请见如下源码所示:
static final Node SHARED = new Node(); // 空值节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 暂存旧的头节点
setHead(node); // 设置新的头节点
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
Node s = node.next; // 获取node的下一个节点
if (s == null || s.isShared()) // 如果node就是尾节点或者s.nextWaiter等于SHARED
doReleaseShared();
}
}
/**
* 设置头节点
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* 如果node在共享模式下等待,返回true。
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
对于CountDownLatch来说,doReleaseShared()
方法其实没有什么作用,因为原本链表就两个节点,一个虚拟头结点(head指针),一个是当下主线程节点(tail指针);当head指针指向下一个节点时,则head==tail
,那么就会直接break跳出无限for循环(for(;;)
)
private void doReleaseShared() {
for (;;) {
Node h = head;
// 由于此时h等于tail,所以不满足if判断
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue;
}
// 由于h等于tail(可参照3.2画的链表图),跳出该方法
if (h == head) break;
}
}
执行完上面所说的setHeadAndPropagate(node, r)
方法之后,基本就可以结束await()方法的逻辑,继续执行主线程剩下的逻辑代码了。
如果顺利的解除阻塞的话,failed变量会被赋值为false,那么在finally中的cancelAcquire(node)
方法则不会被调用。反之,如果failed等于true,则说明阻塞并未按照正常的unpark方式解除阻塞,即,通过异常的方式解除的阻塞,那么我们就需要执行cancelAcquire(node)
方法进行失败后的收尾工作了,具体代码如下所示:
在cancelAcquire
方法中,尝试在AQS的队列链表中断开node节点的,即,剔除掉node节点。由于此处并非主流程,所以具体的代码和注释如下所示,就不再赘述了。
private void cancelAcquire(Node node) {
if (node == null) // node是保存了主线程的节点,不为空
return;
node.thread = null; // 将node保存的线程置为空,即,丢弃之前保存的主线程
Node pred = node.prev;
while (pred.waitStatus > 0) // pred的waitStatus等于-1,不满足
node.prev = pred = pred.prev;
Node predNext = pred.next; // predNext等于null
node.waitStatus = Node.CANCELLED;
// node等于tail,将tail指针指向pred节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); // 将p
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
子线程通过调用countDown()
方法来实现“倒计时”操作,所以,下面我们就来着重分析一下这个方法的具体执行过程,代码如下所示:
public void countDown() {
sync.releaseShared(1);
}
在releaseShared(1)
方法中,首先通过tryReleaseShared(arg)
进行判断,只有倒计时最后一次countDown调用才会返回true,其他情况都会返回false;而如果返回的是true,才会继续执行if方法内的逻辑,即:doReleaseShared()
方法。
在该方法内部,首先开启了无限for循环,那么首先获取了当前的倒计时总数state的值,如果等于0,则说明在本次调用countDown()
方法之前,倒计时就已经结束了,则此时直接返回false;
如果倒计是没有结束,则继续往下执行,先将倒计时总数减1,如果等于0,则说明本次调用countDown()
方法是倒计时的最后一次,那么应该可以触发后续的解除主线程阻塞的操作了,那么此时直接返回true;但是,如果不等于0,则表示倒计时仍在继续中,则此时直接返回false;具体代码如下所示:
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState(); // 获得倒计时总数state
if (c == 0) return false; // 如果等于0,则表示倒计时结束,返回false
int nextc = c - 1; // 否则,倒计时总数减1
if (compareAndSetState(c, nextc)) // 然后将最新的倒计时数,更新到state的值
return nextc == 0; // 如果等于0,返回true;否则,返回false
}
}
在doReleaseShared()
方法中,我们要开始真正的执行解除阻塞的操作了。方法首先开启了无限for循环,然后进行了一系列的判断,对于当前AQS队列的情况,上面已经通过图的方式表现了,为了便于大家回忆,我又把它粘贴到了doReleaseShared()
方法源码的下面,此时h不等于null,并且h不等于tail,并且h的waitStatus等于-1(Node.SIGNAL),所以是可以顺利执行unparkSuccessor(h)
这行代码的;当解除阻塞后,此时head指针向后移动一个节点,那么在第二次循环时,由于无法满足h!=tail,则执行第14行——if(h==head) break;
跳出无限循环,结束本方法了。具体代码如下所示:
在unparkSuccessor(h)
方法中,我们获得了head头节点的下一个节点s,即:也就是保存主线程的节点,然后针对s节点存储的thread(即:主线程)执行unpark操作。因此,主线程的阻塞被解除了。具体代码如下所示:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus; // 此时ws等于0
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // s就是head节点的next节点,也就是保存主线程的节点
// s不等于null,并且s.waitStatus等于0
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 针对s中存储的thread(即:主线程)执行unpark操作
if (s != null) LockSupport.unpark(s.thread);
}
今天的文章内容就这些了:
写作不易,笔者几个小时甚至数天完成的一篇文章,只愿换来您几秒钟的 点赞 & 分享 。
更多技术干货,欢迎大家关注公众号“爪哇缪斯” ~ \(^o^)/ ~ 「干货分享,每天更新」
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。