前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟?

多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟?

作者头像
须臾之余
发布2021-07-27 15:04:34
2760
发布2021-07-27 15:04:34
举报
文章被收录于专栏:须臾之余须臾之余

1、学习切入点

百度翻译大概意思就是:

一种同步辅助程序,允许一组线程相互等待到达一个公共的屏障点。CyclicBarrier在涉及固定大小的线程方的程序中非常有用,这些线程方有时必须相互等待。这个屏障被称为循环屏障,因为它可以在等待的线程被释放后重新使用。 CyclicBarrier支持可选的Runnable命令,该命令在参与方中的最后一个线程到达后,但在释放任何线程之前,每个屏障点运行一次。此屏障操作有助于在任何参与方继续之前更新共享状态。

动图演示:

在上文中我们分析完了 CountDownLatch源码,可以理解为减法计数器,是基于AQS的共享模式使用,而CyclicBarrier相比于CountDownLatch 来说,要简单很多,它类似于加法计数器,在源码中使用 ReentrantLock 和 Condition 的组合来使用。

2、案例演示 CyclicBarrier

代码语言:javascript
复制
//加法计数器
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        /**
         * 集齐5名队员,开始游戏
         */
        // 开始战斗的线程
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5,()->{
            System.out.println("欢迎来到王者荣耀,敌军还有五秒到达战场!全军出击!");
        });
        for (int i = 1; i <=5 ; i++) {
            final int temp = i;
            // lambda能操作到 i 吗
            new Thread(()->{
                System.out.println(Thread.currentThread().getName()+"第"+temp+"个进入游戏!");
                try {
                    cyclicBarrier.await(); // 等待
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

3、入手构造器

代码语言:javascript
复制
//构造器1
/** 创建一个新的CyclicBarrier,它将在给定数量的参与方(线程)等待时触发,并在触发屏障时执行给定的屏障操作,由最后一个进入屏障的线程执行 */   
public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

//构造器2
/** 创建一个新的CyclicBarrier,当给定数量的参与方(线程)在等待它时,它将跳闸,并且在屏障跳闸时不执行预定义的操作 */
public CyclicBarrier(int parties) {
        this(parties, null);
    }

其中构造器1为核心构造器,在这里你可以指定 parties 本局游戏的参与者的数量(要拦截的线程数)以及 barrierAction 本局游戏结束时要执行的任务。

3、入手成员变量

代码语言:javascript
复制
   /** 同步操作锁 */
    private final ReentrantLock lock = new ReentrantLock();
    /** 线程拦截器 Condition维护了一个阻塞队列*/
    private final Condition trip = lock.newCondition();
    /** 每次拦截的线程数 */
    private final int parties;
    /* 换代前执行的任务 */
    private final Runnable barrierCommand;
    /** 表示栅栏的当前代 类似代表本局游戏*/
    private Generation generation = new Generation();
    /** 计数器 */
    private int count;
    /** 静态内部类Generation  */
    private static class Generation {
        boolean broken = false;
    }

3、入手核心方法

3.1、【await】方法源码分析

下面分析这两个方法,分别为【非定时等待】和【定时等待】!

代码语言:javascript
复制
//非定时等待
public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }
//定时等待
public int await(long timeout, TimeUnit unit)throws InterruptedException,
              BrokenBarrierException,
              TimeoutException {
       return dowait(true, unit.toNanos(timeout));
   }

可以看到,最终两个方法都走【dowait】 方法,只不过参数不同。下面我们重点看看这个方法到底做了哪些事情。

代码语言:javascript
复制
//核心等待方法
 private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();//加锁操作
        try {
            final Generation g = generation;
            //检查当前栅栏是否被打翻
            if (g.broken)
                throw new BrokenBarrierException();
            //检查当前线程是否被中断
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }
            //每次都将计数器的值-1
            int index = --count;
            //计数器的值减为0,则需要唤醒所有线程并转换到下一代
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    //唤醒所有线程前先执行指定的任务
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    //唤醒所有线程并转换到下一代
                    nextGeneration();
                    return 0;
                } finally {
                    //确保在任务未成功执行时能将所有线程唤醒
                    if (!ranAction)
                        breakBarrier();
                }
            }
            //如果计数器不为0 则执行此循环
            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    //根据传入的参数来觉得是定时等待还是非定时等待
                    if (!timed)
                        //如果没有时间限制,则直接等待,直到被唤醒
                        trip.await();
                    else if (nanos > 0L)
                        //如果有时间限制,则等待指定时间
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    //若当前线程在等待期间被中断则打翻栅栏唤醒其它线程
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // 若在捕获中断异常前已经完成在栅栏上的等待,则直接调用中断操作
                        Thread.currentThread().interrupt();
                    }
                }
                //如果线程因为打翻栅栏操作而被唤醒则抛出异常
                if (g.broken)
                    throw new BrokenBarrierException();
                //如果线程因为换代操作而被唤醒则返回计数器的值
                if (g != generation)
                    return index;
                //如果线程因为时间到了而被唤醒则打翻栅栏并抛出异常
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();//最终解锁
        }
    }

分两步分析,首先计数器的值减为0的情况,和计数器不为0的情况,首先第一种情况下:

第二种情况,计数器不为0,则进入自旋for(;;):

多线程同时并发访问,如何阻塞当前线程?

我们翻看源码,这里就看一下没有时间限制的【trip.await】方法:

整个await的过程:

