JUC源码分析之CyclicBarrier简介关键方法与参数源码解析CountDownLatch和CyclicBarrier的区别与联系应用场景小结

简介

一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point)。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用。因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier。

CyclicBarrier 支持一个可选的 Runnable 命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作 很有用。

对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException(如果它们几乎同时被中断,则用 InterruptedException)以反常的方式离开。

关键方法与参数

核心参数

/** 要屏障的线程数 */
private final int parties;
/* 当线程都到达barrier,运行的 barrierCommand*/
private final Runnable barrierCommand;

//-------------------------函数列表------------------------------
//构造函数,指定参与线程数
public CyclicBarrier(int parties)
//构造函数,指定参与线程数,并在所有线程到达barrier之后执行给定的barrierAction逻辑
public CyclicBarrier(int parties, Runnable barrierAction);
//等待所有的参与者到达barrier
public int await();
//等待所有的参与者到达barrier,或等待给定的时间
public int await(long timeout, TimeUnit unit);
//获取参与等待到达barrier的线程数
public int getParties();
//查询barrier是否处于broken状态
public boolean isBroken();
//重置barrier为初始状态
public void reset();
//返回等待barrier的线程数量
public int getNumberWaiting();
  1. Generation:每个使用中的barrier都表示为一个generation实例。当barrier触发trip条件或重置时generation随之改变。使用barrier时有很多generation与线程关联,由于不确定性的方式,锁可能分配给等待的线程。但是在同一时间只有一个是活跃的generation(通过count变量确定),并且其余的要么被销毁,要么被trip条件等待。如果有一个中断,但没有随后的重置,就不需要有活跃的generationCyclicBarrier的可重用特性就是通过Generation来实现,每一次触发tripped都会new一个新的Generation
  2. barrierCommand:CyclicBarrier的另一个特性是在所有参与线程到达barrier触发一个自定义函数,这个函数就是barrierCommand,在CyclicBarrier的构造函数中初始化。

创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。

创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行

源码解析

在CyclicBarrier中,最重要的方法就是await(),在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。

await()

await内部调用dowait()

