专栏首页EffectiveCodingJava Concurrent CyclicBarrier

Java Concurrent CyclicBarrier

前言

CyclicBarrier 是JUC 所提供的比较好用且应用面十分广泛的一个并发工具。 CyclicBarrier 字面意思:循环 屏障,也就是一种循环可使用的同步屏障,可以一组线程等待都完成的时候放行。和CountDownLatch对比来看,就是CountDownLatch等待的是外部事件,而CyclicBarrier等待一组线程。虽然看上去功能是相似的,但是实现和使用上都存在一定的差异。

先看一下使用的demo

构造函数:

CyclicBarrier(int parties) 参数parties表示需要等待的线程数量。 CyclicBarrier(int parties, Runnable barrierAction) barrierAction表示放行时优先触发的事件。

demo

import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
    CyclicBarrier c = new CyclicBarrier(2, new Runnable() {
        @Override
        public void run() {
            System.out.println("hello EffectiveCoding");
        }
    });
    private static CyclicBarrierTest cyclicBarrierTest = new CyclicBarrierTest();
    public CyclicBarrier getC() {
        return c;
    }
    public void demo() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("1 到达");
                    c.await();
                    System.out.println("1 已放行");
                } catch (Exception e) {
                }
            }
        }).start();
        try {
            System.out.println("2 到达");
            c.await();
        } catch (Exception e) {
        }
        System.out.println("2 已放行");
        c.reset();
    }
    public static void main(String[] args) {
        cyclicBarrierTest.demo();
        System.out.println("--------------这是一条分界线-----------------");
        cyclicBarrierTest.demo();
    }
}

如果等待数量为3的话,则导致无法放行,因为并没有第三个线程到达。

实现

然后看一下实现:

private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private final int parties;
private final Runnable barrierCommand;
private Generation generation = new Generation();
private int count;

首先CyclicBarrier以Reentrant为核心实现,parties为等待数量,Condition来完成一个具体的条件队列(其中使用await方法让线程在统一条件队列等待,使用signalAll方法唤醒通过这一条件的等待线程),这里的Condition的实现类AQS中的内部类ConditionObject。

Condition

Object中与同步锁synchronized相呼应的唤醒和阻塞的方法:waitnotifynotifyAll 方法,当我们使用JDK中的锁的时候也是依赖于这类操作的,而这些操作依赖于Condition实现,下面是Condition的API(相对于wait等方法更加精确了): await():使当前线程在接到信号或被中断之前一直处于等待状态。 boolean await(long time, TimeUnit unit):带有timout的await void awaitUninterruptibly():使当前线程在接到信号之前一直处于等待状态。 boolean awaitUntil(Date deadline):使当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。 void signal():唤醒一个等待线程。 void signalAll():唤醒所有等待线程。

回到正文

barrierCommand 就是实例中释放时触发动作(可以理解为是一个回调操作),generation维护了一个boolean类型的变量,count可以理解为一个计数器,每当await被调用count就会减一,到0时释放。如何hold所有需要等待的线程,Cyclicbarrier使用的Reentrantlock 来完成显示的锁的实现, 下面是dowait里面的一点显示锁的使用细节:

final ReentrantLock lock = this.lock;
lock.lock();
try {
    final Generation g = generation;
}
...
finally {
    lock.unlock();
}

看一下上面已经使用到的几个函数: 构造函数: 这里不放代码都是根据所需要的功能对于前面展示的几个变量进行初始化。 Reset函数就是将cyclicbarrier还原到初始状态

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // break the current generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }

然后是await函数,可以明显的看到全部依赖于dowait方法

public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

下面开始最核心的dowait函数: 先看这一段,首先是完成了当前线程及cyclicbarrier状态的判断,然后对于count进行操作,如果已经计数为零那么出发对应的事件,最后打破栅栏,完成所有线程的唤醒。可以看到任务唤醒的时机是在其他线程之前的,这一点使用的时候是需要注意下的。

