前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多线程基础(二十):CyclicBarrier源码分析

多线程基础(二十):CyclicBarrier源码分析

作者头像
冬天里的懒猫
发布2020-11-24 15:29:24
4070
发布2020-11-24 15:29:24
举报

CyclicBarrier是java并发包中的常用工具之一,通常被用来与CountDownlatch对比。CyclicBarrier能够实现CountDownLatch的效果,此外还能重复使用,而CountDownLatch则只能做一次计数。但是对于实现的源码而言,CyclicBarrier对于CountDowLatch有着本质的不同。CountDownLatch基于AQS的等待队列实现。而CyclicBarrier则依赖于ReentrantLock的Condition。我们前面在介绍AQS的时候知道,Condition与AQS本身的等待队列是采用的不同的队列。

1.类注释

CyclicBarrier是一种同步工具,它允许一组线程全部互相等待以到达一个公共的障碍点。CyclicBarrier在固定线程数量的程序中很有用。该线程有时会必须互相等待,该屏障称为cyclic,因为其可以在释放等待线程之后重新使用。 CyclicBarrier支持可选的Runnable命令,该命令在屏障的最后一个线程到达之后,在释放任何线程之前,每个屏障点操作一次,屏障操作对于在任何一方继续之前更新共享状态很有用。 示例如下:

代码语言:javascript
复制
class Solver {
    final int N;
    final float[][] data;
    final CyclicBarrier barrier;
 
    class Worker implements Runnable {
      int myRow;
      Worker(int row) { myRow = row; }
      public void run() {
        while (!done()) {
          processRow(myRow);
 
          try {
            barrier.await();
          } catch (InterruptedException ex) {
            return;
          } catch (BrokenBarrierException ex) {
            return;
          }
        }
      }
    }
 
    public Solver(float[][] matrix) {
      data = matrix;
      N = matrix.length;
      Runnable barrierAction =
        new Runnable() { public void run() { mergeRows(...); }};
      barrier = new CyclicBarrier(N, barrierAction);
 
      List<Thread> threads = new ArrayList<Thread>(N);
      for (int i = 0; i < N; i++) {
        Thread thread = new Thread(new Worker(i));
        threads.add(thread);
        thread.start();
      }
 
      // wait until done
      for (Thread thread : threads)
        thread.join();
    }
  }}

在这个代码中,每个工作线程处理将会处理矩阵的一行,然后在内存的屏障处等待,直到所有的行都处理完毕,处理所有的行之后,将执行Runnable屏障操作并将其合并,如果合并确定已找到解决方案,则执行done(),并将返回true。并且每个worker将终止。 如果屏障操作不依赖于执行时暂停的各方,则该方中的任何线程都可以在释放该操作时执行该操作。为方便起见,每次调用await都会返回该线程在屏障处的到达索引。然后,您可以选择哪个线程应执行屏障操作,例如:

代码语言:javascript
复制
  if (barrier.await() == 0) {
    // log the completion of this iteration
  }}

CyclicBarrier对于失败的同步尝试使用全无损模型,如果线程由于中断,失败,或者超时而过早离开屏障点,则在该屏障点等待的其他所有线程也将通过异常离开,BrokenBarrierException或者InterruptedException。(如果它们几乎同时被中断)

2.类结构及构造函数

CyclicBarrier的类结构如下:

image.png
image.png

在CyclicBarrier内部,存在一个Generation内部类。

代码语言:javascript
复制
 private static class Generation {
        boolean broken = false;
    }

这个类的结构非常简单。但是在CyclicBarrier中作用非常关键,在Barrier的每次使用的时候,都会生成一个实例,每单CyclicBarrier被触发或者重置的时候,生成都会更改,这使得Barrier可以与线程关联产生很多Generation。由于这种不确定性,可以将锁分配给等待的线程,但是一次只能激活其中之一,其余的需要全部等待。如果有中断但是没有后续的重置,则不需要可用的Generation。 提供的构造函数如下:

2.1 CyclicBarrier(int parties, Runnable barrierAction)

代码语言:javascript
复制
public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

2.2 CyclicBarrier(int parties)

此方法实际上是调用的前面的方法。无command处理。

代码语言:javascript
复制
public CyclicBarrier(int parties) {
    this(parties, null);
}

3.成员变量及常量

变量及常量

类型

说明

lock

private final ReentrantLock

用于保护屏障的锁

trip

private final Condition

等待条件,直到条件满足。

parties

private final int

参与方数量。

barrierCommand

private final Runnable

条件达到之后需要允许的线程。

generation

private Generation generation

当前的Generation

count

private int

仍在等待的参与方数量。按照参与方数量不断倒数,直到为0,则产生一个新的Generation。

实际上通过变量表可以看出,Generation是一代的意思,那么也就是说,每次产生一个Generation,当这个Generation对应的count为0之后,再产生一个新的Genrtation。

4.重要方法

4.1 nextGeneration

此方法将更新barrier的状态,并将等待的线程都唤醒。但是调用这个方法需要持有锁。

