首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java并发之CyclicBarrier-栅栏详解

Java并发之CyclicBarrier-栅栏详解

作者头像
胖虎
发布2019-06-26 17:08:57
8791
发布2019-06-26 17:08:57
举报
文章被收录于专栏:晏霖晏霖

前言

上一篇我们介绍了CountDownLatch,和我今天要说的栅栏CyclicBarrier有相似之处,笔者英语烂,给读者翻译成读音:塞克勒柏瑞尔,莫笑。它允许一组线程互相等待,直到到达某个公共屏障点,然后释放这些线程,重置屏障点继续等待,知道所有要执行的线程都执行完毕。

正文

为了让读者更容易理解这个栅栏的含义,我做一个比喻,目前有100个人要坐车去另一个地方,每个车可以装10个人,那么这个屏障点就是车里坐满了10个人,然后发车,接着马上重置,然后让后面10个人在坐上车,以此类推,直到这100个人都执行完毕。

CyclicBarrier源码解析

还是老套路,介绍最基础最常用方法都源码。

首先这个工具类可以用两个构造方法调用,一个是默认都构造方法(单个参数,只设置屏障点),一个是用于线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,但一般选用单个参数的。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) 
         throw new IllegalArgumentException();
this.parties = parties;this.count = parties;    this.barrierCommand = barrierAction;
    }
    public CyclicBarrier(int parties) {
     this(parties, null);
    }

await方法

调用await方法的线程告诉CyclicBarrier有一个线程已经到达同步点,然后当前线程被阻塞。直到parties(设置的屏障数量)个参与线程调用了await方法。CyclicBarrier同样提供带超时时间的await方法。

public int await() throws InterruptedException, BrokenBarrierException {
       try {
           return dowait(false, 0L);
       } catch (TimeoutException toe) {
           throw new Error(toe); // cannot happen
       }
   }
public int await(long timeout, TimeUnit unit)
       throws InterruptedException,
              BrokenBarrierException,
              TimeoutException {
       return dowait(true, unit.toNanos(timeout));
   }

dowait方法

这个方法是核心方法,他就是真正线程安安静静的等待。但也不是绝对但等待,有以下几个点就释放的。

  • 最后一个线程到达,即index == 0
  • 某个参与线程等待超时
  • 某个参与线程被中断
  • 调用了CyclicBarrier的reset()方法。该方法会将屏障重置为初始状态
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
            TimeoutException {
    // 获取独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 当前代
        final Generation g = generation;
        // 如果这代损坏了,抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
 
        // 如果线程中断了,抛出异常
        if (Thread.interrupted()) {
            // 将损坏状态设置为true
            // 并通知其他阻塞在此栅栏上的线程
            breakBarrier();
            throw new InterruptedException();
        }
 
        // 获取下标
        int index = --count;
        // 如果是 0,说明最后一个线程调用了该方法
        if (index == 0) {  // tripped
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                // 执行栅栏任务
                if (command != null)
                    command.run();
                ranAction = true;
                // 更新一代,将count重置,将generation重置
                // 唤醒之前等待的线程
                nextGeneration();
                return 0;
            } finally {
                // 如果执行栅栏任务的时候失败了,就将损坏状态设置为true
                if (!ranAction)
                    breakBarrier();
            }
        }
 
        // loop until tripped, broken, interrupted, or timed out
        for (;;) {
            try {
                 // 如果没有时间限制,则直接等待,直到被唤醒
                if (!timed)
                    trip.await();
                // 如果有时间限制,则等待指定时间
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                // 当前代没有损坏
                if (g == generation && ! g.broken) {
                    // 让栅栏失效
                    breakBarrier();
                    throw ie;
                } else {
                    // 上面条件不满足,说明这个线程不是这代的
                    // 就不会影响当前这代栅栏的执行,所以,就打个中断标记
                    Thread.currentThread().interrupt();
                }
            }
 
            // 当有任何一个线程中断了,就会调用breakBarrier方法
            // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
 
            // g != generation表示正常换代了,返回当前线程所在栅栏的下标
            // 如果 g == generation,说明还没有换代,那为什么会醒了?
            // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
            // 正是因为这个原因,才需要generation来保证正确。
            if (g != generation)
                return index;
            
            // 如果有时间限制,且时间小于等于0,销毁栅栏并抛出异常
            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        // 释放独占锁
        lock.unlock();
    }
}

到这里核心方法介绍完了,其实还有许多方法没有说,大家可以自己看源码把作者的注释翻译过来就能读懂了。

下面写一个简单的案例让大家知道这个到底怎么用。

由于我是在测试类里写的,所以很多命名不规范,简单说一下,下面的逻辑是,使用了一个缓存线程执行18个任务,任务要求每次处理的线程达到6个时候就释放一次,不足6个等待。

@Test
    public void test1() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
        for (int i = 1; i <= 18; i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "开始等待其他线程");
                    try {
                        cyclicBarrier.await();
                        System.out.println(Thread.currentThread().getName() + "开始执行业务逻辑,耗时0.5秒");
                        // 工作线程开始处理,这里用Thread.sleep()来模拟业务处理
                        Thread.sleep(500);
                        System.out.println(Thread.currentThread().getName() + "业务逻辑执行完毕");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
    }

下面是我打印的结果,可以看出来6个线程一组,每次达到6个就开始执行。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2018-12-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 晏霖 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 正文
    • CyclicBarrier源码解析
      • await方法
        • dowait方法
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档