final ReentrantLock lock = this.lock;
lock.lock();
try {
    final Generation g = generation;
    if (g.broken)
        throw new BrokenBarrierException();
    if (Thread.interrupted()) {
        breakBarrier();
        throw new InterruptedException();
    }
    int index = --count;
    if (index == 0) {  // tripped
        boolean ranAction = false;
        try {
            final Runnable command = barrierCommand;
            if (command != null)
                command.run();
            ranAction = true;
            nextGeneration();
            return 0;
        } finally {
            if (!ranAction)
                breakBarrier();
        }
    }

然后是后半段代码:当某个线程执行完前半部分之后,count并没有到达0时,那么开始陷入空转检查,符合条件或者到达timeout 当到达对应条件时完成释放。

// loop until tripped, broken, interrupted, or timed out
    for (;;) {
        try {
            if (!timed)
                trip.await();
            else if (nanos > 0L)
                nanos = trip.awaitNanos(nanos);
        } catch (InterruptedException ie) {
            if (g == generation && ! g.broken) {
                breakBarrier();
                throw ie;
            } else {
                // We're about to finish waiting even if we had not
                // been interrupted, so this interrupt is deemed to
                // "belong" to subsequent execution.
                Thread.currentThread().interrupt();
            }
        }
        if (g.broken)
            throw new BrokenBarrierException();
        if (g != generation)
            return index;
        if (timed && nanos <= 0L) {
            breakBarrier();
            throw new TimeoutException();
        }
    }
} finally {
    lock.unlock();
}

核心的差不多就这么多,主要倾向于原理解读,具体的细节性使用问题请参照api。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Go 内存管理 -- 内存分配 二

    首先会申请一段连续的内存空间以供使用,大小(64位机器上)512M(spans_mapped)+16G(bitmap_mapped)+512G(arena)。 ...

    邹志全
  • Go 并发实战 -- sync WaitGroup

    waitgroup也是一个非常有用的并发工具,有点像是Java中的CyclicBarrier,只不过Go中的WaitGroup等待的是协程而已。 通常来说,W...

    邹志全
  • Go 语言进阶--基础概念

    本系列文章到现在已经将Go非常基础的部分介绍完成了,后面就开始设计非常具有Go特色的内容了,因为之后会出现一系列的名词和概念,为了方便本篇先把这些基础概念和我的...

    邹志全
  • Android中使用Contentprovider导致进程被杀死

    Contentprovider也是四大组件之一,支持跨进程调用,因此肯定会用到IPC的Binder机制来实现跨进程调用,在应用层就是AIDL

    大大大大大先生
  • dubbo源码分析之filter加载机制

    然后在doExportUrlsFor1Protocol方法中会进行url的拼接操作,将一些参数拼接到url的后面,形式为ip:port/com.rt.Servi...

    开发架构二三事
  • 【程序源代码】python贪吃蛇小游戏

    在网络还不发达,没有平板电脑和手机的童年;电视机里的动画片和小游戏曾经陪伴我们度过了欢乐的时光.扫雷、贪吃蛇、俄罗斯方块、58坦克大战、超级玛丽、魂斗罗...每...

    程序源代码
  • tomcat源码解读三(2) tomcat中JMX的源码分析

         在这里我是将tomcat中的jmx给拆分出来进行单独分析,希望通过此种方式能够尽可能的出现更多的问题,以便对其有更多的了解,首先需要声明的是tomca...

    cfs
  • MyBatis <set>标签的使用

    MyBatis在生成update语句时若使用if标签,如果前面的if没有执行,则可能导致有多余逗号的错误。 使用set标签可以将动态的配置SET 关键字,和剔...

    唐怀瑟
  • 【批处理学习笔记】第十五课:语句结构(1)

        类似于C语言,批处理也有它的语句结构。批处理的语句结构主要有选择结构(if语句)、循环结构(for语句)等。   if语句(选择结构)     if语...

    Angel_Kitty
  • Spring 源码解读第七弹!bean 标签的解析

    松哥原创的 Spring Boot 视频教程已经杀青,感兴趣的小伙伴戳这里-->Spring Boot+Vue+微人事视频教程

    江南一点雨

扫码关注云+社区

领取腾讯云代金券