CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行。
CountDownLatch使用一个计数器count实现,构建CountDownLatch时需要使用给定的count初始化CountDownLatch。在count到达0之前,调用await()方法的线程将一直阻塞,当count到达0时,会唤醒所有阻塞的线程。注意:计数器count无法被重置,即只能实现一次这种功能,这也是CountDownLatch与CyclicBarrier的区别。
内存一致性效果:线程中调用 countDown() 之前的操作 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。
CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。
提供的方法:
1 //使当前线程阻塞直到计数器count变为0,除非被中断
2 public void await() throws InterruptedException
3 //使当前线程阻塞直到计数器count变为0,除非被中断或超过了指定时间
4 public boolean await(long timeout, TimeUnit unit) throws InterruptedException
5 //将计数器count递减,若count变为0则唤醒所有等待的线程
6 public void countDown()
7 //返回计数器count值
8 public long getCount()
使用示例:Driver主线程中控制N个Worker线程的启动,并等待所有Worker线程完成再退出。
1 class Driver { // ...
2 void main() throws InterruptedException {
3 CountDownLatch startSignal = new CountDownLatch(1); //启动信号
4 CountDownLatch doneSignal = new CountDownLatch(N); //完成信号
5
6 for (int i = 0; i < N; ++i) // create and start threads
7 new Thread(new Worker(startSignal, doneSignal)).start();
8
9 doSomethingElse(); // don't let run yet
10 //将启动信号的计数器置为0,启动等待的Worker线程
11 startSignal.countDown(); // let all threads proceed
12 doSomethingElse();
13 //等待N个Worker线程完成
14 doneSignal.await(); // wait for all to finish
15 }
16 }
17
18 class Worker implements Runnable {
19 private final CountDownLatch startSignal;
20 private final CountDownLatch doneSignal;
21 Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
22 this.startSignal = startSignal;
23 this.doneSignal = doneSignal;
24 }
25 public void run() {
26 try {
27 startSignal.await(); //等待启动信号
28 doWork();
29 doneSignal.countDown(); //发出完成信号,将完成信号的计数器减1
30 } catch (InterruptedException ex) {} // return;
31 }
32
33 void doWork() { ... }
34 }
CountDownLatch基于AQS实现,使用AQS的同步状态state表示计数器count。
先看一下CountDownLatch的内部类Syns的实现:
1 private static final class Sync extends AbstractQueuedSynchronizer {
2 private static final long serialVersionUID = 4982264981922014374L;
3
4 Sync(int count) {
5 setState(count); //设置同步状态state为count
6 }
7
8 int getCount() {
9 return getState(); //查询同步状态
10 }
11 //重写AQS的共享式获取同步状态的方法。当state=0时返回1,获取成功;当state=1时返回-1,获取失败。
12 protected int tryAcquireShared(int acquires) {
13 return (getState() == 0) ? 1 : -1;
14 }
15 //重写AQS的共享式释放同步状态的方法。基于自旋CAS递减同步状态
16 protected boolean tryReleaseShared(int releases) {
17 // Decrement count; signal when transition to zero
18 //如果state=0,那么直接返回false
19 //如果state>0,那么递减state。若更新后的state=0则返回true,释放同步状态成功;反之,返回false。
20 for (;;) {
21 int c = getState();
22 if (c == 0)
23 return false;
24 int nextc = c-1;
25 if (compareAndSetState(c, nextc))
26 return nextc == 0;
27 }
28 }
29 }
由Sync源码可以看出,CountDownLatch基于AQS的共享式获取和释放同步状态的机制实现。
await()
1 //调用AQS提供的共享式可中断获取同步状态方法。
2 //若获取成功(state=0),继续执行后续代码;否则(state>0),阻塞当前线程。
3 public void await() throws InterruptedException {
4 sync.acquireSharedInterruptibly(1);
5 }
countDown()
1 //调用AQS提供的共享式释放同步状态方法。
2 //若释放成功(tryReleaseShared返回true),唤醒同步队列上的后继节点;若释放失败(tryReleaseShared返回false),不做任何操作。
3 public void countDown() {
4 sync.releaseShared(1);
5 }