CountDownLatch是我们常用的并发工具,主要用于倒数计数等场景,如在zookeeper连接管理中用于初始化连接数。CountDownlatch是AbstractQueueSynchronizer的共享模式实现。实际上,我们可以理解AQS为什么没有将所有方法定义为abstract方法,这是因为子类可以根据共享还是独占模式来自由选择需要实现的方法。 CountDownLatch的类结构如下:
翻译部分: CountDownLatch是一种同步工具,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成为止。 CountDownLatch使用count进行初始化,await方法将当前线程阻塞,直到调用countDown方法而导致当前计数器归零,此后所有的线程均被释放,并且await的后续调用将立即返回。这是一种一次性的现象,计数器无法重置,如果需要用于重置的计数器版本,请考虑使用CyclicBarrier。 CountDownLatch是一种多功能的同步工具,可以用于多种用途,当CountDownLatch初始化为1时,用于一个简单的on/off开关。所有调用await的线程在await等待,直到被调用countDown,初始化为N的CountDownLatch可用于使一个线程等待直到N线程已完成某项操作或者某项操作已完成N次。 CountDownLatch的一个有用的属性是,它不需要调用countDown的线程在继续计数之前就等待计数值到达0,它只是防止任何线程经过await直到所有的线程都可以通过。 用法示例: 这是一组类,其中一组线程使用两个CountDownLatch。 第一个是启动信号,可防止任何worker继续前进,直到driver都完成准备为止。第二个是完成信号,允许driver等到所有worker都完成为止。
class Driver { // ...
void main() throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
for (int i = 0; i < N; ++i) // create and start threads
new Thread(new Worker(startSignal,
doSomethingElse(); // don't let run yet
startSignal.countDown(); // let all threads proceed
doSomethingElse();
doneSignal.await(); // wait for all to finish
}
}
class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
doWork();
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}}
另外一个典型的用法是将问题分为N个部分,用Runnable描述每个部分,该Runnable执行该部分并在CountDownLatch中递减计数,并将所有的Runnable排队给执行程序。当所有的子部分都执行完成时,协调线程将能够通过等待。当线程必须以此方法反复递减计数器时,请使用CyclicBarrier。
class Driver2 { // ...
void main() throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(N);
Executor e = ...
for (int i = 0; i < N; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
}
}
class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
try {
doWork(i);
doneSignal.countDown();
} catch (InterruptedException ex) {} // return;
}
void doWork() { ... }
}}
内存一致性分析,在计数器到达0之前,在调用CountDown()之前在线程中执行操作happen-before于在另外一个线程中await()成功返回的动作。
这是AQS的实现类,主要实现了两个方法,tryAcquireShared和tryReleaseShared。
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//根据state是否为0来决定此方法的返回,这是因为,如果为0,则说明条件已经达到,await的方法阻塞的线程需要被唤醒,反之则说明条件没有达到,后续线程需要继续WAIT。这说明tryAcquireShared方法是一个控制AQS在共享模式下后续运行的方法。
protected int tryAcquireShared(int acquires) {
//判断state是否为0,如果为0则返回1,反之返回-1
return (getState() == 0) ? 1 : -1;
}
//释放
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//死循环
for (;;) {
//拿到state
int c = getState();
//判断是否为0 如果为0 返回false 即倒计时器不能再减
if (c == 0)
return false;
//计算减1之后的值
int nextc = c-1;
//cas修改这个值
if (compareAndSetState(c, nextc))
//只有当减去之后为0 则返回true 其他情况都返回false
return nextc == 0;
}
}
}
Sync类只重写了两个方法,tryAcquireShared与tryReleaseShared。 tryAcquireShared是用户控制acquire流程的方法,此处将其重写为当state为0时返回1,否则返回-1.只有当doAcquireShared小于0的时候才会执行doAcquireSharedInterruptibly或者doAcquireShared。也就是说,当state不为0的时候,就可以将调用countDownLatch的await方法的线程进行阻塞。 tryReleaseShared方法则是设计为countDown方法所使用。当state调用countDown之后减1为0 则返回true。这样才会执行doReleaseShared方法,将前面阻塞的Node的线程都唤醒。
此方法用于初始化CountDownLatch。
public CountDownLatch(int count) {
//校验count不能为负数。
if (count < 0) throw new IllegalArgumentException("count < 0");
//之后new Sync
this.sync = new Sync(count);
}
count的值就是AQS中state的值。
此处调用acquireSharedInterruptibly方法。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
acquireSharedInterruptibly方法除了一开始判断中断状态之外,实际上调用的是前面重写的tryAcquireShared方法。
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
此方法与前面的同理,只是加入了超时时间。
public void countDown() {
sync.releaseShared(1);
}
此方法调用前面重写的releaseShared,当减1之后值为0的时候,就调用doReleaseShard,将wait的线程唤醒。
public long getCount() {
return sync.getCount();
}
这个getCount主要是返回state的值。此方法也重写在Sync中了。
int getCount() {
return getState();
}
CountDownLatch是在AQS基础之上实现的一个倒计时器,这个类先初始化count,之后在state不为0的时候将调用await的线程阻塞,之后当其他线程调用countDown的时候,回逐渐将state减少,直到state为0的时候,将之前被阻塞的线程唤醒。 这是AQS基于共享模式的一种实现,所谓共享模式就是对于AQS的操作,不关心state为非0的时候,获得资源的线程究竟是谁,只用关心state的状态。如果是独占模式,则除了关心state的状态之外,还需要关心获得阻塞的线程是谁。 也就是说,ReentrantLock,如果一旦被线程持有,那么其他任何线程在想要获得锁的时候,就会返回失败,只有这个线程自己可以再次重入继续获得锁,从而将链表增加。而共享模式则只用关心state的状态。