CyclicBarrier 循环屏障允许多个线程在执行某项任务时相互协调,直到它们都达到某个共同的屏障点,才能继续执行后续操作。由于其可以被重用,因此特别适用于需要分阶段的同步场景。
一、CyclicBarrier 的适用场景
分阶段的并行计算:多个线程完成一部分任务后,必须等待其他线程完成相应任务才能继续。
模拟并行计算中的同步:例如多个计算节点处理数据,之后再进行汇总或分析。
协同工作任务:多个线程必须在某个阶段同时进行某些操作。
二、并行计算任务应用
假设我们有一个计算任务,需要将任务分成若干子任务并行执行,等所有线程执行完相应的任务后,再进行汇总或整合。
1、任务分解
将一组数据分成若干部分,每个线程计算数据的一个部分。
所有线程完成计算后,进行汇总统计(比如总和、平均值等)。
然后重复这个过程,每轮汇总都需要等待其他线程完成。
2、示例代码:
import java.util.concurrent.*;
public class CyclicBarrierExample { // 数据总量和线程数 private static final int DATA_SIZE = 100; private static final int THREAD_COUNT = 5; public static void main(String[] args) throws InterruptedException, BrokenBarrierException { // 模拟的数据 int[] data = new int[DATA_SIZE]; for (int i = 0; i < DATA_SIZE; i++) { data[i] = i + 1; } // 使用 CyclicBarrier 来协调多个线程 CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, new Runnable() { @Override public void run() { System.out.println("所有线程已完成当前阶段,准备进行下一阶段处理。"); } }); // 将任务划分为 THREAD_COUNT 个子任务 ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); int chunkSize = DATA_SIZE / THREAD_COUNT; for (int i = 0; i < THREAD_COUNT; i++) { int start = i * chunkSize; int end = (i == THREAD_COUNT - 1) ? DATA_SIZE : (i + 1) * chunkSize; // 最后一个线程处理剩余数据 executor.submit(new Task(data, start, end, barrier)); } executor.shutdown(); } static class Task implements Runnable { private int[] data; private int start; private int end; private CyclicBarrier barrier; public Task(int[] data, int start, int end, CyclicBarrier barrier) { this.data = data; this.start = start; this.end = end; this.barrier = barrier; } @Override public void run() { try { // 执行计算任务:计算当前区间的和 int sum = 0; for (int i = start; i < end; i++) { sum += data[i]; } System.out.println(Thread.currentThread().getName() + " 计算的部分和: " + sum); // 等待所有线程计算完成 barrier.await(); // 这里可以执行下一阶段的操作,比如整合计算结果 // 这里只做简单的示例:输出阶段标识 System.out.println(Thread.currentThread().getName() + " 进入下一阶段。"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } } }}
3、运行结果:
pool-1-thread-2 计算的部分和: 610 pool-1-thread-5 计算的部分和: 1810 pool-1-thread-4 计算的部分和: 1410 pool-1-thread-1 计算的部分和: 210 pool-1-thread-3 计算的部分和: 1010 所有线程已完成当前阶段,准备进行下一阶段处理。 pool-1-thread-3 进入下一阶段。 pool-1-thread-2 进入下一阶段。 pool-1-thread-1 进入下一阶段。 pool-1-thread-4 进入下一阶段。 pool-1-thread-5 进入下一阶段。
4、代码解读
我们有一个包含 100 个整数的数组 data,分给制定线程数执行,每个线程负责计算数组的一部分。
使用 ExecutorService来管理线程池,创建CyclicBarrier确保线程之间的同步。
并且为屏障添加了一个Runnable,用于在所有线程都到达屏障后执行一些动作(这里简单地输出一条消息)。
每个线程计算自己负责的数据区间的和,输出部分和。
然后调用 barrier.await() 使线程在这里等待,直到所有线程都完成各自的计算。
所有线程都到达屏障后,主屏障的 Runnable 被执行,所有线程继续执行后续任务。
三、CyclicBarrier 优点
简化多线程同步逻辑:CyclicBarrier 提供了一个便捷的方式让线程同步到达指定的屏障点,避免了手动编写复杂的同步代码。
灵活的屏障动作:可以通过在构造方法中指定屏障动作(Runnable),让其在所有线程到达屏障点后执行特定任务,增强了工具的功能性。
重复使用:与 CountDownLatch 不同,CyclicBarrier 是可循环使用的。这非常适合需要线程多次同步的场景,例如分阶段的计算任务。
支持并发编程模型:使用 CyclicBarrier 可以让多个线程更有序地协同工作,尤其适合需要阶段性汇合的分布式计算。
四、CyclicBarrier 缺点
单点故障问题:如果有线程在等待屏障点时发生异常或超时,CyclicBarrier 的状态可能变为 broken(损坏),导致后续所有线程都无法正常执行。
线程阻塞等待:所有线程在 await() 方法处会阻塞等待,可能导致线程资源长时间占用,影响系统性能。
复杂性增加:尽管 CyclicBarrier 简化了同步逻辑,但对于开发者来说,理解屏障损坏(BrokenBarrierException)以及如何处理异常会增加编程复杂性。
性能开销:在高并发场景下,频繁的屏障同步会引入额外的性能开销,不适合对性能敏感的任务。
五、最后总结
CyclicBarrier是一个强大的同步工具,适用于多个线程需要在某个特定点同步的场景,尤其是当这些线程需要重复多个周期性任务时。通过使用CyclicBarrier,我们可以轻松实现并发任务的协同工作,并确保所有线程在某个屏障点达到同步。
领取专属 10元无门槛券
私享最新 技术干货