CyclicBarrier的功能与CountDownLatch类似,只是它与CountDownLatch比的优势是它能够被复用。
static class Worker extends Thread { private CyclicBarrier cyclicBarrier;
public Worker(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; }
@Override public void run() { super.run();
try { System.out.println(Thread.currentThread().getName() + "开始等待其他线程"); cyclicBarrier.await(); System.out.println(Thread.currentThread().getName() + "开始执行"); // 这里用Thread.sleep()来模拟业务处理 Thread.sleep(1000); System.out.println(Thread.currentThread().getName() + "执行完毕"); } catch (Exception e) { e.printStackTrace(); } } }
public static void main(String[] args) { int threadCount = 6; CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
for (int i = 0; i < threadCount; i++) { Worker worker = new Worker(cyclicBarrier); worker.start(); } }
结果:
Thread-0开始等待其他线程Thread-1开始等待其他线程Thread-2开始等待其他线程Thread-3开始等待其他线程Thread-4开始等待其他线程Thread-5开始等待其他线程Thread-5开始执行Thread-0开始执行Thread-1开始执行Thread-2开始执行Thread-3开始执行Thread-4开始执行Thread-5执行完毕Thread-0执行完毕Thread-1执行完毕Thread-3执行完毕Thread-4执行完毕Thread-2执行完毕
六个线程都会在await方法处等待,等到六个线程都到位后才会并发往下执行,接下来我们基于源码来分析下其内部实现原理。
// 屏障的每次使用都表示为一个Generation实例。每当障碍被触发或重置时,Generation都会发生变化// 如果有休息但没有后续的重置,则不需要活跃的Generationprivate static class Generation { boolean broken = false; }
// 防护栅栏入口的锁,非公平锁 private final ReentrantLock lock = new ReentrantLock(); // 在锁上的Condition上等待直到被触发的condition private final Condition trip = lock.newCondition(); // parties的数量 private final int parties; // 被触发时运行的命令 private final Runnable barrierCommand; // 当前generation private Generation generation = new Generation();
// 处于等待状态的parties数量。每一个generation都会将count从parties减到0。它会在开启一个新的generation或者broken时会被重置为parties private int count;
// CyclicBarrier会在当有足够数量(parties个)的线程在它上面等待时会触发,当被触发时会执行barrierAction中的内容,由进入屏障的最后一个线程执行 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; // count的初始值为parties this.count = parties; this.barrierCommand = barrierAction; } // 这个方法中只是没有所有在barrier被触发时需要执行的内容 public CyclicBarrier(int parties) { this(parties, null); }
CyclicBarrier会在当有足够数量(parties个)的线程在它上面等待时会触发,当被触发时会执行barrierAction中的内容,由进入屏障的最后一个线程执行,如果没有需要在barrier触发时执行的内容,可以传入null。
话不多说,直接上代码:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
可以看到,这个方法的主要内容都委托给了dowait方法,没有猜错的话,dowait方法的第二个值传0时表示一直阻塞,具体猜得对不对,咱们继续看dowait方法代码:
/** * Main barrier code, covering the various policies. */ private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { // 一把非公平锁,主要用于控制barrier的入口 final ReentrantLock lock = this.lock; // 上锁,会在finally中解锁 lock.lock(); try { // 拷贝到本地变量 final Generation g = generation; // 如果已经处理broken状态则抛异常,会在finally中解锁 if (g.broken) throw new BrokenBarrierException(); // 响应线程中断,如果外部调用了Thread.interrupt则这里会为true if (Thread.interrupted()) { // break 当前barrier,具体下面的的分析,注意下这里会唤醒所有处于等待状态的线程 breakBarrier(); throw new InterruptedException(); }
int index = --count; if (index == 0) { // tripped 当index为0时表明线程已经全部就位,直接触发barrier boolean ranAction = false;// 一个标识位,如果整个过程没能执行成功,则需要在finally中调用breakBarrier方法 try { // 检查有无需要在触发时执行的command final Runnable command = barrierCommand; if (command != null)// 有的话则执行,注意这里调用的是run方法,表明是在执行到index == 0成立时的这个线程中执行的,这个线程肯定是最后一个进入barrier的线程。 command.run(); // 标识为true ranAction = true; // 表明上一个周期结束,开始下一个周期,方法的具体代码见下面的分析 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } }
// loop until tripped, broken, interrupted, or timed out for (;;) {// 无限循环 try { // 如果没有时间限制则一直等待 if (!timed) // trip是一个conditon // 这个会添加一个condition状态的节点到condition队列,然后处于阻塞状态,直到被signal唤醒 trip.await(); else if (nanos > 0L) // 调用等待超时,如果超时则不再进行等待 nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 出现异常,如果还没有处理broken,则broken if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. // 尝试用interrupt来结束等待 Thread.currentThread().interrupt(); } } // 如果已经broken,则抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果g不是当前generation,证明中间generation发生了改变,这里返回进行到了哪个index,也就是还有多少线程未进入barrier,例如全部准备就绪会返回0 if (g != generation) return index; // 如果timed为true并且参数nanos也<=0则直接中断barrier if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { // 解锁 lock.unlock(); } }
/** * Sets current barrier generation as broken and wakes up everyone. * Called only while holding lock. */ private void breakBarrier() { // 设置broken状态为true generation.broken = true; // 将count的值复位为parties count = parties; // 唤醒等待状态的线程 trip.signalAll(); }
/** * Updates state on barrier trip and wakes up everyone. * Called only while holding lock. */ private void nextGeneration() { // signal completion of last generation // 唤醒所有的等待线程 trip.signalAll(); // set up next generation // 重置count的值 count = parties; // 将generation属性指向一个新的Generation,表示新的一代已经开始了 generation = new Generation(); }
java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject#awaitNanos:
这里主要总结如下几点:
最后,我们再来看一下reset方法:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { // 中止当前generation的操作,包括唤醒所有的等待线程等 breakBarrier(); // break the current generation // 开始新一轮的generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
主要做了两步:
CyclicBarrier与CountDownLatch的功能类似,但是实现是不同的,CyclicBarrier是基于ReentrantLock的非公平版本,而CountDownLatch是基于AQS的state值来实现,分为公平和非公平版本。还有一点就是CyclicBarrier是可以重用的。