1、将当前线程加入到Condition锁队列中。特别主要要区分AQS的等待队列,这里进入的是Condition的FIFO队列 2、释放锁。这里可以看到【fullyRelease】将锁释放了,否则【acquireQueued(node, savedState)】别的线程就无法拿到锁而发生死锁。 3、自旋(while)挂起,直到被唤醒或者超时或者CACELLED等。 4、获取锁【acquireQueued】方法,并将自己从Condition的FIFO队列中释放,表面自己不再需要锁(我已经有锁了)

3.2、Condition 队列与AQS等待队列 补充

AQS等待队列与Condition队列是两个相互独立的队列,【await】就是在当前线程持有锁的基础上释放锁资源,并新建Condition节点加入到Condition队列尾部,阻塞当前线程。【signal】就是将当前Condition的头结点移动到AQS等待队列节点尾部,让其等待再次获取锁。下面画图演示区别:

节点1执行Condition.await()->(1)将head后移 ->(2)释放节点1的锁并从AQS等待队列中移除->(3)将节点1加入到Condition的等待队列中->(4)更新lastWrite为节点1

节点2执行signal()操作->(1)将firstWrite后移->(2)将节点4移出Condition队列->(3)将节点4加入到AQS的等待队列中去->(4)更新AQS等待队列的tail

3.3、总结:

一、Condition的数据结构:

我们知道一个Condition可以在多个地方被await(),那么就需要一个FIFO的结构将这些Condition串联起来,然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition内部就需要一个FIFO的队列。 private transient Node firstWaiter; private transient Node lastWaiter; 上面的两个节点就是描述一个FIFO的队列。我们再结合前面提到的节点(Node)数据结构。我们就发现Node.nextWaiter就派上用场了!nextWaiter就是将一系列的Condition.await 串联起来组成一个FIFO的队列。

二、线程何时阻塞和释放

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在AQS等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁 释放:signal()后,节点会从condition队列移动到AQS等待队列,则进入正常锁的获取流程。

3.4、【signalAll】signalAll源码分析

signalAll】方法,唤醒所有在Condition阻塞队列中的线程

代码语言:javascript
复制
private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();//唤醒Condition中等待的线程
    }

public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
     }
/** 这个方法相当于把Condition队列中的所有Node全部取出插入到等待队列中去 */
private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
       }
代码语言:javascript
复制
/** 将节点从条件队列传输到同步队列AQS的等待队列中 */
final boolean transferForSignal(Node node) {
        //核心添加节点到AQS队列方法
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
代码语言:javascript
复制
/** 使用CAS+自旋方式插入节点到等待队列,如果队列为空,则初始化队列 */
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }

3.5、【reset】方法源码分析

最后,我们来看看怎么重置一个栅栏:

将屏障重置为初始状态。如果任何一方目前在隔离墙等候,他们将带着**BrokenBarrierException**返回。 请注意,由于其他原因发生中断后的重置可能很复杂;线程需要以其他方式重新同步,并选择一种方式执行重置。 最好是创建一个新的屏障供以后使用

代码语言:javascript
复制
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // break the current generation
            nextGeneration(); // start a new generation
        } finally {
            lock.unlock();
        }
    }

测试reset代码:

首先,打破栅栏,那意味着所有等待的线程(5个等待的线程)会唤醒,【await 】方法会通过抛出【BrokenBarrierException】异常返回。然后开启新一代,重置了 count 和 generation,相当于一切归0了。

4、CyclicBarrier 与 CountDownLatch 的区别

相同点:

1、都可以实现一组线程在到达某个条件之前进行等待 2、它们内部都有一个计数器,当计数器的值不断减为0的时候,所有阻塞的线程都会被唤醒!

不同点:

1、CyclicBarrier 的计数器是由它自己来控制,而CountDownLatch 的计数器则是由使用则来控制 2、在CyclicBarrier 中线程调用 await方法不仅会将自己阻塞,还会将计数器减1,而在CountDownLatch中线程调用 await方法只是将自己阻塞而不会减少计数器的值。 3、另外,CountDownLatch 只能拦截一轮,而CyclicBarrier 可以实现循环拦截。一般来说CyclicBarrier 可以实现 CountDownLatch的功能,而反之不能。

5、总结:

当调用【cyclicBarrier.await】方法时,最终都会执行【dowait】方法,使用了ReentrantLock去上锁,每次讲计数器count值-1,当计数器值-1为0的时候,会先执行指定任务,调用Condition的【trip.signalAll()】唤醒所有线程并进入下一代 如果当前计数器值-1不为0的时候,进入自旋,执行Condition的【await()】方法,将当前线程添加到Condition的条件队列中等待,执行【fullyRelease】调用【tryRelease】将count值-1,再判断count值是否为0,为0 则会先执行指定任务,调用Condition的【trip.signalAll()】唤醒所有线程并进入下一代,再判断是否在AQS等待队列中,如果不在的话就park当前线程进入AQS等待队列中,否则自旋直到被唤醒在Condition中的等待队列被signalAll进入AQS等待队列中获取锁

推荐阅读:

多线程进阶——JUC并发编程之抽象同步队列AQS框架设计理念一探究竟?

多线程进阶——JUC并发编程之CyclicBarrier源码一探究竟

多线程进阶——JUC并发编程之Semaphore源码一探究竟

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、学习切入点
  • 2、案例演示 CyclicBarrier
  • 3、入手构造器
    • 3、入手成员变量
      • 3、入手核心方法
        • 3.1、【await】方法源码分析
          • 3.2、Condition 队列与AQS等待队列 补充
            • 3.3、总结:
              • 一、Condition的数据结构:
              • 二、线程何时阻塞和释放
            • 3.4、【signalAll】signalAll源码分析
              • 3.5、【reset】方法源码分析
              • 4、CyclicBarrier 与 CountDownLatch 的区别
              • 5、总结:
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档