CyclicBarrier 是JUC 所提供的比较好用且应用面十分广泛的一个并发工具。 CyclicBarrier 字面意思:循环 屏障,也就是一种循环可使用的同步屏障,可以一组线程等待都完成的时候放行。和CountDownLatch对比来看,就是CountDownLatch等待的是外部事件,而CyclicBarrier等待一组线程。虽然看上去功能是相似的,但是实现和使用上都存在一定的差异。
CyclicBarrier(int parties)
参数parties表示需要等待的线程数量。
CyclicBarrier(int parties, Runnable barrierAction)
barrierAction表示放行时优先触发的事件。
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
CyclicBarrier c = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
System.out.println("hello EffectiveCoding");
}
});
private static CyclicBarrierTest cyclicBarrierTest = new CyclicBarrierTest();
public CyclicBarrier getC() {
return c;
}
public void demo() {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("1 到达");
c.await();
System.out.println("1 已放行");
} catch (Exception e) {
}
}
}).start();
try {
System.out.println("2 到达");
c.await();
} catch (Exception e) {
}
System.out.println("2 已放行");
c.reset();
}
public static void main(String[] args) {
cyclicBarrierTest.demo();
System.out.println("--------------这是一条分界线-----------------");
cyclicBarrierTest.demo();
}
}
如果等待数量为3的话,则导致无法放行,因为并没有第三个线程到达。
然后看一下实现:
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;
首先CyclicBarrier以Reentrant
为核心实现,parties
为等待数量,Condition
来完成一个具体的条件队列(其中使用await方法让线程在统一条件队列等待,使用signalAll方法唤醒通过这一条件的等待线程),这里的Condition
的实现类AQS中的内部类ConditionObject。
Object中与同步锁synchronized相呼应的唤醒和阻塞的方法:wait
、notify
、notifyAll
方法,当我们使用JDK中的锁的时候也是依赖于这类操作的,而这些操作依赖于Condition实现,下面是Condition的API(相对于wait等方法更加精确了):
await():使当前线程在接到信号或被中断之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
:带有timout的await
void awaitUninterruptibly()
:使当前线程在接到信号之前一直处于等待状态。
boolean awaitUntil(Date deadline)
:使当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
void signal()
:唤醒一个等待线程。
void signalAll()
:唤醒所有等待线程。
barrierCommand
就是实例中释放时触发动作(可以理解为是一个回调操作),generation
维护了一个boolean类型的变量,count可以理解为一个计数器,每当await被调用count就会减一,到0时释放。如何hold所有需要等待的线程,Cyclicbarrier使用的Reentrantlock 来完成显示的锁的实现,
下面是dowait里面的一点显示锁的使用细节:
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
}
...
finally {
lock.unlock();
}
看一下上面已经使用到的几个函数:
构造函数:
这里不放代码都是根据所需要的功能对于前面展示的几个变量进行初始化。
Reset
函数就是将cyclicbarrier还原到初始状态
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
然后是await
函数,可以明显的看到全部依赖于dowait
方法
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
下面开始最核心的dowait函数: 先看这一段,首先是完成了当前线程及cyclicbarrier状态的判断,然后对于count进行操作,如果已经计数为零那么出发对应的事件,最后打破栅栏,完成所有线程的唤醒。可以看到任务唤醒的时机是在其他线程之前的,这一点使用的时候是需要注意下的。
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
然后是后半段代码:当某个线程执行完前半部分之后,count并没有到达0时,那么开始陷入空转检查,符合条件或者到达timeout 当到达对应条件时完成释放。
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
核心的差不多就这么多,主要倾向于原理解读,具体的细节性使用问题请参照api。