前面我们已经分析了闭锁(CountDownLatch)和信号量(Semaphore)的实现原理及案例,接下去继续看下一个JDK内置同步器——循环屏障(CyclicBarrier)。通过循环屏障可以实现对多线程的并发控制,只有当到达屏障的线程数量达到指定值时屏障才会放行。实际上CyclicBarrier也可以看成是一个倒计数器,倒计数器的最大值即是屏障的大小,每个线程调用await方法都会让倒计数器的值减一,当倒计数器的值为0时则会让所有等待的线程往下执行。
循环屏障主要的应用场景是在某些节点约束N个线程,比如让指定数量的线程共同到达某个节点后这些线程才能一起往下执行。如下图中,对于一个倒计数器最大值为3的循环屏障,初始时三个线程都未调用await方法。当线程一调用await方法后倒计数器的值变为2,接着线程二继续调用await方法使倒计数器的值变为1,然后线程三也调用await方法,此时倒计数器的值为0,三个线程都通过屏障继续往下执行。最后倒计数器的值又重新恢复到最大值3,这就是为什么叫循环屏障的原因。
01
三要素
循环屏障的三要素为:倒计数器最大值、await方法以及触发点Runnable任务。倒计数器的最大值在构建CyclicBarrier对象时指定,它表示需要等待的线程数。await方法能让倒计数器的值减一,并且让线程进入等待状态。触发点Runnable任务指的是当指定数量的线程到达屏障后会触发执行的任务。
02
实现原理
与前面的闭锁和信号量的实现不同的是,循环屏障CyclicBarrier是通过可重入锁ReentrantLock来实现的。但如果往更底层追究的话也同样是使用AQS同步器,因为ReentrantLock的实现是基于AQS同步器的。
为了更方便阅读,这里将循环屏障的核心源码整理成两部分,分别对着两部分进行讲解。第一部分如下图,CyclicBarrier类中的ReentrantLock对象和Condition对象用于控制线程,parties变量表示倒计数器的最大值,count变量表示倒计数器当前值,而Runnable对象即为触发点任务。主要有两个构造函数,一种传入倒计数器最大值,另外一种传入倒计数器最大值和触发点任务。nextGeneration方法表示已经达到屏障倒计数器的最大值,准备进行下一轮,它会将屏障中所有的线程放行,而且将倒计数器的当前值重置为最大值。getNumberWaiting方法用于获取当前屏障中等待的线程数,其值为倒计数器的最大值减去倒计数器的当前值。
第二部分我们来分析C最核心的部分——await方法。它会调用dowait方法,所以我们主要看dowait方法。首先为了线程安全,通过lock.lock()进行加锁,然后判断当前线程是否被中断,如果被中断则往上抛InterruptedException异常。线程调用await方法会让倒计数器减一,所以接下去会将当前倒计数器的当前值减一。如果倒计数器当前值为0则需要执行一个Runnable对象,它就是前面构造函数传入的触发点任务,然后调用nextGeneration方法进入下一轮。而如果倒计数器当前值不为0的话,则调用Condition对象的await方法进入等待状态,当然如果设置了超时的话则使用awaitNanos方法,中间如果发生中断异常则通过Thread.currentThread().interrupt()设置当前线程的中断标识。此外,如果等待时间超过指定时间则抛TimeoutException异常。最后调用lock.unlock()释放锁。
03
案例 1
在例子一中,我们创建一个CyclicBarrier对象,倒计数器最大值为3。然后创建三个线程,在线程中会在不同时机调用await方法。程序的最终输出在下面,但输出看不出过程,我们看看执行的过程是怎样的。线程一启动后输出thread1 is waiting,然后调用await方法进入等待状态,倒计数器的值减一后为2。线程二启动后先睡眠两秒钟然后输出thread2 is waiting,再调用await方法进入等待状态,倒计数器的值再减一后为1。线程三启动后先睡眠4秒钟并输出thread3 is waiting,再调用await方法进入等待状态,倒计数器的减减一后为0。此时所有等待的线程都将被放行往下执行,随机输出xxx goes。
04
案例 2
下面这个例子演示了触发点Runnable任务,以等b女朋友吃饭为场景。假设我与女朋友约好去饭馆吃饭,我准时到达饭后后就开始等女朋友。而女朋友呢?还在化妆,前前后后花费了半个小时才到达饭馆。此时两个人都到齐了,于是两个人开始点餐。
05
循环屏障 VS 闭锁
这里的循环屏障与前面讲到的闭锁有点类似,它们都用于多线程并发的控制,机制都类似一种倒计数器。但它们也存在不同的地方,可以认为闭锁针对的是倒计数器的值,而循环屏障针对的是线程数。这句话如何理解呢?就是说假如倒计数器的值为5,那么对于闭锁来说只要调用五次countDown方法便能让等待的线程往下执行,而不管是一个线程调五次countDown方法还是五个线程分别调用一次countDown方法。对于循环屏障来说,必须要有五个线程分别调用await方法才能使得等待的线程往下执行。循环屏障和闭锁都是等倒计数器的值为0时让所有等待的线程通过并往下执行,只是循环屏障规定倒计数器的减一操作只能由不同的线程来操作。
比如下面两者使用对比的例子中,主线程启动线程一后调用闭锁的await方法进入等等等状态,此时线程一睡眠两秒后连续调用两次countDown方法将倒计数器的值减为0,使得主线程得以往下执行。接着主线程启动线程二,线程二睡眠两秒后本想着打算调用两次循环屏障的await方法让倒计数器的值变为0,但实际上失败了,因为第一次执行barrier.await()时该线程就已经进入等待了,所以无法往下执行。这个程序只能输出“CountDownLatch”和“CyclicBarrier”,并不能输出“can you get here”。
06
总结
本文介绍了一个JDK内置的同步器——循环屏障(CyclicBarrier),通过它能够让指定数量的线程共同到达某个节点后才让这些线程一起往下执行。它可以看成是一个倒计数器,每个线程调用await方法都会进入等待状态并且导致倒计数器减一,当倒计数器的值为0时所有等待的线程才能往下执行。我们深入分析了CyclicBarrier的实现原理,它不直接通过AQS同步器来实现,而是通过ReentrantLock实现,但ReentrantLock的实现也是基于AQS同步器,所以它其实还是间接地通过AQS同步器来实现。同时我们提供了两个例子来帮助我们理解循环屏障,最后还对比了循环屏障与闭锁的异同
阅读建议
将闭锁(CountDownLatch)、信号量(Semaphore)、循环屏障(CyclicBarrier)对比分析更容易理解原理
- END -