前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Juc并发编程15——循环屏障CyclickBarrier使用与源码剖析

Juc并发编程15——循环屏障CyclickBarrier使用与源码剖析

作者头像
用户10127530
发布2022-10-26 18:08:31
1450
发布2022-10-26 18:08:31
举报
文章被收录于专栏:半旧的技术栈半旧的技术栈

前 言 🍉 作者简介:半旧518,长跑型选手,立志坚持写10年博客,专注于java后端 🍌 专栏简介:juc并发编程,讲解锁原理、锁机制、线程池、AQS、并发容器、并发工具等,深入源码,持续更新。 🌰 文章简介:本文主要介绍常用的并发工具类:循环屏障CyclickBarrier,将深入剖析源码,讲解其使用与原理

1.循环屏障的使用

如果打一场游戏,必须等待游戏的玩家足够以后才开始,并且为了公平,所有玩家必须同时进入游戏。循环屏障CyclickBarrier就是为了解决这种场景而设计的.

假如现在游戏需要10人才能开始,并且所有玩家需要同时进入游戏。我们可以这样实现。

代码语言:javascript
复制
     public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(10, () -> {
            System.out.println("Ready to start,please prepare...");
        });
        for (int i = 0; i < 10; i++) {
            final int  finalI = i;
            new Thread(()->{
                try {
                    Thread.sleep((long) (2000 * new Random().nextDouble()));
                    System.out.println("Player:" + finalI + " prepared,"+ barrier.getNumberWaiting()+ "/10");
                    barrier.await();
                    System.out.println("Player:" + finalI + " Join Game...");
                } catch (InterruptedException | BrokenBarrierException exception) {
                    exception.printStackTrace();
                }
            }).start();
        }

    }

输出结果如下。

在这里插入图片描述
在这里插入图片描述

可以看到循环屏障会不断的阻挡线程,知道线程数量足够多时,再一起冲破线程屏障。并且在冲破屏障后,可以执行屏障创建时指定的任务。

屏障是可以循环使用的,在被冲破后,会重新开始计数,继续阻挡后续的线程。比如我们将屏障的初始容量设置为5,看看执行结果。

代码语言:javascript
复制
  public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("Ready to start,please prepare...");
        });
        for (int i = 0; i < 10; i++) {
            final int  finalI = i;
            new Thread(()->{
                try {
                    Thread.sleep((long) (2000 * new Random().nextDouble()));
                    System.out.println("Player:" + finalI + " prepared,"+ barrier.getNumberWaiting()+ "/5");
                    barrier.await();
                    System.out.println("Player:" + finalI + " Join Game...");
                } catch (InterruptedException | BrokenBarrierException exception) {
                    exception.printStackTrace();
                }
            }).start();
        }

    }

执行结果如下。

在这里插入图片描述
在这里插入图片描述

当然,除了自动清零,我们也可以将循环屏障手动置零。

代码语言:javascript
复制
 public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("Ready to start,please prepare...");
        });
        for (int i = 0; i < 3; i++) {
            new Thread(()->{
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException exception) {
                    exception.printStackTrace();
                }
            }).start();
        }
        Thread.sleep(500);
        System.out.println(barrier.getNumberWaiting());
        barrier.reset();
        System.out.println(barrier.getNumberWaiting());
    }

执行结果如下。报了BrokenBarrierException,这是因为在循环屏障数达到3以后,还没有冲破屏障,我们就将循环屏障的计数清零了,之前处于等待状态的线程全部被中断,屏障被破坏了。

在这里插入图片描述
在这里插入图片描述

要是处于等待状态的线程被中断了呢?循环屏障的计数会不会自动减少?

代码语言:javascript
复制
 public static void main(String[] args)  {
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("Ready to start,please prepare...");
        });

        Runnable r = ()->
        {
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException exception) {
                    exception.printStackTrace();
                }
        };

       Thread t = new Thread(r);
       t.start();
       t.interrupt();
       new Thread(r).start();

    }

其结果如下。

在这里插入图片描述
在这里插入图片描述

第一个异常那个信息很好理解,是异常中断。第二个异常信息是因为屏障里的线程被取消,导致本轮屏障被破坏了。可以这么理解,约了三个朋友一起打麻将,结果有一个坑爹的队友临时爽约了,那他一个人就导致这局麻将组不成了。当然,我们还可以重新组局,我们也可以使用reset对屏障重新计数。

