一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。
CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。
核心参数
/** 要屏障的线程数 */
private final int parties;
/* 当线程都到达barrier,运行的 barrierCommand*/
private final Runnable barrierCommand;
//-------------------------函数列表------------------------------
//构造函数,指定参与线程数
public CyclicBarrier(int parties)
//构造函数,指定参与线程数,并在所有线程到达barrier之后执行给定的barrierAction逻辑
public CyclicBarrier(int parties, Runnable barrierAction);
//等待所有的参与者到达barrier
public int await();
//等待所有的参与者到达barrier,或等待给定的时间
public int await(long timeout, TimeUnit unit);
//获取参与等待到达barrier的线程数
public int getParties();
//查询barrier是否处于broken状态
public boolean isBroken();
//重置barrier为初始状态
public void reset();
//返回等待barrier的线程数量
public int getNumberWaiting();
generation
实例。当barrier触发trip条件或重置时generation
随之改变。使用barrier时有很多generation
与线程关联,由于不确定性的方式,锁可能分配给等待的线程。但是在同一时间只有一个是活跃的generation
(通过count
变量确定),并且其余的要么被销毁,要么被trip条件等待。如果有一个中断,但没有随后的重置,就不需要有活跃的generation
。CyclicBarrier
的可重用特性就是通过Generation
来实现,每一次触发tripped都会new一个新的Generation。CyclicBarrier
的另一个特性是在所有参与线程到达barrier触发一个自定义函数,这个函数就是barrierCommand
,在CyclicBarrier
的构造函数中初始化。
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
在CyclicBarrier中,最重要的方法就是await(),在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
await内部调用dowait()
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//独占锁
final ReentrantLock lock = this.lock;
//获取独占锁
lock.lock();
try {
//保存当前"Generation"
final Generation g = generation;
//当前generation“已损坏”,抛BrokenBarrierException
//抛该异常一般因为某个线程在等待某个处于“断开”状态的CyclicBarrier
if (g.broken)
throw new BrokenBarrierException();
//当前线程中断,通过breakBarrier终止CyclicBarrier
if (Thread.interrupted()) {
//线程被中断,终止Barrier,唤醒所有等待线程
breakBarrier();
throw new InterruptedException();
}
//计数器自减
int index = --count;
//如果计数器 == 0
//表示所有线程都已经到位,触发动作(是否执行某项任务)
if (index == 0) { // tripped
boolean ranAction = false;
try {
//barrierCommand线程要执行的任务
final Runnable command = barrierCommand;
//执行的任务!=null,执行任务
if (command != null)
command.run();
ranAction = true;
//唤醒所有等待线程,并更新generation
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//循环一直执行,直到下面三个if一个条件满足才会退出循环
//自旋等待 所有parties到达 | generation被销毁 | 线程中断 | 超时
for (;;) {
try {
//如果不是超时等待,则调用await等待
if (!timed)
trip.await();
//调用awaitNanos等待
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
//当前generation“已损坏”,抛出BrokenBarrierException异常
//抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrier
if (g.broken)
throw new BrokenBarrierException();
//generation已经更新,返回index
if (g != generation)
return index;
//“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//释放独占锁
lock.unlock();
}
}
dowait()的处理逻辑
说明:dowait()
是await()
的实现函数,它的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。当所有parties到达barrier(count=0
),如果barrierCommand
不为空,则执行barrierCommand
。然后调用nextGeneration()
进行换代操作。
在for(;;)
自旋中。timed
是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()
进行等待;否则,调用awaitNanos()
进行超时等待。
在dowait中有Generation这样一个对象。该对象是CyclicBarrier的一个成员变量
Generation描述着CyclicBarrier的更新换代。 在CyclicBarrier中,同一批线程属于同一代。 当有parties个线程到达barrier,generation就会被更新换代。 其中broken标识该当前CyclicBarrier是否已经处于中断状态。
对于中断,CyclicBarrier是通过
实现的
在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。
在超时的判断中,CyclicBarrier根据timed的值来执行不同的wait。await、awaitNanos都是Condition中的方法。
当index = --count
等于0时,标志"有parties个线程到达barrier",临界条件到达,则执行相应的动作。执行完动作后,则调用nextGeneration
更新换代
reset
重置后使用,因此被称为是循环的barrier多线程环境计算数据,最后合并计算结果
CyclicBarrier
主要通过独占锁ReentrantLock
和Condition
配合实现。类本身实现很简单,重点是分清CyclicBarrier
和CountDownLatch
的用法及区别,还有在jdk1.7新增的另外一个与它们相似的同步锁Phaser
,在后面文章中会详细讲解。