代码语言:javascript
复制
private void nextGeneration() {
    // signal completion of last generation
    //调用trip的signalAll方法
    trip.signalAll();
    // set up next generation
    //重置count,count随着到达的parties而减少
    count = parties;
    //产生新的gereration
    generation = new Generation();
}

4.2 breakBarrier

设置当前的barrier的broken状态为true,并唤醒所有人。仅在锁定时调用。

代码语言:javascript
复制
private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

4.3 dowait

主要的屏障代码,涵盖了各种策略。

代码语言:javascript
复制
private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
           TimeoutException {
    //持有的锁
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //g为当前的generation
        final Generation g = generation;
        //如果g的briken为true,则屏障失效,并抛出异常
        if (g.broken)
            throw new BrokenBarrierException();
        //如果线程被重点,则break屏障,且抛出异常
        if (Thread.interrupted()) {
            breakBarrier();
            throw new InterruptedException();
        }
        //index为count减去1,index从0开始
        int index = --count;
        //如果index为0 说明已经截止,需要重新产生
        if (index == 0) {  // tripped
            //如果ranAction为false
            boolean ranAction = false;
            try {
                //定义command 
                final Runnable command = barrierCommand;
                //如果command不为空,则执行run
                if (command != null)
                    command.run();
                //之后将runActrion修改为true
                ranAction = true;
                //产生下一个Generation
                nextGeneration();
                //返回0
                return 0;
            } finally {
                //如果ranAtion不为true,则强行执行berak
                if (!ranAction)
                    breakBarrier();
            }
        }

        // loop until tripped, broken, interrupted, or timed out
        //循环,直到条件被触发
        //死循环
        for (;;) {
            try {
                //如果传入没有等待时间,则await无限期等待
                if (!timed)
                    trip.await();
                //反之根据传入的nanos判断
                else if (nanos > 0L)
                    nanos = trip.awaitNanos(nanos);
            //异常处理
            } catch (InterruptedException ie) {
                //如果g还是原有的generation,且没有被broken,则抛出异常
                if (g == generation && ! g.broken) {
                    breakBarrier();
                    throw ie;
                //反之
                } else {
                    // We're about to finish waiting even if we had not
                    // been interrupted, so this interrupt is deemed to
                    // "belong" to subsequent execution.
                    //执行中断 Thread.currentThread().interrupt();
                }
            }
            //中断抛出异常
            if (g.broken)
                throw new BrokenBarrierException();
             //g没有更新 则返回index
            if (g != generation)
                return index;
             //如果timed且nanos小于0 
            if (timed && nanos <= 0L) {
                //将barrier berak
                breakBarrier();
                throw new TimeoutException();
            }
        }
    } finally {
        //解锁
        lock.unlock();
    }
}

4.4 await

调用dowait方法,不带时间,无限期等待。

代码语言:javascript
复制
public int await() throws InterruptedException, BrokenBarrierException {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        throw new Error(toe); // cannot happen
    }
}

4.5 await(long timeout, TimeUnit unit)

携带等待时间,调用dowait方法。

代码语言:javascript
复制
public int await(long timeout, TimeUnit unit)
    throws InterruptedException,
           BrokenBarrierException,
           TimeoutException {
    return dowait(true, unit.toNanos(timeout));
}

4.6 isBroken

判断当前barrier是否被broken

代码语言:javascript
复制
public boolean isBroken() {
    final ReentrantLock lock = this.lock;
    //加锁
    lock.lock();
    try {
        //返回状态
        return generation.broken;
    } finally {
        lock.unlock();
    }
}

4.7 reset

重置。

代码语言:javascript
复制
public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    //加锁,break 当前barrier
    try {
        breakBarrier();   // break the current generation
        //产生下一个Generation
        nextGeneration(); // start a new generation
    } finally {
        lock.unlock();
    }
}

4.8 getNumberWaiting

代码语言:javascript
复制
public int getNumberWaiting() {
    //加锁 
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //用parties减去count得到当前的waiting
        return parties - count;
    } finally {
        lock.unlock();
    }
}

5.总结

本文对CyclicBarrier的源码进行了分析,可以看到,CyclicBarrier是在AQS之上,基于ReentrantLock和Condition的综合应用。其核心是,每次计数,都产生一个Generate。之后根据参与者个数,计算阻塞的数量,当阻塞的线程达到参与者数量之后,就唤醒全部等待线程,然后产生一个新的Generate。而Generate内部只有一个boolean的变量,可以通过修改这个boolean的状态将Generate break。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2020-11-16 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.类注释
  • 2.类结构及构造函数
    • 2.1 CyclicBarrier(int parties, Runnable barrierAction)
      • 2.2 CyclicBarrier(int parties)
      • 3.成员变量及常量
      • 4.重要方法
        • 4.1 nextGeneration
          • 4.2 breakBarrier
            • 4.3 dowait
              • 4.4 await
                • 4.5 await(long timeout, TimeUnit unit)
                  • 4.6 isBroken
                    • 4.7 reset
                      • 4.8 getNumberWaiting
                      • 5.总结
                      领券
                      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档