前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ZooKeeper实现同步屏障(Barrier)

ZooKeeper实现同步屏障(Barrier)

作者头像
程序猿讲故事
发布2019-09-26 16:29:51
5770
发布2019-09-26 16:29:51
举报
文章被收录于专栏:程序猿讲故事程序猿讲故事

按照维基百科的解释:同步屏障(Barrier)是并行计算中的一种同步方法。对于一群进程或线程,程序中的一个同步屏障意味着任何线程/进程执行到此后必须等待,直到所有线程/进程都到达此点才可继续执行下文。

在ZK官网https://zookeeper.apache.org/doc/current/zookeeperTutorial.html ,提供了一个示例实现,但这个例子比较复杂,代码同时包括了Barrier和Queue两种实现,对例子做了修改,仅介绍Barrier的实现。

使用请客吃饭的场景:一张桌子坐四个人,四个人都到齐后,才能开饭;四个人都吃完以后,才能离开。

1 实现原理

为一个餐桌创建一个节点如/table-3,每一个客人是它的一个子节点/table-3/张三。所有客人都监听/table-3的事件,收到事件后检查子节点个数,如果达到要求的人数就开饭;当吃完以后,删除自己的子节点,并继续监听/table-3的事件,当子节点个数为0时,退出程序。

2 客人落座

落座的流程分两步:首先,创建自己的子节点;然后,等待其他客人落座直到坐满。创建客人子节点时CreateMode使用的是CreateMode.EPHEMERAL,这是属于当前zk会话的节点,当会话关闭时,如果节点没有删除,ZK会自动删除。

