首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

Java多线程并发编程CyclicBarrier循环屏障源码解读

昨天看了CountDownLatch(倒计时门栓)的源码和应用,它适用于需要一次性等待多个线程完成的场景。一旦计数器为 0,所有等待的线程会被唤醒,并且计数器无法重置。

不同于 CountDownLatch,CyclicBarrier 循环屏障在所有线程到达屏障后会“重置”并允许下一轮的同步,它适用于需要多次分阶段同步的场景。

一、CyclicBarrier简介

CyclicBarrier 中的Cyclic是可循环的,Barrier 是屏障, 荷载一起可以理解为可循环的屏障,是可以循环使用的倒计时门栓。

二、CyclicBarrier源码解读

1、构造函数

public CyclicBarrier(int parties) {

this(parties, null);

}

parties:需要等待的线程数,通常就是参与同步的线程数量。

public CyclicBarrier(int parties, Runnable barrierAction) {

if (paramInt <= 0)

throw new IllegalArgumentException();

this.parties = parties;

this.count = paramInt;

this.barrierAction= barrierAction;

}

barrierAction:所有线程到达屏障后,执行的动作。这个动作在所有线程到达屏障后,只有一个线程会执行,通常用于一些资源的初始化或日志记录等。

2、核心成员变量

private final ReentrantLock lock = new ReentrantLock();

private final Condition trip = lock.newCondition();

private final int parties;

private int count;

private Runnable barrierAction;

private int generation = 0;

private boolean broken = false;

lock: 使用 ReentrantLock 来保证线程安全。

trip: 使用 Condition 来实现线程的等待与唤醒。

parties: 需要等待的线程数。

count: 当前等待线程的计数器,初始值等于 parties。

barrierAction: 所有线程到达屏障后要执行的动作。

generation: 屏障的代数(每当屏障重置时,代数加 1)。

broken: 用于标记屏障是否处于损坏状态。

3、关键方法

await():线程调用该方法时会进入等待,直到所有线程都调用 await(),才能继续执行。

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException timeoutException) {

throw new Error(timeoutException);

}

}

public int await(long paramLong, TimeUnit paramTimeUnit) throws InterruptedException, BrokenBarrierException, TimeoutException {

return dowait(true, paramTimeUnit.toNanos(paramLong));

}

private int dowait(boolean paramBoolean, long paramLong) throws InterruptedException, BrokenBarrierException, TimeoutException {

ReentrantLock reentrantLock = this.lock;

reentrantLock.lock();

try {

Generation generation = this.generation;

if (generation.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int i = --this.count;

if (i == 0) {

boolean bool = false;

try {

Runnable runnable = this.barrierCommand;

if (runnable != null)

runnable.run();

bool = true;

nextGeneration();

return 0;

} finally {

if (!bool)

breakBarrier();

}

}

while (true) {

try {

if (!paramBoolean) {

this.trip.await();

} else if (paramLong > 0L) {

paramLong = this.trip.awaitNanos(paramLong);

}

} catch (InterruptedException interruptedException) {

if (generation == this.generation && !generation.broken) {

breakBarrier();

throw interruptedException;

}

Thread.currentThread().interrupt();

}

if (generation.broken)

throw new BrokenBarrierException();

if (generation != this.generation)

return i;

if (paramBoolean && paramLong <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

reentrantLock.unlock();

}

}

这个方法是 CyclicBarrier 的核心方法。它使线程进入等待状态,直到所有线程到达屏障点。

3.1、线程计数:线程调用 await() 后,首先会减少 count 的值(通过 --count),表示一个线程已经到达屏障。

3.2、最后一个线程的处理:当 count 减到 0 时,说明所有线程都已经到达屏障,此时:

如果有 barrierAction,就执行该动作。

之后,count 被重置为 parties,表示屏障的计数器重置。

generation 增加,表示屏障进入下一代,准备下一轮同步。

通过 trip.signalAll() 唤醒所有正在等待的线程。

最后返回 0 给最后一个到达的线程。

3.3、其他线程的处理,对于不是最后到达的线程:

他们会通过 trip.await() 等待。

线程会被唤醒并继续执行,返回值是其在屏障中的索引(index)。

如果屏障被破坏(通过 isBroken() 方法检查),则会抛出 BrokenBarrierException。

reset() 方法

public void reset() {

ReentrantLock reentrantLock = this.lock;

reentrantLock.lock();

try {

breakBarrier();

nextGeneration();

} finally {

reentrantLock.unlock();

}

}

当屏障被重置时,所有参与的线程都会重新计算计数器。reset() 方法将 count 重置为 parties,并且会增加 generation 值,标志着屏障进入了新的一代。

isBroken() 方法

检查当前屏障是否处于"损坏"状态(即是否发生了异常或线程中断)。

当某个线程在执行 await() 时被中断,或者某个线程通过调用 breakBarrier() 破坏了屏障时,broken 标志位会被置为 true,此时调用 isBroken() 会返回 true,表示当前的屏障是破坏状态。

getNumberWaiting() 方法:返回当前等待的线程数。

public int getNumberWaiting() {

ReentrantLock reentrantLock = this.lock;

reentrantLock.lock();

try {

return this.parties - this.count;

} finally {

reentrantLock.unlock();

}

}

该方法返回当前正在等待的线程数量。通过 parties - count 来计算。

CyclicBarrier 的基本功能是:当线程达到某个屏障点时,都会在此点等待,直到所有参与线程都到达屏障后才会继续执行。

三、CyclicBarrier 的工作流程

当线程调用 await() 时,它会减小 count 的值。

如果当前线程是最后一个到达的线程,它会执行 barrierAction(如果提供了的话),并将 count 重置。

其他线程在 trip.await() 上等待直到所有线程都到达屏障。

一旦所有线程到达屏障,所有线程会被唤醒并继续执行。

四,最后总结

CyclicBarrier 是一种非常有用的并发工具,适用于需要协调多个线程共同完成任务的场景。它的实现是通过 ReentrantLock 和 Condition 来保证线程同步,并且设计了 "重置" 的机制,可以在每次所有线程到达屏障后重新使用。

  • 发表于:
  • 原文链接https://page.om.qq.com/page/OrTlxN_BzOrqypJhJNF4Pzog0
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券