代码语言:javascript
复制
 public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5, () -> {
            System.out.println("Ready to start,please prepare...");
        });

        Runnable r = ()->
        {
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException exception) {
                    exception.printStackTrace();
                }
        };

       Thread t = new Thread(r);
       t.start();
       t.interrupt();
       Thread.sleep(500); // 等待中断结束
       barrier.reset();
       new Thread(r).start();

    }

执行结果如下。

在这里插入图片描述
在这里插入图片描述

大家是不是有种感觉,CountdownLatchCyclickBarrier还挺相似的。我们来总结下他们的区别。

  • CountdownLatch 一次性的,仅仅可以使用一次 多个线程等待指定数量的其它线程完成任务的同步工具
  • CyclickBarrier 可以重复使用 多个线程在同一个时间开始执行的工具

2.循环屏障的源码剖析

代码语言:javascript
复制
public class CyclicBarrier {
   
   // 每一代都会生成新的Generation
    private static class Generation {
    	// broken标记,用来存放屏障是否被损坏
    	// 被损坏的屏障是不能被使用的
        boolean broken = false;
    }

    /** 内部维护一个可重入锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 内部维护一个Condition */
    private final Condition trip = lock.newCondition();
    /** 屏障的最大容量 */
    private final int parties;
    /* 冲破屏障后被执行的任务 */
    private final Runnable barrierCommand;
    /** 生成当前轮的Generation */
    private Generation generation = new Generation();

  	// 默认为最大的阻挡容量,每加入一个线程减1
  	// 与CountDownLatch类似
  	// 当屏障被冲破或重置,会将count重置为最大的阻挡容量
    private int count;

   // 当屏障被冲破后,将调用该方法开启下一轮
    private void nextGeneration() {
        // 唤醒所有等待中的线程
        trip.signalAll();
        // 重置count
        count = parties;
        //创建新的Generation对象
        generation = new Generation();
    }

    // 破坏当前的屏障,破坏后当前轮次的屏障就不能够再使用了
    // 除非重置生成新代
    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }
    
   // 开始等待
 	public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
        	// 由于这里没有使用时间策略,因此如果出现超时,就是异常状况
            throw new Error(toe); 
        }
    }
    
    // 可超时的等待
  	public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }
    
    // 真正的等待流程
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        //加锁 因为会有多个线程同时调用await方法,
        // 需要保证每次只有一个线程能进入
        lock.lock(); 
        try {
            final Generation g = generation;
			// 确定屏障未被破坏
            if (g.broken)
                throw new BrokenBarrierException();
			// 需要破坏屏障的第一种情况:线程中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
			
            int index = --count;
            // 可以冲破屏障了
            if (index == 0) { 
                boolean ranAction = false;
                try {
                	// 执行冲破屏障后的任务
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    // 更新代数
                    nextGeneration();
                    return 0;
                } finally {
                	// 损坏屏障的第二种情况:执行任务异常
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 走到这说明加入的线程数量不够冲破屏障
            for (;;) { // 无限循环,直到冲破屏障,超时或者出现异常
                try {
                	// 看看是否是限时的
                    if (!timed)
                    	// breakBarrier|nextGeneration会唤醒trip
                        trip.await(); //非定时永久等待
                    else if (nanos > 0L) //定时等待指定时间
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                  // 破坏屏障的第一种情况:中断
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }
				//走到这说明trip被唤醒
				// 即使被唤醒,但是屏障被损坏的情况还是需要抛异常 
                if (g.broken)
                    throw new BrokenBarrierException();
				// 代数有更新,说明进行了换代
				// 返回,并带返回参数:当前是第几个等待的线程
                if (g != generation)
                    return index;
				// 等待超时,破坏屏障的第三种情况
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

  
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }


    public CyclicBarrier(int parties) {
        this(parties, null);
    }


    public int getParties() {
        return parties;
    }

    public boolean isBroken() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

   
    public int getNumberWaiting() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return parties - count;
        } finally {
            lock.unlock();
        }
    }
}

上面是通过内部类generation来实现屏障的更新迭代的,这种处理方式值得关注学习。

除此以外,上面的源码部分应该很好理解,这里就介绍到这里,下一篇文章将介绍并发工具类Semaphore和Exchanger,以及Fork/Join框架,这也会是并发编程基础篇的最后一篇,后面笔者还可能输出一些高阶内容,敬请期待。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-06-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.循环屏障的使用
  • 2.循环屏障的源码剖析
相关产品与服务
容器服务
腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档