前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >源码阅读之CyclicBarrier

源码阅读之CyclicBarrier

作者头像
JavaQ
发布2018-04-04 16:02:16
5740
发布2018-04-04 16:02:16
举报
文章被收录于专栏:JavaQJavaQJavaQ

源码阅读是基于JDK7,本篇主要涉及CyclicBarrier常用方法源码分析。文中代码若格式排版不对,可点击底部的阅读原文阅读。

1.概述

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到所有线程都到达某个公共屏障点(也可以叫同步点),即相互等待的线程都完成调用await方法,所有被屏障拦截的线程才会继续运行await方法后面的程序。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该屏障点在释放等待线程后可以重用,所以称它为循环的屏障点。CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达屏障点之后(但在释放所有线程之前),该命令只在所有线程到达屏障点之后运行一次,并且该命令由最后一个进入屏障点的线程执行。

2.使用样例

下面的代码演示了CyclicBarrier简单使用的样例。

public class CyclicBarrierDemo {

@Test

public void test() {

final CyclicBarrier barrier = new CyclicBarrier(2, myThread);

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

barrier.await();

System.out.println(Thread.currentThread().getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}, "thread1").start();

new Thread(new Runnable() {

@Override

public void run() {

try {

System.out.println(Thread.currentThread().getName());

barrier.await();

System.out.println(Thread.currentThread().getName());

} catch (Exception e) {

e.printStackTrace();

}

}

}, "thread2").start();

}

Thread myThread = new Thread(new Runnable() {

@Override

public void run() {

System.out.println("myThread");

}

}, "thread3");

}

输出结果如下所示:

thread1

thread2

myThread

thread2

thread1

3.数据结构

CyclicBarrier中声明了如下一些属性及变量:

private final ReentrantLock lock = new ReentrantLock();

private final Condition trip = lock.newCondition();

private final int parties;

private final Runnable barrierCommand;

private Generation generation = new Generation();

private int count;

(1)lock用于保护屏障入口的锁;

(2)trip线程等待条件;

(3)parties参与等待的线程数;

(4)barrierCommand当所有线程到达屏障点之后,首先执行的命令;

(5)count实际中仍在等待的线程数,每当有一个线程到达屏障点,count值就会减一;当一次新的运算开始后,count的值被重置为parties。

4.构造方法

提供了两个构造函数可供使用。

//创建一个CyclicBarrier实例,parties指定参与相互等待的线程数,

//barrierAction指定当所有线程到达屏障点之后,首先执行的操作,该操作由最后一个进入屏障点的线程执行。

public CyclicBarrier(int parties, Runnable barrierAction) {

if (parties <= 0) throw new IllegalArgumentException();

this.parties = parties;

this.count = parties;

this.barrierCommand = barrierAction;

}

//创建一个CyclicBarrier实例,parties指定参与相互等待的线程数

public CyclicBarrier(int parties) {

this(parties, null);

}

5.getParties方法

//返回参与相互等待的线程数

public int getParties() {

return parties;

}

6.await方法

//该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态

//直到所有线程都到达屏障点,当前线程才会被唤醒

public int await() throws InterruptedException, BrokenBarrierException {

try {

return dowait(false, 0L);

} catch (TimeoutException toe) {

throw new Error(toe); // cannot happen;

}

}

//该方法被调用时表示当前线程已经到达屏障点,当前线程阻塞进入休眠状态

//在timeout指定的超时时间内,等待其他参与线程到达屏障点

//如果超出指定的等待时间,则抛出TimeoutException异常,如果该时间小于等于零,则此方法根本不会等待

public int await(long timeout, TimeUnit unit)

throws InterruptedException,

BrokenBarrierException,

TimeoutException {

return dowait(true, unit.toNanos(timeout));

}

private int dowait(boolean timed, long nanos)

throws InterruptedException, BrokenBarrierException,

TimeoutException {

final ReentrantLock lock = this.lock;

lock.lock();

try {

final Generation g = generation;

if (g.broken)

throw new BrokenBarrierException();

if (Thread.interrupted()) {

breakBarrier();

throw new InterruptedException();

}

int index = --count;

if (index == 0) { // tripped

boolean ranAction = false;

try {

final Runnable command = barrierCommand;

if (command != null)

command.run();

ranAction = true;

//当所有参与的线程都到达屏障点,立即去唤醒所有处于休眠状态的线程,恢复执行

nextGeneration();

return 0;

} finally {

if (!ranAction)

breakBarrier();

}

}

// loop until tripped, broken, interrupted, or timed out

for (;;) {

try {

if (!timed)

//让当前执行的线程阻塞,处于休眠状态

trip.await();

else if (nanos > 0L)

//让当前执行的线程阻塞,在超时时间内处于休眠状态

nanos = trip.awaitNanos(nanos);

} catch (InterruptedException ie) {

if (g == generation && ! g.broken) {

breakBarrier();

throw ie;

} else {

// We're about to finish waiting even if we had not

// been interrupted, so this interrupt is deemed to

// "belong" to subsequent execution.

Thread.currentThread().interrupt();

}

}

if (g.broken)

throw new BrokenBarrierException();

if (g != generation)

return index;

if (timed && nanos <= 0L) {

breakBarrier();

throw new TimeoutException();

}

}

} finally {

lock.unlock();

}

}

//唤醒所有处于休眠状态的线程,恢复执行

//重置count值为parties

//重置中断状态为false

private void nextGeneration() {

// signal completion of last generation

trip.signalAll();

// set up next generation

count = parties;

generation = new Generation();

}

//唤醒所有处于休眠状态的线程,恢复执行

//重置count值为parties

//重置中断状态为true

private void breakBarrier() {

generation.broken = true;

count = parties;

trip.signalAll();

}

这个等待的await方法,其实是使用ReentrantLock和Condition控制实现的。

7.isBroken方法

public boolean isBroken() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return generation.broken;

} finally {

lock.unlock();

}

}

判断此屏障是否处于中断状态。如果因为构造或最后一次重置而导致中断或超时,从而使一个或多个参与者摆脱此屏障点,或者因为异常而导致某个屏障操作失败,则返回true;否则返回false。

8.reset方法

//将屏障重置为其初始状态。

public void reset() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

//唤醒所有等待的线程继续执行,并设置屏障中断状态为true

breakBarrier(); // break the current generation

//唤醒所有等待的线程继续执行,并设置屏障中断状态为false

nextGeneration(); // start a new generation

} finally {

lock.unlock();

}

}

9.getNumberWaiting方法

//返回当前在屏障处等待的参与者数目,此方法主要用于调试和断言。

public int getNumberWaiting() {

final ReentrantLock lock = this.lock;

lock.lock();

try {

return parties - count;

} finally {

lock.unlock();

}

}

小结:

1.CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

2.这个等待的await方法,其实是使用ReentrantLock和Condition控制实现的。

Java技术分享微信公众号JavaQ,欢迎围观吐槽!

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-08-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 JavaQ 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档