代码语言:javascript
复制
tring nodeName = tableSerial + "/" + customerName;
log.info("{}: 自己坐下来 {}", customerName, nodeName);
zk.create(nodeName, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
while (true) {
  synchronized (mutex) {
    // 读出子节点列表,并继续监听
    List<String> list = zk.getChildren(tableSerial, true);
    if (list.size() < tableCapacity) {
      log.info("{}: 当前人数 = {} , 总人数 = {}, 人还不够: 吃饭不积极,一定有问题...", customerName,
          list.size(), tableCapacity);
      mutex.wait();
    } else {
      log.info("{}: 人终于够了,开饭...", customerName);
      return true;
    }
  }
}

3 客人准备离开

客人准备离开的逻辑同落座类似,首先删除自己的子节点,然后判断是否所有的子节点都已经被删除。删除子节点时,直接设置版本号为0,这是因为在这个示例中创建后没有修改过数据。真实业务场景,应该先读出zk中数据的版本号,然后作为参数传入到delete命令。

代码语言:javascript
复制
String nodeName = tableSerial + "/" + customerName;
log.info("{}: 已经吃完,准备离席,删除节点 {}", customerName, nodeName);
zk.delete(nodeName, 0);
while (true) {
  // 读出子节点列表,并继续监听
  List<String> list = zk.getChildren(tableSerial, true);
  if (list.size() > 0) {
    log.info("{}: 还有 {} 人没吃完,你们吃快点...", customerName, list.size());
    synchronized (mutex) {
      mutex.wait();
    }
  } else {
    log.info("{}: 所有人都吃完了,准备散伙", customerName);
    return true;
  }
}

4 尝试用Stat获取子节点个数

代码中使用getChildren获取子节点列表,然后统计个数。ZooKeeper还有另一个方法也能获取子节点数:org.apache.zookeeper.data.Stat#numChildren。

将代码leave修改为

代码语言:javascript
复制
String nodeName = tableSerial + "/" + customerName;
log.info("{}: 已经吃完,准备离席,删除节点 {}", customerName, nodeName);
zk.delete(nodeName, 0);
while (true) {
  // 使用Stat判断子节点个数
  Stat tableStat = new Stat();
  zk.getData(tableSerial, true, tableStat);
  if (tableStat.getNumChildren() > 0) {
    log.info("{}: 还有 {} 人没吃完,你们吃快点...", customerName, tableStat.getNumChildren());
    synchronized (mutex) {
      mutex.wait();
    }
  } else {
    log.info("{}: 所有人都吃完了,准备散伙", customerName);
    return true;
  }
}

运行后发现:能够读出子节点个数,但再也无法监听 EventType.NodeChildrenChanged事件,这是ZooKeeper的监听机制决定的。网上搜索到 https://my.oschina.net/u/587108/blog/484203 有介绍,可以自己看一下。简单说就是:

getData()和exists()会监听节点自己的NodeCreated、NodeDeleted、NodeDataChanged事件;getChildren()会监听节点的NodeChildrenChanged事件。

5 完整源码

这个例子没有使用main()函数,改为创建一个 testng 测试用例启动。

5.1 ZooKeeperBarrier.java
代码语言:javascript
复制
package tech.codestory.zookeeper.barrier;

 

import java.io.IOException;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.*;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.data.Stat;

import org.slf4j.profiler.Profiler;

import lombok.extern.slf4j.Slf4j;

 

/**

 * @author junyongliao

 * @date 2019/8/13

 * @since 1.0.0

 */

@Slf4j

public class ZooKeeperBarrier implements Watcher {

  /** 等待连接建立成功的信号 */

  CountDownLatch connectedSemaphore = new CountDownLatch(1);

  /** ZooKeeper 客户端 static */

  ZooKeeper zk = null;

  /** 子节点发生变化的信号 static */

  Integer mutex;

  /** 避免重复构建餐桌 */

  static Integer tableSerialInitial = Integer.valueOf(1);

 

  /** 餐桌容量 */

  int tableCapacity;

  /** 餐桌编号 */

  String tableSerial;

 

  /** 客人姓名 */

  String customerName;

 

  /**

   * 构造函数,用于创建zk客户端,以及记录记录barrier的名称和容量

   *

   * @param address ZooKeeper服务器地址

   * @param tableSerial 餐桌编号

   * @param tableCapacity 餐桌容量

   * @param customerName 客人姓名

   */

  ZooKeeperBarrier(String address, String tableSerial, int tableCapacity, String customerName) {

    this.tableSerial = tableSerial;

    this.tableCapacity = tableCapacity;

    this.customerName = customerName;

 

    try {

      Profiler profiler = new Profiler(customerName + " 连接到ZooKeeper");

      profiler.start("开始连接");

      zk = new ZooKeeper(address, 3000, this);

 

      profiler.start("等待连接成功的Event");

      connectedSemaphore.await();

      profiler.stop();

      profiler.setLogger(log);

      profiler.log();

 

      mutex = Integer.valueOf(-1);

    } catch (IOException e) {

      log.error("IOException", e);

      zk = null;

    } catch (InterruptedException e) {

      log.error("InterruptedException", e);

    }

 

    synchronized (tableSerialInitial) {

      // 创建 tableSerial 的zNode

      try {

        Stat existsStat = zk.exists(tableSerial, false);

        if (existsStat == null) {

          this.tableSerial = zk.create(tableSerial, new byte[0], Ids.OPEN_ACL_UNSAFE,

              CreateMode.PERSISTENT);

        }

      } catch (KeeperException e) {

        log.error("KeeperException", e);

      } catch (InterruptedException e) {

        log.error("InterruptedException", e);

      }

    }

  }

 

  @Override

  public void process(WatchedEvent event) {

    if (Event.EventType.None.equals(event.getType())) {

      // 连接状态发生变化

      if (Event.KeeperState.SyncConnected.equals(event.getState())) {

        // 连接建立成功

        connectedSemaphore.countDown();

      }

    } else if (Event.EventType.NodeChildrenChanged.equals(event.getType())) {

      log.info("{} 接收到了通知 : {}", customerName, event.getType());

      // 子节点有变化

      synchronized (mutex) {

        mutex.notify();

      }

    }

  }

 

  /**

   * 客人坐在饭桌上

   *

   * @return 当等到餐桌坐满时返回 true

   * @throws KeeperException

   * @throws InterruptedException

   */

  boolean enter() throws KeeperException, InterruptedException {

    String nodeName = tableSerial + "/" + customerName;

    log.info("{}: 自己坐下来 {}", customerName, nodeName);

    // 属于客人自己的节点,如果会话结束没删掉会自动删除

    zk.create(nodeName, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

    while (true) {

      synchronized (mutex) {

        // 读出子节点列表,并继续监听

        List<String> list = zk.getChildren(tableSerial, true);

        if (list.size() < tableCapacity) {

          log.info("{}: 当前人数 = {} , 总人数 = {}, 人还不够: 吃饭不积极,一定有问题...", customerName,

              list.size(), tableCapacity);

          mutex.wait();

        } else {

          log.info("{}: 人终于够了,开饭...", customerName);

          return true;

        }

      }

    }

  }

 

  /**

   * 客人吃完饭了,可以离开

   *

   * @return 所有客人都吃完,再返回true

   * @throws KeeperException

   * @throws InterruptedException

   */

  boolean leave() throws KeeperException, InterruptedException {

    String nodeName = tableSerial + "/" + customerName;

    log.info("{}: 已经吃完,准备离席,删除节点 {}", customerName, nodeName);

    zk.delete(nodeName, 0);

    while (true) {

      // 读出子节点列表,并继续监听

      List<String> list = zk.getChildren(tableSerial, true);

      if (list.size() > 0) {

        log.info("{}: 还有 {} 人没吃完,你们吃快点...", customerName, list.size());

        synchronized (mutex) {

          mutex.wait();

        }

      } else {

        log.info("{}: 所有人都吃完了,准备散伙", customerName);

        return true;

      }

    }

  }

}
5.2 ZooKeeperBarrierTest.java
代码语言:javascript
复制
package tech.codestory.zookeeper.barrier;

 

import lombok.extern.slf4j.Slf4j;

import org.apache.zookeeper.KeeperException;

import org.testng.annotations.Test;

import java.security.SecureRandom;

import java.util.Random;

import java.util.concurrent.CountDownLatch;

import static org.testng.Assert.*;

 

/**

 * 测试 ZooKeeperBarrier

 *

 * @author code story

 * @date 2019/8/15

 */

@Slf4j

public class ZooKeeperBarrierTest {

  Random random = new SecureRandom();

 

  @Test

  public void testBarrierTest() {

    /** 等待连接建立成功的信号 */

 

    String address = "192.168.5.128:2181";

    String barrierName = "/table-" + random.nextInt(10);

    int barrierSize = 4;

 

    CountDownLatch countDown = new CountDownLatch(barrierSize);

    String[] customerNames = {"张三", "李四", "王五", "赵六"};

    for (int i = 0; i < barrierSize; i++) {

      String customerName = customerNames[i];

      new Thread() {

        @Override

        public void run() {

          log.info("{}: 准备吃饭", customerName);

          ZooKeeperBarrier barrier =

              new ZooKeeperBarrier(address, barrierName, barrierSize, customerName);

          try {

            boolean flag = barrier.enter();

            log.info("{}: 坐在了可以容纳 {} 人的饭桌", customerName, barrierSize);

            if (!flag) {

              log.info("{}: 想坐在饭桌时出错了", customerName);

            }

          } catch (KeeperException e) {

            log.error("KeeperException", e);

          } catch (InterruptedException e) {

            log.error("InterruptedException", e);

          }

 

          // 假装在吃饭,随机时间

          randomWait();

 

          // 假装吃完了,离开barrier

          try {

            barrier.leave();

          } catch (KeeperException e) {

            log.error("KeeperException", e);

          } catch (InterruptedException e) {

            log.error("InterruptedException", e);

          }

          countDown.countDown();

        }

      }.start();

 

      // 等一会儿再开始下一个进程

      randomWait();

    }

 

    try {

      countDown.await();

      log.info("这一桌吃完了,散伙");

    } catch (InterruptedException e) {

      log.error("InterruptedException", e);

    }

  }

 

  /** 随机等待 */

  private void randomWait() {

    int r = random.nextInt(100);

    for (int j = 0; j < r; j++) {

      try {

        Thread.sleep(100);

      } catch (InterruptedException e) {

        log.error("InterruptedException", e);

      }

    }

  }

}

6 测试日志

如下是测试日志

代码语言:javascript
复制
33:34.198 [INFO] ZooKeeperBarrierTest.run(36) 张三: 准备吃饭

33:40.497 [INFO] ZooKeeperBarrierTest.run(36) 李四: 准备吃饭

 

33:43.333 [DEBUG] ZooKeeperBarrier.log(201)

+ Profiler [张三 连接到ZooKeeper]

|-- elapsed time                   [开始连接]    71.684 milliseconds.

|-- elapsed time           [等待连接成功的Event]  9046.279 milliseconds.

|-- Total               [张三 连接到ZooKeeper]  9118.483 milliseconds.

 

33:43.346 [INFO] ZooKeeperBarrier.enter(110) 张三: 自己坐下来 /table-2/张三

33:43.353 [INFO] ZooKeeperBarrier.enter(118) 张三: 当前人数 = 1 , 总人数 = 4, 人还不够: 吃饭不积极,一定有问题...

 

33:49.515 [DEBUG] ZooKeeperBarrier.log(201)

+ Profiler [李四 连接到ZooKeeper]

|-- elapsed time                   [开始连接]     4.365 milliseconds.

|-- elapsed time           [等待连接成功的Event]  9011.503 milliseconds.

|-- Total               [李四 连接到ZooKeeper]  9015.873 milliseconds.

 

33:49.520 [INFO] ZooKeeperBarrier.enter(110) 李四: 自己坐下来 /table-2/李四

33:49.528 [INFO] ZooKeeperBarrier.process(93) 张三 接收到了通知 : NodeChildrenChanged

33:49.528 [INFO] ZooKeeperBarrier.enter(118) 李四: 当前人数 = 2 , 总人数 = 4, 人还不够: 吃饭不积极,一定有问题...

33:49.532 [INFO] ZooKeeperBarrier.enter(118) 张三: 当前人数 = 2 , 总人数 = 4, 人还不够: 吃饭不积极,一定有问题...

33:50.107 [INFO] ZooKeeperBarrierTest.run(36) 王五: 准备吃饭

33:50.307 [INFO] ZooKeeperBarrierTest.run(36) 赵六: 准备吃饭

 

33:59.122 [DEBUG] ZooKeeperBarrier.log(201)

+ Profiler [王五 连接到ZooKeeper]

|-- elapsed time                   [开始连接]     4.956 milliseconds.

|-- elapsed time           [等待连接成功的Event]  9008.505 milliseconds.

|-- Total               [王五 连接到ZooKeeper]  9013.468 milliseconds.

 

33:59.125 [INFO] ZooKeeperBarrier.enter(110) 王五: 自己坐下来 /table-2/王五

33:59.128 [INFO] ZooKeeperBarrier.process(93) 张三 接收到了通知 : NodeChildrenChanged

33:59.132 [INFO] ZooKeeperBarrier.process(93) 李四 接收到了通知 : NodeChildrenChanged

33:59.133 [INFO] ZooKeeperBarrier.enter(118) 李四: 当前人数 = 3 , 总人数 = 4, 人还不够: 吃饭不积极,一定有问题...

33:59.135 [INFO] ZooKeeperBarrier.enter(118) 王五: 当前人数 = 3 , 总人数 = 4, 人还不够: 吃饭不积极,一定有问题...

33:59.136 [INFO] ZooKeeperBarrier.enter(118) 张三: 当前人数 = 3 , 总人数 = 4, 人还不够: 吃饭不积极,一定有问题...

 

33:59.335 [DEBUG] ZooKeeperBarrier.log(201)

+ Profiler [赵六 连接到ZooKeeper]

|-- elapsed time                   [开始连接]    10.184 milliseconds.

|-- elapsed time           [等待连接成功的Event]  9014.981 milliseconds.

|-- Total               [赵六 连接到ZooKeeper]  9025.175 milliseconds.

 

33:59.339 [INFO] ZooKeeperBarrier.enter(110) 赵六: 自己坐下来 /table-2/赵六

33:59.343 [INFO] ZooKeeperBarrier.process(93) 张三 接收到了通知 : NodeChildrenChanged

33:59.345 [INFO] ZooKeeperBarrier.enter(122) 赵六: 人终于够了,开饭...

33:59.346 [INFO] ZooKeeperBarrierTest.run(41) 赵六: 坐在了可以容纳 4 人的饭桌

33:59.346 [INFO] ZooKeeperBarrier.process(93) 王五 接收到了通知 : NodeChildrenChanged

33:59.346 [INFO] ZooKeeperBarrier.process(93) 李四 接收到了通知 : NodeChildrenChanged

33:59.348 [INFO] ZooKeeperBarrier.enter(122) 王五: 人终于够了,开饭...

33:59.348 [INFO] ZooKeeperBarrierTest.run(41) 王五: 坐在了可以容纳 4 人的饭桌

33:59.350 [INFO] ZooKeeperBarrier.enter(122) 李四: 人终于够了,开饭...

33:59.350 [INFO] ZooKeeperBarrierTest.run(41) 李四: 坐在了可以容纳 4 人的饭桌

33:59.352 [INFO] ZooKeeperBarrier.enter(122) 张三: 人终于够了,开饭...

33:59.352 [INFO] ZooKeeperBarrierTest.run(41) 张三: 坐在了可以容纳 4 人的饭桌

33:59.646 [INFO] ZooKeeperBarrier.leave(138) 赵六: 已经吃完,准备离席,删除节点 /table-2/赵六

33:59.650 [INFO] ZooKeeperBarrier.process(93) 赵六 接收到了通知 : NodeChildrenChanged

33:59.651 [INFO] ZooKeeperBarrier.process(93) 张三 接收到了通知 : NodeChildrenChanged

33:59.652 [INFO] ZooKeeperBarrier.leave(144) 赵六: 还有 3 人没吃完,你们吃快点...

33:59.652 [INFO] ZooKeeperBarrier.process(93) 李四 接收到了通知 : NodeChildrenChanged

33:59.652 [INFO] ZooKeeperBarrier.process(93) 王五 接收到了通知 : NodeChildrenChanged

33:59.654 [INFO] ZooKeeperBarrier.leave(144) 赵六: 还有 3 人没吃完,你们吃快点...

34:04.356 [INFO] ZooKeeperBarrier.leave(138) 王五: 已经吃完,准备离席,删除节点 /table-2/王五

34:04.361 [INFO] ZooKeeperBarrier.process(93) 赵六 接收到了通知 : NodeChildrenChanged

34:04.363 [INFO] ZooKeeperBarrier.leave(144) 王五: 还有 2 人没吃完,你们吃快点...

34:04.363 [INFO] ZooKeeperBarrier.leave(144) 赵六: 还有 2 人没吃完,你们吃快点...

34:05.958 [INFO] ZooKeeperBarrier.leave(138) 张三: 已经吃完,准备离席,删除节点 /table-2/张三

34:05.963 [INFO] ZooKeeperBarrier.process(93) 王五 接收到了通知 : NodeChildrenChanged

34:05.961 [INFO] ZooKeeperBarrier.leave(138) 李四: 已经吃完,准备离席,删除节点 /table-2/李四

34:05.967 [INFO] ZooKeeperBarrier.leave(144) 张三: 还有 1 人没吃完,你们吃快点...

34:05.968 [INFO] ZooKeeperBarrier.process(93) 赵六 接收到了通知 : NodeChildrenChanged

34:05.971 [INFO] ZooKeeperBarrier.process(93) 张三 接收到了通知 : NodeChildrenChanged

34:05.973 [INFO] ZooKeeperBarrier.leave(149) 赵六: 所有人都吃完了,准备散伙

34:05.981 [INFO] ZooKeeperBarrier.leave(149) 王五: 所有人都吃完了,准备散伙

34:05.982 [INFO] ZooKeeperBarrier.leave(149) 张三: 所有人都吃完了,准备散伙

34:05.983 [INFO] ZooKeeperBarrier.leave(149) 李四: 所有人都吃完了,准备散伙

34:05.985 [INFO] ZooKeeperBarrierTest.testBarrierTest(72) 这一桌吃完了,散伙
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序猿讲故事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 实现原理
  • 2 客人落座
  • 3 客人准备离开
  • 4 尝试用Stat获取子节点个数
  • 5 完整源码
    • 5.1 ZooKeeperBarrier.java
      • 5.2 ZooKeeperBarrierTest.java
      • 6 测试日志
      相关产品与服务
      GPU 云服务器
      GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档