前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >AQS源码分析之CyclicBarrier

AQS源码分析之CyclicBarrier

作者头像
山行AI
发布2020-03-25 18:07:17
3910
发布2020-03-25 18:07:17
举报
文章被收录于专栏:山行AI山行AI山行AI

CyclicBarrier的功能与CountDownLatch类似,只是它与CountDownLatch比的优势是它能够被复用。

示例

 static class Worker extends Thread {        private CyclicBarrier cyclicBarrier;
        public Worker(CyclicBarrier cyclicBarrier) {            this.cyclicBarrier = cyclicBarrier;        }
        @Override        public void run() {            super.run();
            try {                System.out.println(Thread.currentThread().getName() + "开始等待其他线程");                cyclicBarrier.await();                System.out.println(Thread.currentThread().getName() + "开始执行");                // 这里用Thread.sleep()来模拟业务处理                Thread.sleep(1000);                System.out.println(Thread.currentThread().getName() + "执行完毕");            } catch (Exception e) {                e.printStackTrace();            }        }    }
    public static void main(String[] args) {        int threadCount = 6;        CyclicBarrier cyclicBarrier = new CyclicBarrier(threadCount);
        for (int i = 0; i < threadCount; i++) {            Worker worker = new Worker(cyclicBarrier);            worker.start();        }    }

结果:

Thread-0开始等待其他线程Thread-1开始等待其他线程Thread-2开始等待其他线程Thread-3开始等待其他线程Thread-4开始等待其他线程Thread-5开始等待其他线程Thread-5开始执行Thread-0开始执行Thread-1开始执行Thread-2开始执行Thread-3开始执行Thread-4开始执行Thread-5执行完毕Thread-0执行完毕Thread-1执行完毕Thread-3执行完毕Thread-4执行完毕Thread-2执行完毕

六个线程都会在await方法处等待,等到六个线程都到位后才会并发往下执行,接下来我们基于源码来分析下其内部实现原理。

CyclicBarrier属性与构造方法

内部类Generation和CyclicBarrier的几个属性
// 屏障的每次使用都表示为一个Generation实例。每当障碍被触发或重置时,Generation都会发生变化// 如果有休息但没有后续的重置,则不需要活跃的Generationprivate static class Generation {        boolean broken = false;    }
    // 防护栅栏入口的锁,非公平锁    private final ReentrantLock lock = new ReentrantLock();    // 在锁上的Condition上等待直到被触发的condition    private final Condition trip = lock.newCondition();    // parties的数量    private final int parties;    // 被触发时运行的命令    private final Runnable barrierCommand;    // 当前generation    private Generation generation = new Generation();
    // 处于等待状态的parties数量。每一个generation都会将count从parties减到0。它会在开启一个新的generation或者broken时会被重置为parties    private int count;
构造方法
// CyclicBarrier会在当有足够数量(parties个)的线程在它上面等待时会触发,当被触发时会执行barrierAction中的内容,由进入屏障的最后一个线程执行    public CyclicBarrier(int parties, Runnable barrierAction) {        if (parties <= 0) throw new IllegalArgumentException();        this.parties = parties;        // count的初始值为parties        this.count = parties;        this.barrierCommand = barrierAction;    }    // 这个方法中只是没有所有在barrier被触发时需要执行的内容    public CyclicBarrier(int parties) {        this(parties, null);    }

CyclicBarrier会在当有足够数量(parties个)的线程在它上面等待时会触发,当被触发时会执行barrierAction中的内容,由进入屏障的最后一个线程执行,如果没有需要在barrier触发时执行的内容,可以传入null。

直接进入java.util.concurrent.CyclicBarrier#await()方法

话不多说,直接上代码:

public int await() throws InterruptedException, BrokenBarrierException {        try {            return dowait(false, 0L);        } catch (TimeoutException toe) {            throw new Error(toe); // cannot happen        }    }

可以看到,这个方法的主要内容都委托给了dowait方法,没有猜错的话,dowait方法的第二个值传0时表示一直阻塞,具体猜得对不对,咱们继续看dowait方法代码:

  /**     * Main barrier code, covering the various policies.     */    private int dowait(boolean timed, long nanos)        throws InterruptedException, BrokenBarrierException,               TimeoutException {        // 一把非公平锁,主要用于控制barrier的入口        final ReentrantLock lock = this.lock;        // 上锁,会在finally中解锁        lock.lock();        try {            // 拷贝到本地变量            final Generation g = generation;            // 如果已经处理broken状态则抛异常,会在finally中解锁            if (g.broken)                throw new BrokenBarrierException();            // 响应线程中断,如果外部调用了Thread.interrupt则这里会为true            if (Thread.interrupted()) {                // break 当前barrier,具体下面的的分析,注意下这里会唤醒所有处于等待状态的线程                breakBarrier();                throw new InterruptedException();            }
            int index = --count;            if (index == 0) {  // tripped  当index为0时表明线程已经全部就位,直接触发barrier                boolean ranAction = false;// 一个标识位,如果整个过程没能执行成功,则需要在finally中调用breakBarrier方法                try {                    // 检查有无需要在触发时执行的command                    final Runnable command = barrierCommand;                    if (command != null)// 有的话则执行,注意这里调用的是run方法,表明是在执行到index == 0成立时的这个线程中执行的,这个线程肯定是最后一个进入barrier的线程。                        command.run();                    // 标识为true                    ranAction = true;                    // 表明上一个周期结束,开始下一个周期,方法的具体代码见下面的分析                    nextGeneration();                    return 0;                } finally {                    if (!ranAction)                        breakBarrier();                }            }
            // loop until tripped, broken, interrupted, or timed out            for (;;) {// 无限循环                try {                    // 如果没有时间限制则一直等待                    if (!timed)                        // trip是一个conditon                        // 这个会添加一个condition状态的节点到condition队列,然后处于阻塞状态,直到被signal唤醒                        trip.await();                    else if (nanos > 0L)                        // 调用等待超时,如果超时则不再进行等待                        nanos = trip.awaitNanos(nanos);                } catch (InterruptedException ie) {                    // 出现异常,如果还没有处理broken,则broken                    if (g == generation && ! g.broken) {                        breakBarrier();                        throw ie;                    } else {                        // We're about to finish waiting even if we had not                        // been interrupted, so this interrupt is deemed to                        // "belong" to subsequent execution.                        // 尝试用interrupt来结束等待                        Thread.currentThread().interrupt();                    }                }                // 如果已经broken,则抛出异常                if (g.broken)                    throw new BrokenBarrierException();                // 如果g不是当前generation,证明中间generation发生了改变,这里返回进行到了哪个index,也就是还有多少线程未进入barrier,例如全部准备就绪会返回0                if (g != generation)                    return index;                // 如果timed为true并且参数nanos也<=0则直接中断barrier                if (timed && nanos <= 0L) {                    breakBarrier();                    throw new TimeoutException();                }            }        } finally {            // 解锁            lock.unlock();        }    }

 /**     * Sets current barrier generation as broken and wakes up everyone.     * Called only while holding lock.     */    private void breakBarrier() {        // 设置broken状态为true        generation.broken = true;         // 将count的值复位为parties        count = parties;        // 唤醒等待状态的线程        trip.signalAll();    }
 /**     * Updates state on barrier trip and wakes up everyone.     * Called only while holding lock.     */    private void nextGeneration() {        // signal completion of last generation        // 唤醒所有的等待线程        trip.signalAll();        // set up next generation        // 重置count的值        count = parties;        // 将generation属性指向一个新的Generation,表示新的一代已经开始了        generation = new Generation();    }

java.util.concurrent.locks.AbstractQueuedLongSynchronizer.ConditionObject#awaitNanos:

这里主要总结如下几点:

  • 执行barrierCommand的是最后一个进入barrier的线程;
  • park操作可以让线程进入等待状态,让出锁的占有权;
  • dowait方法的主要作用是对barrier入口加锁,正常条件下会在进入barrier的线程在还没有达到parties个时在入口锁的condition上等待(注意此时线程不是在一直执行for循环,而是一直处理等待状态,猜测这个无限循环与虚假唤醒有一定的关系),并在出现异常或者broken时中止await;
  • 当index为0时,代表所有线程准已进入barrier,可以执行nextGeneration,在nextGeneration方法中会对condition上的所有线程进行唤醒,注意唤醒后它们会进行自己线程内的工作,与barrier便没有关系了,barrier可以进行下一轮的generation。
  • breakBarrier也会唤醒condition上的所有等待线程。

最后,我们再来看一下reset方法:

public void reset() {        final ReentrantLock lock = this.lock;        lock.lock();        try {            // 中止当前generation的操作,包括唤醒所有的等待线程等            breakBarrier();   // break the current generation            // 开始新一轮的generation            nextGeneration(); // start a new generation        } finally {            lock.unlock();        }    }

主要做了两步:

  1. 中止当前generation的操作,包括唤醒所有的等待线程等;
  2. 开始新一轮的generation,在调用了这个方法之后就可以重新利用barrier了。

总结

CyclicBarrier与CountDownLatch的功能类似,但是实现是不同的,CyclicBarrier是基于ReentrantLock的非公平版本,而CountDownLatch是基于AQS的state值来实现,分为公平和非公平版本。还有一点就是CyclicBarrier是可以重用的。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-03-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 示例
    • CyclicBarrier属性与构造方法
      • 内部类Generation和CyclicBarrier的几个属性
      • 构造方法
    • 直接进入java.util.concurrent.CyclicBarrier#await()方法
    • 总结
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档