源码阅读之CyclicBarrier

源码阅读是基于JDK7,本篇主要涉及CyclicBarrier常用方法源码分析。文中代码若格式排版不对,可点击底部的阅读原文阅读。

1.概述

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法,所有被屏障拦截的线程才会继续运行await方法后面的程序。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该屏障点在释放等待线程后可以重用,所以称它为循环的屏障点。CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达屏障点之后(但在释放所有线程之前),该命令只在所有线程到达屏障点之后运行一次,并且该命令由最后一个进入屏障点的线程执行。

2.使用样例

下面的代码演示了CyclicBarrier简单使用的样例。

public class CyclicBarrierDemo {

@Test

public void test() {

final CyclicBarrier barrier = new CyclicBarrier(2, myThread);

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

barrier.await();

System.out.println(Thread.currentThread().getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}, "thread1").start();

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

barrier.await();

System.out.println(Thread.currentThread().getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}, "thread2").start();

}

Thread myThread = new Thread(new Runnable() {

@Override

public void run() {

System.out.println("myThread");

}

}, "thread3");

}

输出结果如下所示:

thread1

thread2

myThread

thread2

thread1

3.数据结构

CyclicBarrier中声明了如下一些属性及变量:

private final ReentrantLock lock = new ReentrantLock();

private final Condition trip = lock.newCondition();

private final int parties;

private final Runnable barrierCommand;

private Generation generation = new Generation();

private int count;

(1)lock用于保护屏障入口的锁;

(2)trip线程等待条件;

(3)parties参与等待的线程数;

(4)barrierCommand当所有线程到达屏障点之后,首先执行的命令;

(5)count实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties。

4.构造方法

提供了两个构造函数可供使用。

//创建一个CyclicBarrier实例,parties指定参与相互等待的线程数,

//barrierAction指定当所有线程到达屏障点之后,首先执行的操作,该操作由最后一个进入屏障点的线程执行。

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

//创建一个CyclicBarrier实例,parties指定参与相互等待的线程数

public CyclicBarrier(int parties) {

this(parties, null);

}

5.getParties方法

//返回参与相互等待的线程数

public int getParties() {

return parties;

}

6.await方法

//该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态

//直到所有线程都到达屏障点,当前线程才会被唤醒

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen;

}

}

//该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态

//在timeout指定的超时时间内,等待其他参与线程到达屏障点

//如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待

public int await(long timeout, TimeUnit unit)

throws InterruptedException,

BrokenBarrierException,

TimeoutException {

return dowait(true, unit.toNanos(timeout));

}

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

//当所有参与的线程都到达屏障点,立即去唤醒所有处于休眠状态的线程,恢复执行

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

//让当前执行的线程阻塞,处于休眠状态

trip.await();

else if (nanos > 0L)

//让当前执行的线程阻塞,在超时时间内处于休眠状态

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

//唤醒所有处于休眠状态的线程,恢复执行

//重置count值为parties

//重置中断状态为false

private void nextGeneration() {

// signal completion of last generation

trip.signalAll();

// set up next generation

count = parties;

generation = new Generation();

}

//唤醒所有处于休眠状态的线程,恢复执行

//重置count值为parties

//重置中断状态为true

private void breakBarrier() {

generation.broken = true;

count = parties;

trip.signalAll();

}

这个等待的await方法,其实是使用ReentrantLock和Condition控制实现的。

7.isBroken方法

public boolean isBroken() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return generation.broken;

} finally {

lock.unlock();

}

}

判断此屏障是否处于中断状态。如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回true;否则返回false。

8.reset方法

//将屏障重置为其初始状态。

public void reset() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

//唤醒所有等待的线程继续执行,并设置屏障中断状态为true

breakBarrier(); // break the current generation

//唤醒所有等待的线程继续执行,并设置屏障中断状态为false

nextGeneration(); // start a new generation

} finally {

lock.unlock();

}

}

9.getNumberWaiting方法

//返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。

public int getNumberWaiting() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return parties - count;

} finally {

lock.unlock();

}

}

小结:

1.CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

2.这个等待的await方法,其实是使用ReentrantLock和Condition控制实现的。

Java技术分享微信公众号JavaQ,欢迎围观吐槽!

原文发布于微信公众号 - JavaQ(Java-Q)

原文发表时间:2016-08-18

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Java3y

Web开发模式【Mode I 和Mode II的介绍、应用案例】

开发模式的介绍 在Web开发模式中,有两个主要的开发结构,称为模式一(Mode I)和模式二(Mode II). 首先我们来理清一些概念吧: DAO(Data ...

3507
来自专栏技术墨客

Spring核心——资源数据管理 原

在Profile管理环境一文中介绍了环境的概念以及Spring Profile特性控制Bean的添加。本文将进一步介绍Spring管理和控制操作系统变量、JVM...

974
来自专栏JavaQ

高并发编程-CyclicBarrier深入解析

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法...

1.1K3
来自专栏架构之路

SpringMVC + Mybatis bug调试 SQL正确,查数据库却返回NULL

今天碰到个bug,有点意思 背景是SpringMVC + Mybatis的一个项目,mapper文件里写了一条sql 大概相当于 select a from t...

3777
来自专栏芋道源码1024

面试问烂的 Spring AOP 原理

来源:https://www.jianshu.com/p/e18fd44964eb

2324
来自专栏GreenLeaves

WebService 之 身份验证

  在项目开发,我们经常会使用WebService,但在使用WebService时我们经常会考虑到了WebService是安全问题,很容易想到通过一组用户名与密...

3927
来自专栏FreeBuf

恶意程序分析利器PowerShellArsenal

简介 PowerShellArsenal是一个PowerShell模块,它的功能是帮助逆向工程师来分析.NET恶意软件,PowerShellArsenal的功能...

2629
来自专栏程序猿DD

程序员你为什么这么累【续】:编写简陋的接口调用框架 - 动态代理学习

导读: 程序员你为什么这么累? 我的编码习惯 - 接口定义 我的编码习惯 - Controller规范 我的编码习惯 - 日志建议 我的编码习惯 - 异常处理 ...

4387
来自专栏技术墨客

Spring核心——Bean的依赖注入 原

在设计模式与IoC这篇文章中,介绍了Spring基础的三大支柱的两项内容——IoC、Bean。本篇将继续围绕着Bean的创建时的注入方式来介绍Spring的核心...

941
来自专栏芋道源码1024

注册中心 Eureka 源码解析 —— EndPoint 与 解析器

目前有多种 Eureka-Server 访问地址的配置方式,本文只分享 Eureka 1.x 的配置,不包含 Eureka 1.x 对 Eureka 2.x 的...

1310

扫码关注云+社区

领取腾讯云代金券