前言
" 看完 CountDownLatch 正准备表示一番,突然看到了一个 CyclicBarrier —— 回环屏障。沃特?回环还屏障?说比 CountDownLatch 要多一个回环,那咱可得瞧一瞧,看一看了! "
1
介绍
一个同步辅助,它允许一组线程的所有等待彼此达成共同屏障点。CyclicBarrier 在涉及固定线程数且必须等待彼此的程序非常有用。
该屏障被称为回环屏障 ,因为它在等待的线程被释放后可以被重新利用。
CyclicBarrier 支持一个可选的 Runnable 命令,该命令在障碍中的最后一个线程到达之后,但在释放任何线程之前,每个屏障点运行一次。
此屏障操作对于在任何一方继续之前更新共享状态很有用。
通过上面的源码注释基本可以得出以下结论:
基本使用
既然上面总结了三个结论,下面当然从三个方面演示如何使用的:
- 屏障功能
public class CyclicBarrierTest {
private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(11);
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
ExecutorService pool = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 10; i++) {
pool.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 开始执行");
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + " 执行结束,准备调用 await");
CYCLIC_BARRIER.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
System.out.println("主线程执行 —————————————— >>>");
CYCLIC_BARRIER.await();
System.out.println("主线程继续执行 —————————————— >>>");
pool.shutdown();
}
}
通过上面代码其实模拟了个类似 CountDownLatch 的功能,让所有线程等待,直到都调用 await 之后,各个线程继续执行,同时主线程也继续往下执行。
不过相对 CountDownLatch 的指定一个线程或多个等待,直到其他线程执行结束,等待的线程才继续执行来说,CyclicBarrier 相对来说还是逊色。
差别总结如下:
- 回环功能
public class CyclicBarrierTest2 {
private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5);
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
ExecutorService pool = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1024),
new ThreadFactoryBuilder().setNameFormat("Thread-pool-%d").build(),
new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < 5; i++) {
pool.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 开始执行");
CYCLIC_BARRIER.await();
System.out.println(Thread.currentThread().getName() + " 冲破屏障 >>> 1");
CYCLIC_BARRIER.await();
System.out.println(Thread.currentThread().getName() + " 冲破屏障 >>>>> 2");
CYCLIC_BARRIER.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
pool.shutdown();
}
}
上面演示的回环的用法。
- 回环 Runnable
这块只需要在声明的 CyclicBarrier 修改为以下即可:
private static final CyclicBarrier CYCLIC_BARRIER = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("执行一次 Runnable ");
}
});
打印结果如下:
可以看出只是在下一个计数开始之前,先执行 Runnable 。至于是不是在释放屏障之前,那很容易,直接 Debug 走一遭就知道了!专门录制了个视频:
通过 debug 可以看出Runnable 会在释放线程之前执行。
问题疑问
下面就带着疑问去源码阅读,一探究竟!
2
源码分析
基本结构
通过 UML 乍一看,CyclicBarrier 和 AQS 并无什么关系,那下面开始从参数、构造器、await()方法分别看源码。
参数
public class CyclicBarrier {
/**
* 屏障的每次使用都表示为一个生成实例。
* broken 表示屏障是否被打破。
*/
private static class Generation {
boolean broken = false;
}
/** 锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 条件等待,直到屏障 */
private final Condition trip = lock.newCondition();
/** 等待计数 */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** 当前 generation 新创建的*/
private Generation generation = new Generation();
/** 仍在等待的 parties 数量,递减 为 0 会重置 */
private int count;
}
通过上面可以看出:
内部使用了一个静态类 Generation ,它有什么功能呢?通过注释了解到,每次使用屏障的时候都会生成,具体有什么用,其实就是用来标示屏障是否被打破。
内部还有一个 parties 表示等待计数,count 表示仍在等待的计数。
那就继续往下看吧!
构造器
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
这里的入参有两个:
单参数构造,其实就是将 barrierAction 赋值为 null。
await() 方法
在示例中用的 await() 方法, 那就从 await() 方法入手:
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
await() 才是重头戏, 先来根据源码注释,了解是干嘛的,看看作者怎么讲:
看到这些,咱们最想看的当然是 2.a ,等待最后一个线程到达屏障,之后所有的线程一起继续执行。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 在这里用到了这个代
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 线程终中断标示
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 对计数进行递减
int index = --count;
// 如果是 0 则
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 不是 null 先执行行为
if (command != null)
// 这里不是新开线程
command.run();
ranAction = true;
// 下一代
nextGeneration();
return 0;
} finally {
// 任务未成功时,即 ranAction 还是 false 打破屏障
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
// 自旋
for (;;) {
try {
// 没有设置超时时间
if (!timed)
// 进入等待
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
// 已经下一代了
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
这一大坨代码,完全没有看的欲望,直接划过去吧!
所以…… 直接看到了这里吧。
代码还是要阅读的,分开来看(异常流程省略):
这后面还有两个方法不能少:
private void nextGeneration() {
// 唤醒线程
trip.signalAll();
// 更新 count 为 parties
count = parties;
// 更新 Generation
generation = new Generation();
}
// 打破屏障,并唤醒全部
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
reset()
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
将屏障重置为其初始状态,reset() 方法其实还是调用的 breakBarrier() 和 nextGeneration(),前者时打破当前代,后者是开始新的一轮。
3
总结
Q: CyclicBarrier 和 AQS 有什么关系?
A: 通过阅读源码,其实发现是使用了 ReentrantLock 互斥锁 以及 Condition 的等待唤醒功能。
Q: CyclicBarrier 的实现原理是什么?
A: 内部含有两个计数,分别是 parties 和 count ,初始是二者相等,当有线程调用 await() 时,count 递减,只要 count 不为 0 , 就会阻塞线程,直到 count 递减为 0 时,此时会所有线程一起释放,同时将 count 重置为 parties。
Q: CyclicBarrier 是如何实现回环的?
A: 使用两个计数,count 递减,当 count 为 0 时,会重置为 parties,从而达到回环效果。
Q: 为什么 count 的 --count 操作没有使用 CAS?
A: 因为已经 lock.lock() 了,使用了 ReentrantLock 锁能够保证 count 的原子性。
CyclicBarrier 和 CountDownLatch 的区别
结束语
本文主要介绍了 CyclicBarrier 的常用方式,通过源码方式,分析如何达到屏障以及回环的效果。不对之处,请多指正。
相关资料
[1] Java SE API :
https://docs.oracle.com/javase/8/docs/api/?xd_co_f=7e1f6bdd-a729-472f-a82e-8db833dd2e06
- <End /> -