跟着实例学习ZooKeeper的用法: Barrier

分布式Barrier是这样一个类: 它会阻塞所有节点上的等待进程,知道某一个被满足, 然后所有的节点继续进行。

比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。

栅栏Barrier

DistributedBarrier类实现了栅栏的功能。 它的构造函数如下:

public DistributedBarrier(CuratorFramework client, String barrierPath)Parameters:
client - client
barrierPath - path to use as the barrier

首先你需要设置栅栏,它将阻塞在它上面等待的线程:

setBarrier();

然后需要阻塞的线程调用“方法等待放行条件:

public void waitOnBarrier()

当条件满足时,移除栅栏,所有等待的线程将继续执行:

removeBarrier();

异常处理 DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()方法会抛出异常。

看一个例子:

package com.colobu.zkrecipe.barrier;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.barriers.DistributedBarrier;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;public class DistributedBarrierExample {    private static final int QTY = 5;    private static final String PATH = "/examples/barrier";    public static void main(String[] args) throws Exception {        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();            for (int i = 0; i < QTY; ++i) {                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);                final int index = i;
                Callable<Void> task = new Callable<Void>() {                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " waits on Barrier");
                        barrier.waitOnBarrier();
                        System.out.println("Client #" + index + " begins");                        return null;
                    }
                };
                service.submit(task);
            }

            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");


            controlBarrier.removeBarrier();


            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }

}

这个例子创建了controlBarrier来设置栅栏和移除栅栏。 我们创建了5个线程,在此Barrier上等待。 最后移除栅栏后所有的线程才继续执行。

如果你开始不设置栅栏,所有的线程就不会阻塞住。

双栅栏Double Barrier

双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是DistributedDoubleBarrier。 构造函数为:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,                                int memberQty)Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty是成员数量,当enter方法被调用时,成员被阻塞,直到所有的成员都调用了enter。 当leave方法被调用时,它也阻塞调用线程, 知道所有的成员都调用了leave。 就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。

DistributedBarrier 会监控连接状态,当连接断掉时enter()leave方法会抛出异常。

例子代码:

package com.colobu.zkrecipe.barrier;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.TimeUnit;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.framework.recipes.barriers.DistributedBarrier;import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;public class DistributedBarrierExample {    private static final int QTY = 5;    private static final String PATH = "/examples/barrier";    public static void main(String[] args) throws Exception {        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            ExecutorService service = Executors.newFixedThreadPool(QTY);            for (int i = 0; i < QTY; ++i) {                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);                final int index = i;
                Callable<Void> task = new Callable<Void>() {                    @Override
                    public Void call() throws Exception {

                        Thread.sleep((long) (3 * Math.random()));
                        System.out.println("Client #" + index + " enters");
                        barrier.enter();
                        System.out.println("Client #" + index + " begins");
                        Thread.sleep((long) (3000 * Math.random()));
                        barrier.leave();
                        System.out.println("Client #" + index + " left");                        return null;
                    }
                };
                service.submit(task);
            }


            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

        }

    }
}                                                                        

本文分享自微信公众号 - IT技术精选文摘(ITHK01)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-11-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

ActiveMQ笔记(2):基于ZooKeeper的HA方案

activemq官网给出了3种master/slave的HA方案,详见:http://activemq.apache.org/masterslave.html ...

23160
来自专栏菩提树下的杨过

ZooKeeper 笔记(4) 实战应用之【消除单点故障】

关键节点的单点故障(Single Point of Failure)在大型的架构中,往往是致命的。比如:SOA架构中,服务注册中心(Server Registe...

28490
来自专栏菩提树下的杨过

ZooKeeper 笔记(6) 分布式锁

  目前分布式锁,比较成熟、主流的方案有基于redis及基于zookeeper的二种方案。   大体来讲,基于redis的分布式锁核心指令为SETNX,即如果目...

27180
来自专栏菩提树下的杨过

ZooKeeper 笔记(2) 监听数据变化

ZK中的每个节点都可以存储一些轻量级的数据,这些数据的变化会同步到集群中的其它机器。在应用中程序员可以添加watcher来监听这些数据的变化,watcher只会...

31470
来自专栏菩提树下的杨过

ZooKeeper 笔记(1) 安装部署及hello world

先给一堆学习文档,方便以后查看 官网文档地址大全: OverView(概述) http://zookeeper.apache.org/doc/r3.4.6/zo...

246100
来自专栏菩提树下的杨过

ZooKeeper 笔记(5) ACL(Access Control List)访问控制列表

zk做为分布式架构中的重要中间件,通常会在上面以节点的方式存储一些关键信息,默认情况下,所有应用都可以读写任何节点,在复杂的应用中,这不太安全,ZK通过ACL机...

93760
来自专栏菩提树下的杨过

ZooKeeper 笔记(3) 实战应用之【统一配置管理】

大型应用通常会按业务拆分成一个个业务子系统,这些大大小小的子应用,往往会使用一些公用的资源,比如:需要文件上传、下载时,各子应用都会访问公用的Ftp服务器。如果...

56150
来自专栏张善友的专栏

Windows安装和使用zookeeper

之前整理过一篇文章《zookeeper 分布式锁服务》,本文介绍的 Zookeeper 是以 3.4.5 这个稳定版本为基础,最新的版本可以通过官网 http:...

30190
来自专栏菩提树下的杨过

ActiveMQ笔记(2):基于ZooKeeper的HA方案

activemq官网给出了3种master/slave的HA方案,详见:http://activemq.apache.org/masterslave.html ...

228100
来自专栏张善友的专栏

zookeeper 分布式锁服务

分布式锁服务在大家的项目中或许用的不多,因为大家都把排他放在数据库那一层来挡。当大量的行锁、表锁、事务充斥着数据库的时候。一般web应用很多的瓶颈都在数据库上,...

24380

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励