private int dowait(boolean timed, long nanos)
            throws InterruptedException, BrokenBarrierException,
            TimeoutException {
        //独占锁
        final ReentrantLock lock = this.lock;
        //获取独占锁
        lock.lock();
        try {
            //保存当前"Generation"
            final Generation g = generation;
            //当前generation“已损坏”,抛BrokenBarrierException
            //抛该异常一般因为某个线程在等待某个处于“断开”状态的CyclicBarrier
            if (g.broken)
                throw new BrokenBarrierException();

            //当前线程中断,通过breakBarrier终止CyclicBarrier
            if (Thread.interrupted()) {
                //线程被中断,终止Barrier,唤醒所有等待线程
                breakBarrier();
                throw new InterruptedException();
            }

            //计数器自减
            int index = --count;
            //如果计数器 == 0
            //表示所有线程都已经到位,触发动作(是否执行某项任务)
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //barrierCommand线程要执行的任务
                    final Runnable command = barrierCommand;
                    //执行的任务!=null,执行任务
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒所有等待线程,并更新generation
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //循环一直执行,直到下面三个if一个条件满足才会退出循环
            //自旋等待 所有parties到达 | generation被销毁 | 线程中断 | 超时
            for (;;) {
                try {
                    //如果不是超时等待,则调用await等待
                    if (!timed)
                        trip.await();
                        //调用awaitNanos等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                //当前generation“已损坏”,抛出BrokenBarrierException异常
                //抛出该异常一般都是某个线程在等待某个处于“断开”状态的CyclicBarrier
                if (g.broken)
                    throw new BrokenBarrierException();

                //generation已经更新,返回index
                if (g != generation)
                    return index;

                //“超时等待”,并且时间已到,则通过breakBarrier()终止CyclicBarrier
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            //释放独占锁
            lock.unlock();
        }
    }

dowait()的处理逻辑

  • 首先判断该barrier是否已经断开了,如果断开则抛出BrokenBarrierException异常
  • 判断计数器index是否等于0,如果等于0,则表示所有的线程准备就绪,已经到达某个公共屏障点了,barrier可以进行后续工作了(是否执行某项任务(构造函数决定));然后调用nextGeneration方法进行更新换代工作(其中会唤醒所有等待的线程);
  • 通过for循环(for(;;))使线程一直处于等待状态。直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生。

说明:dowait()await()的实现函数,它的作用就是让当前线程阻塞,直到“有parties个线程到达barrier” 或 “当前线程被中断” 或 “超时”这3者之一发生,当前线程才继续执行。当所有parties到达barrier(count=0),如果barrierCommand不为空,则执行barrierCommand。然后调用nextGeneration()进行换代操作。 在for(;;)自旋中。timed是用来表示当前是不是“超时等待”线程。如果不是,则通过trip.await()进行等待;否则,调用awaitNanos()进行超时等待。

在dowait中有Generation这样一个对象。该对象是CyclicBarrier的一个成员变量

Generation描述着CyclicBarrier的更新换代。 在CyclicBarrier中,同一批线程属于同一代。 当有parties个线程到达barrier,generation就会被更新换代。 其中broken标识该当前CyclicBarrier是否已经处于中断状态。

对于中断,CyclicBarrier是通过

breakBarrier()

实现的

在breakBarrier()中除了将broken设置为true,还会调用signalAll将在CyclicBarrier处于等待状态的线程全部唤醒。

在超时的判断中,CyclicBarrier根据timed的值来执行不同的wait。await、awaitNanos都是Condition中的方法。

index = --count等于0时,标志"有parties个线程到达barrier",临界条件到达,则执行相应的动作。执行完动作后,则调用nextGeneration更新换代

CountDownLatch和CyclicBarrier的区别与联系

  1. 作用
    • CountDownLatch的作用是允许1或n个线程等待其他线程完成执行
    • CyclicBarrier则是允许n个线程相互等待等满足一定条件之后才能继续执行后续操作
  2. 都使用计数器实现
    • CountDownLatch的计数器无法被重置,只能使用一次
    • CyclicBarrier的计数器可以被 reset重置后使用,因此被称为是循环的barrier

应用场景

多线程环境计算数据,最后合并计算结果

小结

CyclicBarrier主要通过独占锁ReentrantLockCondition配合实现。类本身实现很简单,重点是分清CyclicBarrierCountDownLatch的用法及区别,还有在jdk1.7新增的另外一个与它们相似的同步锁Phaser,在后面文章中会详细讲解。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏编舟记

Java高编译低运行错误(ConcurrentHashMap.keySet)

本地使用maven编译和运行时一切都正常,但是通过ci的方式,编译、打包、发布到部署环境,运行时抛出了一条显而易见的JDK版本的错误。

1123
来自专栏JAVA烂猪皮

Java线程池使用与原理

我们可以利用java很容易创建一个新线程,同时操作系统创建一个线程也是一笔不小的开销。所以基于线程的复用,就提出了线程池的概念,我们使用线程池创建出若干个线程,...

1091
来自专栏Java后端技术栈

Java中的锁原理、锁优化、CAS、AQS

Java编程语言允许线程访问共享变量, 为了确保共享变量能被准确和一致地更新,线程应该确保通过排他锁单独获得这个变量。Java语言提供了volatile,在某些...

2194
来自专栏Java帮帮-微信公众号-技术文章全总结

Java并发学习3【面试+工作】

ReadWriteLock是jdk5中提供的读写分离锁。读写分离锁可以有效的帮助减少锁竞争,以提升性能。用锁分离的机制来提升性能非常容易理解,比如线程A1,A2...

1384
来自专栏顶级程序员

40个多线程问题总结

源 / 架构师小秘圈 文 / 五月的仓颉 1、多线程有什么用? 一个可能在很多人看来很扯淡的一个问题:我会用多线程就好了,还管它有什么用?在我看来,这个回...

3867
来自专栏chenssy

【死磕Java并发】—–J.U.C之并发工具类:CyclicBarrier

CyclicBarrier,一个同步辅助类,在API中是这么介绍的: 它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point...

3404
来自专栏芋道源码1024

【死磕Java并发】—- J.U.C之并发工具类:CyclicBarrier

此篇博客所有源码均来自JDK 1.8 CyclicBarrier,一个同步辅助类,在API中是这么介绍的: 它允许一组线程互相等待,直到到达某个公共屏障点 (c...

3444
来自专栏架构师小秘圈

40个多线程问题总结

作者:五月的仓颉 来自:cnblogs.com/xrq730/p/5060921.html ? 1、多线程有什么用? 一个可能在很多人看来很扯淡的一个问题:我会...

3917
来自专栏微信公众号:Java团长

Java中的锁原理、锁优化、CAS、AQS

结论:如果volatile变量修饰符使用恰当的话,它比synchronized的使用和执行成本更低,因为它不会引起线程上下文的切换和调度。

911
来自专栏Java技术栈

史上最全 Java 多线程面试题及答案

这些多线程的问题,有些来源于各大网站、有些来源于自己的思考。可能有些问题网上有、可能有些问题对应的答案也有、也可能有些各位网友也都看过,但是本文写作的重心就是所...

721

扫码关注云+社区

领取腾讯云代金券