昨天看了CountDownLatch(倒计时门栓)的源码和应用,它适用于需要一次性等待多个线程完成的场景。一旦计数器为 0,所有等待的线程会被唤醒,并且计数器无法重置。
不同于 CountDownLatch,CyclicBarrier 循环屏障在所有线程到达屏障后会“重置”并允许下一轮的同步,它适用于需要多次分阶段同步的场景。
一、CyclicBarrier简介
CyclicBarrier 中的Cyclic是可循环的,Barrier 是屏障, 荷载一起可以理解为可循环的屏障,是可以循环使用的倒计时门栓。
二、CyclicBarrier源码解读
1、构造函数
public CyclicBarrier(int parties) {
this(parties, null);
}
parties:需要等待的线程数,通常就是参与同步的线程数量。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (paramInt <= 0)
throw new IllegalArgumentException();
this.parties = parties;
this.count = paramInt;
this.barrierAction= barrierAction;
}
barrierAction:所有线程到达屏障后,执行的动作。这个动作在所有线程到达屏障后,只有一个线程会执行,通常用于一些资源的初始化或日志记录等。
2、核心成员变量
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private int count;
private Runnable barrierAction;
private int generation = 0;
private boolean broken = false;
lock: 使用 ReentrantLock 来保证线程安全。
trip: 使用 Condition 来实现线程的等待与唤醒。
parties: 需要等待的线程数。
count: 当前等待线程的计数器,初始值等于 parties。
barrierAction: 所有线程到达屏障后要执行的动作。
generation: 屏障的代数(每当屏障重置时,代数加 1)。
broken: 用于标记屏障是否处于损坏状态。
3、关键方法
await():线程调用该方法时会进入等待,直到所有线程都调用 await(),才能继续执行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException timeoutException) {
throw new Error(timeoutException);
}
}
public int await(long paramLong, TimeUnit paramTimeUnit) throws InterruptedException, BrokenBarrierException, TimeoutException {
return dowait(true, paramTimeUnit.toNanos(paramLong));
}
private int dowait(boolean paramBoolean, long paramLong) throws InterruptedException, BrokenBarrierException, TimeoutException {
ReentrantLock reentrantLock = this.lock;
reentrantLock.lock();
try {
Generation generation = this.generation;
if (generation.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int i = --this.count;
if (i == 0) {
boolean bool = false;
try {
Runnable runnable = this.barrierCommand;
if (runnable != null)
runnable.run();
bool = true;
nextGeneration();
return 0;
} finally {
if (!bool)
breakBarrier();
}
}
while (true) {
try {
if (!paramBoolean) {
this.trip.await();
} else if (paramLong > 0L) {
paramLong = this.trip.awaitNanos(paramLong);
}
} catch (InterruptedException interruptedException) {
if (generation == this.generation && !generation.broken) {
breakBarrier();
throw interruptedException;
}
Thread.currentThread().interrupt();
}
if (generation.broken)
throw new BrokenBarrierException();
if (generation != this.generation)
return i;
if (paramBoolean && paramLong <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
reentrantLock.unlock();
}
}
这个方法是 CyclicBarrier 的核心方法。它使线程进入等待状态,直到所有线程到达屏障点。
3.1、线程计数:线程调用 await() 后,首先会减少 count 的值(通过 --count),表示一个线程已经到达屏障。
3.2、最后一个线程的处理:当 count 减到 0 时,说明所有线程都已经到达屏障,此时:
如果有 barrierAction,就执行该动作。
之后,count 被重置为 parties,表示屏障的计数器重置。
generation 增加,表示屏障进入下一代,准备下一轮同步。
通过 trip.signalAll() 唤醒所有正在等待的线程。
最后返回 0 给最后一个到达的线程。
3.3、其他线程的处理,对于不是最后到达的线程:
他们会通过 trip.await() 等待。
线程会被唤醒并继续执行,返回值是其在屏障中的索引(index)。
如果屏障被破坏(通过 isBroken() 方法检查),则会抛出 BrokenBarrierException。
reset() 方法
public void reset() {
ReentrantLock reentrantLock = this.lock;
reentrantLock.lock();
try {
breakBarrier();
nextGeneration();
} finally {
reentrantLock.unlock();
}
}
当屏障被重置时,所有参与的线程都会重新计算计数器。reset() 方法将 count 重置为 parties,并且会增加 generation 值,标志着屏障进入了新的一代。
isBroken() 方法
检查当前屏障是否处于"损坏"状态(即是否发生了异常或线程中断)。
当某个线程在执行 await() 时被中断,或者某个线程通过调用 breakBarrier() 破坏了屏障时,broken 标志位会被置为 true,此时调用 isBroken() 会返回 true,表示当前的屏障是破坏状态。
getNumberWaiting() 方法:返回当前等待的线程数。
public int getNumberWaiting() {
ReentrantLock reentrantLock = this.lock;
reentrantLock.lock();
try {
return this.parties - this.count;
} finally {
reentrantLock.unlock();
}
}
该方法返回当前正在等待的线程数量。通过 parties - count 来计算。
CyclicBarrier 的基本功能是:当线程达到某个屏障点时,都会在此点等待,直到所有参与线程都到达屏障后才会继续执行。
三、CyclicBarrier 的工作流程
当线程调用 await() 时,它会减小 count 的值。
如果当前线程是最后一个到达的线程,它会执行 barrierAction(如果提供了的话),并将 count 重置。
其他线程在 trip.await() 上等待直到所有线程都到达屏障。
一旦所有线程到达屏障,所有线程会被唤醒并继续执行。
四,最后总结
CyclicBarrier 是一种非常有用的并发工具,适用于需要协调多个线程共同完成任务的场景。它的实现是通过 ReentrantLock 和 Condition 来保证线程同步,并且设计了 "重置" 的机制,可以在每次所有线程到达屏障后重新使用。
领取专属 10元无门槛券
私享最新 技术干货