这个类使一个线程等待其他线程各自执行完毕后再执行。是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) { // 设置锁的数量 setState(count); }
int getCount() { // 获取锁的数量 return getState(); }
// 尝试获取锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 每释放一个count减1 for (;;) {// 无限循环 // 用AQS状态值作为锁的count值 int c = getState(); // 锁数量为0时返回false if (c == 0) return false; // 不为0,减1 int nextc = c-1; // 将最新值cas设置到state中去 if (compareAndSetState(c, nextc)) return nextc == 0; } } }
tryAcquireShared方法在调用时只有在state值为0时才会返回1,否则会一直返回-1。
private final Sync sync;
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
内部的主要工作是通过Sync来处理。
下面来看下该类的主要的两个方法,await和countDown方法。
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
内部调用的是java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireSharedInterruptibly方法:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//获取不到许可时,调用doAcquireSharedInterruptibly方法 doAcquireSharedInterruptibly(arg); }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 向AQS队列添加一个SHARED状态的节点 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) {// 无限循环 // 获取前置节点 final Node p = node.predecessor(); // 如果前置节点是头节点 if (p == head) { // 尝试获取共享许可 int r = tryAcquireShared(arg); if (r >= 0) { // 成功了则设置头节点并进行传播 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } // 判断是否需要在失败时进行park(即等待) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) // 取消获取许可 cancelAcquire(node); } }
这里有一点需要注意的是如果有new CountDownLatch(10),那么state的值就被设置为10,调用await方法的线程调用tryAcquireShared方法时会返回-1,然后进入shouldParkAfterFailedAcquire方法,线程最终会park,直到state被调用countDown方法的线程减少到0。
public void countDown() { sync.releaseShared(1); }
此时再回过头来看下releaseShared方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero // 每释放一个count减1 for (;;) {// 无限循环 // 用AQS状态值作为锁的count值 int c = getState(); // 锁数量为0时返回false if (c == 0) return false; // 不为0,减1 int nextc = c-1; // 将最新值cas设置到state中去 if (compareAndSetState(c, nextc)) return nextc == 0; } }
// 共享模式下的释放动作-表示唤醒后继节点并确保传播 private void doReleaseShared() { for (;;) {// 无限循环 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // cas头节点的waitStatus if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒后继节点 unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
public class Worker implements Runnable{
private CountDownLatch downLatch; private String name;
public Worker(CountDownLatch downLatch, String name){ this.downLatch = downLatch; this.name = name; }
public void run() { this.doWork(); try{ TimeUnit.SECONDS.sleep(new Random().nextInt(10)); }catch(InterruptedException ie){ } System.out.println(this.name + "工作结束!"); this.downLatch.countDown();
}
private void doWork(){ System.out.println(this.name + "正在工作!"); }
}
public class Boss implements Runnable {
private CountDownLatch downLatch;
public Boss(CountDownLatch downLatch){ this.downLatch = downLatch; }
public void run() { System.out.println("老板正在等所有人工作完成......"); try { this.downLatch.await(); } catch (InterruptedException e) { } System.out.println("所有人工作完成了,老板开始检查工作!"); }
}
public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(3);
Worker w1 = new Worker(latch,"zs"); Worker w2 = new Worker(latch,"ls"); Worker w3 = new Worker(latch,"ww");
Boss boss = new Boss(latch);
executor.execute(w3); executor.execute(w2); executor.execute(w1); executor.execute(boss);
executor.shutdown(); }
总共三个许可,三个worker,一个boss,每个worker工作完成之后打卡,boss一直等待直到所有worker工作完成。