前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Java - CyclicBarrier学习和使用

Java - CyclicBarrier学习和使用

作者头像
夹胡碰
发布2020-12-29 11:20:53
2300
发布2020-12-29 11:20:53
举报
文章被收录于专栏:程序猿~程序猿~
CyclicBarrier使用场景

现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。 这种场景就可以用CyclicBarrier来实现,下面将介绍其使用方法和实现原理。

1. 使用方法

代码语言:javascript
复制
public class CyclicBarrierTest {

    static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

    public static void main(String[] args){
        new Thread(new MThread()).start();
        new Thread(new MThread()).start();
        new Thread(new MThread()).start();
    }

    public static class MThread implements Runnable{

        @Override
        public void run() {
            try {
                cyclicBarrier.await();
                System.out.println(Thread.currentThread().getName() + "await 结束");
            } catch (BrokenBarrierException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 输出结果
代码语言:javascript
复制
out => 
Thread-2await 结束
Thread-0await 结束
Thread-1await 结束

2. 流程示例

  1. 初始化通过构造函数将parties赋值。
  2. 每次线程await时,将线程转换成线程节点Node放置在等待队列中并挂起线程。
  3. 当最后一个线程执行await时,唤醒等待队列中的全部线程(自己并不挂起)。

3. 重点源码分析

  • 执行await
代码语言:javascript
复制
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        final Generation g = generation;

        if (g.broken)
            throw new BrokenBarrierException();

        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }

        int index = --count;
        if (index == 0) { 
            boolean ranAction = false;
            try {
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run();
                ranAction = true;
                nextGeneration(); // 唤醒全部线程
                return 0;
            } finally {
                if (!ranAction)
                    breakBarrier();
            }
        }

        for (;;) {
            try {
                if (!timed)
                    trip.await(); // 加入等待队列并阻塞当前线程
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            } catch (InterruptedException ie) {
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken)
                throw new BrokenBarrierException();

            if (g != generation)
                return index;

            if (timed && nanos <= 0L) {
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        lock.unlock();
    }
}
  • 挂起线程
代码语言:javascript
复制
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // 挂起当前线程
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  • 唤醒全部线程
代码语言:javascript
复制
private void nextGeneration() {
    trip.signalAll(); // 唤醒全部线程
    count = parties;
    generation = new Generation();
}
代码语言:javascript
复制
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
代码语言:javascript
复制
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null); // 依次唤醒等待队列中的线程
}
代码语言:javascript
复制
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);  // 唤醒线程
    return true;
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CyclicBarrier使用场景
  • 1. 使用方法
  • 2. 流程示例
  • 3. 重点源码分析
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档