
Java Review - 并发编程_ CountDownLatch原理&源码剖析介绍的CountDownLatch在解决多个线程同步方面相对于调用线程的join方法已经有了不少优化,但是CountDownLatch的计数器是一次性的,也就是等到计数器值变为0后,再调用CountDownLatch的await和countdown方法都会立刻返回,这就起不到线程同步的效果了。
所以为了满足计数器可以重置的需要,JDK开发组提供了CyclicBarrier类,并且CyclicBarrier类的功能并不限于CountDownLatch的功能。从字面意思理解,CyclicBarrier是回环屏障的意思,它可以让一组线程全部达到一个状态后再全部同时执行。
这里之所以叫作回环是因为当所有等待线程执行完毕,并重置CyclicBarrier的状态后它可以被重用。
之所以叫作屏障是因为线程调用await方法后就会被阻塞,这个阻塞点就称为屏障点,等所有线程都调用了await方法后,线程们就会冲破屏障,继续向下运行。
需求如下: 使用两个线程去执行一个被分解的任务A,当两个线程把自己的任务都执行完毕后再对它们的结果进行汇总处理。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/18 15:01
* @mark: show me the code , change the world
*/
public class CycleBarrierTest {
// 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " merge result"));
public static void main(String[] args) {
// 创建一个线程数量固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程A 提交到线程池
executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " begin to handle task1-1");
System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
// 将线程B 提交到线程池
executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " begin to handle task1-2");
System.out.println(Thread.currentThread().getName() + " enter into cyclicBarrier");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " enter out cyclicBarrier");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
// 关闭线程池
executorService.shutdown();
}
}
【每次运行的结果可能不尽相同,但核心的流程是一致的】
由此可见多个线程之间是相互等待的,假如计数器值为N,那么随后调用await方法的N-1个线程都会因为到达屏障点而被阻塞,当第N个线程调用await后,计数器值为0了,这时候第N个线程才会发出通知唤醒前面的N-1个线程。也就是当全部线程都到达屏障点时才能一块继续向下执行
对于这个例子来说,使用CountDownLatch也可以得到类似的输出结果。下面再举个例子来说明CyclicBarrier的可复用性。
需求: 假设一个任务由阶段1、阶段2和阶段3组成,每个线程要串行地执行阶段1、阶段2和阶段3,当多个线程执行该任务时,必须要保证所有线程的阶段1全部完成后才能进入阶段2执行,当所有线程的阶段2全部完成后才能进入阶段3执行。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author 小工匠
* @version 1.0
* @description: TODO
* @date 2021/12/18 15:23
* @mark: show me the code , change the world
*/
public class CycleBarrierTest2 {
// 创建一个CycleBarrier实例,添加一个所有子线程全部到达屏障后的执行的任务
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> System.out.println(Thread.currentThread().getName() + " 阶段任务全部线程执行结束....开启下一轮"));
public static void main(String[] args) {
// 创建一个线程数量固定为2的线程池
ExecutorService executorService = Executors.newFixedThreadPool(2);
// 将线程A 提交到线程池
executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " execute step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step3");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
// 将线程B 提交到线程池
executorService.submit(() -> {
try {
System.out.println(Thread.currentThread().getName() + " execute step1");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step2");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + " execute step3");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
// 关闭线程池
executorService.shutdown();
}
}
在如上代码中,每个子线程在执行完阶段1后都调用了await方法,等到所有线程都到达屏障点后才会一块往下执行,这就保证了所有线程都完成了阶段1后才会开始执行阶段2。然后在阶段2后面调用了await方法,这保证了所有线程都完成了阶段2后,才能开始阶段3的执行。这个功能使用单个CountDownLatch是无法完成的。

由以上类图可知:
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
} /**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}通过源码可以知道,在内部调用了dowait方法。第一个参数为false则说明不设置超时时间,这时候第二个参数没有意义
当前线程调用CyclicBarrier的该方法时会被阻塞,直到满足下面条件之一才会返回:
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}在内部调用了dowait方法。第一个参数为true则说明设置了超时时间,这时候第二个参数是超时时间。
CyclicBarrier的核心功能
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
// 1 如果index=0说明到所有线程都到达了屏障点,此时执行初始化时执行的任务
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
// 2 执行任务
command.run();
ranAction = true;
// 3 激活其他线程因为await方法而被阻塞的线程,并重置CyclicBarrier
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 4 如果index !=0
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 5 没有设置超时时间
if (!timed)
trip.await();
// 6 设置超时时间
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
} /**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// 7 从条件队列里唤醒里面的阻塞贤臣
// signal completion of last generation
trip.signalAll();
// 8 重置CyclicBarrier
// set up next generation
count = parties;
generation = new Generation();
}以上是dowait方法的核心代码
我们这里通过Demo说明了CycleBarrier与CountDownLatch的不同在于,CycleBarrier是可以复用的,并且CycleBarrier特别适合分段任务有序执行的场景。
然后分析了CycleBarrier通过独占锁ReentrantLock实现计数器原子性更新,并使用条件变量队列来实现线程同步。