前言
" CountDownLatch 一个同步辅助工具,同样是基于 AQS 实现,本篇文件主要是介绍 CountDownLatch 的使用,以及源码。 "
1
介绍
一个同步辅助工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止
一个 CountDownLatch 初始化为给定计数。在 await 方法阻塞,调用 countDown 方法会减少计数直到达到零,此后所有等待的线程被释放,任何后续调用 await 都会立即返回。这是一次性的现象 - 计数不能复位。如果你需要一个版本重置计数,请考虑使用CyclicBarrier 。
CountDownLatch 是一种通用的同步工具,可用于多种用途。
CountDownLatch 一个有用的属性是,它不要求调用 countDown 线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程可以通过。
基本使用
在我之前 CAS 那篇文章《从JUC源码看CAS,我做了个笔记 ......》中介绍 CAS 举例时使用了 CountDownLatch,其代码如下:
public class CasTest {
private static final CountDownLatch LATCH = new CountDownLatch(10);
private static int NUM_I = 0;
private static volatile int NUM_J = 0;
private static final AtomicInteger NUM_K = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
threadPool.execute(new Runnable() {
public void run() {
for (int j = 0; j < 10000; j++) {
NUM_I++;
NUM_J++;
NUM_K.incrementAndGet();
}
LATCH.countDown();
}
});
}
LATCH.await();
System.out.println("NUM_I = " + NUM_I);
System.out.println("NUM_J = " + NUM_J);
System.out.println("NUM_K = " + NUM_K.get());
threadPool.shutdown();
}
}
简单介绍下这段代码的主要逻辑及功能:
问题疑问
2
源码分析
基本结构
通过类图可以看出,CountDownLatch 内部存在一个静态类 Sync,而 Sync 继承了 AbstractQueuedSynchronizer。具体内部是如何实现的,则下面通过源码和画图一步一步的进行介绍。
初始化
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
通过初始化构造器可以看出,在 new 创建对象时必须传递一个 int 类型的非负数。实现逻辑可以看出,是创建了一个 Sync 对象。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
}
之前在介绍 AQS 源码中已经介绍了 state 的含义,state 在不同子类中代表不同的含义。
countDown
public void countDown() {
sync.releaseShared(1);
}
递减锁存器的计数:
此处调用的是 AQS 的 releaseShard() 方法,释放共享资源。
// AQS 代码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
在 AQS 释放共享资源方法中 tryReleaseShared(arg) 部分是在 CountDownLatch 的内部类 Sync 中实现的,代码部分如下:
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
递减更新 state ,如果 state 为 0 则返回 false,否则返回 true 。
此时再对照上面 AQS 代码,发现:如果 tryReleaseShared 返回 true ,则会唤醒后续节点开始执行操作。
所以也就是说,如果 state 不为 0,则不会唤醒后续节点,直到 state 为 0 。
await
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
导致当前线程等待,直到锁存器倒计数至零,除非线程被中断。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
AQS 定义了 tryAcquireShared 返回值分为 3 种:
其中 tryAcquireShared 同样由 CountDownLatch 的内部类 Sync 中实现,内部逻辑主要是判断 state 的值,进行返回。
在内部实现中返回的值只有 1 和 -1 ,说明在 state == 0 时,返回 1 ,即唤醒后续节点。不等于 0 时,会阻塞。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
3
总结
Q: CountDownLatch 和 AQS 有什么关系?
A: CountDownLatch 是基于 AQS 的共享模式实现的。
Q: CountDownLatch 的实现原理是什么?
A: 可以参考上面的源码解析,进行总结介绍。
CountDownLatch 是基于 AQS 共享模式实现的,在初始化时必须传入计数,该计数实际上是 AQS 的 state 值。
在 countDown 时对 state 进行递减,在 当 state 为 0 时 会唤醒 AQS 队列中的所有等待的节点 (因为是共享模式)。
而 await 方法是判断 state 的值,如果不是 0 ,则所有线程在队列中阻塞,等待唤醒。
Q: state 在代表的含义是什么?
A: 在之前介绍锁的时候已经介绍过,只不过在这里 state 又增加了一种含义。
相关资料
[1] Java SE 文档:
http://docs.oracle.com/javase/8/docs/api/
- <End /> -