专栏首页丑胖侠Zookeeper开源客户端Curator之事件监听详解

Zookeeper开源客户端Curator之事件监听详解

Curator对Zookeeper典型场景之事件监听进行封装,提供了使用参考。这篇博文笔者带领大家了解一下Curator的实现方式。

引入依赖

对于Curator封装Zookeeper的典型场景使用都放在了recipes中。因此,使用之前需先引入此依赖。

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

以下实例默认节点“/p1”已经被创建切存在于Zookeeper服务器上的。

监听方式一

利用Watcher来对节点进行监听操作,但此监听操作只能监听一次,与原生API并无太大差异。如有典型业务场景需要使用可考虑,但一般情况不推荐使用。下面是具体的使用案例。

package com.secbro.learn.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

/**
 * Created by zhuzs on 2017/4/14.
 */
public class CuratorListenerTest1{
    public static void main(String[] args) {
        CuratorFramework client = getClient();
        String path = "/p1";

        try {
            byte[] content = client.getData().usingWatcher(new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("监听器watchedEvent:" + watchedEvent);
                }
            }).forPath(path);

            System.out.println("监听节点内容:" + new String(content));

            // 第一次变更节点数据
            client.setData().forPath(path,"new content".getBytes());

            // 第二次变更节点数据
            client.setData().forPath(path,"second content".getBytes());

            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
            client.close();
        } finally {
            client.close();
        }

    }

    private static CuratorFramework getClient(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.0:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

执行结果:

监听节点内容:new content
监听器watchedEvent:WatchedEvent state:SyncConnected type:NodeDataChanged path:/p1

执行此程序之后,首先会对节点/p1注册一个Watcher监听事件,同时返回当前节点的内容信息。随后改变节点内容为“new content”,此时触发监听事件,并打印出监听事件信息。但当第二次改变节点内容时,监听已经失效,无法再次获得节点变动事件。

方法二

CuratorListener监听,此监听主要针对background通知和错误通知。使用此监听器之后,调用inBackground方法会异步获得监听,而对于节点的创建或修改则不会触发监听事件。具体实例代码如下:

package com.secbro.learn.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Created by zhuzs on 2017/4/14.
 */
public class CuratorListenerTest1 {
    public static void main(String[] args) {
        CuratorFramework client = getClient();
        String path = "/p1";

        try {
            CuratorListener listener = new CuratorListener() {
                @Override
                public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception {
                    System.out.println("监听事件触发,event内容为:" + event);
                }
            };
            client.getCuratorListenable().addListener(listener);
            // 异步获取节点数据
            client.getData().inBackground().forPath(path);
            // 变更节点内容
            client.setData().forPath(path,"123".getBytes());

            Thread.sleep(Integer.MAX_VALUE);
        } catch (Exception e) {
            e.printStackTrace();
            client.close();
        } finally {
            client.close();
        }

    }

    private static CuratorFramework getClient(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

执行结果为:

监听事件触发,event内容为:CuratorEventImpl{type=WATCHED, resultCode=3, path='null', name='null', children=null, context=null, stat=null, data=null, watchedEvent=WatchedEvent state:SyncConnected type:None path:null, aclList=null}
监听事件触发,event内容为:CuratorEventImpl{type=GET_DATA, resultCode=0, path='/p1', name='null', children=null, context=null, stat=17814,18054,1491458317592,1492218568138,12,0,0,0,3,0,17814
, data=[49, 50, 51], watchedEvent=null, aclList=null}

其中两次触发监听事件,第一次触发为注册监听事件时触发,第二次为getData异步处理返回结果时触发。而setData的方法并未触发监听事件。

方法三

Curator引入了Cache来实现对Zookeeper服务端事件监听,Cache事件监听可以理解为一个本地缓存视图与远程Zookeeper视图的对比过程。Cache提供了反复注册的功能。Cache分为两类注册类型:节点监听和子节点监听。

NodeCache

用于监听数据节点本身的变化。提供了两个构造方法:

public NodeCache(CuratorFramework client, String path)

public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

其中参数dataIsCompressed表示是否对数据进行压缩,而第一个方法内部实现为调用第二个方法,且dataIsCompressed默认设为false。

对节点的监听需要配合回调函数来进行处理接收到监听事件之后的业务处理。NodeCache通过NodeCacheListener来完成后续处理。具体代码示例如下:

package com.secbro.learn.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * Created by zhuzs on 2017/4/15.
 */
public class CuratorNodeCacheTest {

    public static void main(String[] args) throws Exception {

        CuratorFramework client = getClient();
        String path = "/p1";
        final NodeCache nodeCache = new NodeCache(client,path);
        nodeCache.start();
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("监听事件触发");
                System.out.println("重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));
            }
        });
        client.setData().forPath(path,"456".getBytes());
        client.setData().forPath(path,"789".getBytes());
        client.setData().forPath(path,"123".getBytes());
        client.setData().forPath(path,"222".getBytes());
        client.setData().forPath(path,"333".getBytes());
        client.setData().forPath(path,"444".getBytes());
        Thread.sleep(15000);

    }

    private static CuratorFramework getClient(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

执行结果:

监听事件触发
重新获得节点内容为:123
监听事件触发
重新获得节点内容为:333
监听事件触发
重新获得节点内容为:444

NodeCache的start方法有一个带Boolean参数的方法,如果设置为true则在首次启动时就会缓存节点内容到Cache中。

经过试验,发现注册监听之后,如果先后多次修改监听节点的内容,部分监听事件会发生丢失现象。其他版本未验证,此版本此处需特别留意。

NodeCache不仅可以监听节点内容变化,还可以监听指定节点是否存在。如果原本节点不存在,那么Cache就会在节点被创建时触发监听事件,如果该节点被删除,就无法再触发监听事件。

PathChildrenCache

PathChildrenCache用于监听数据节点子节点的变化情况。当前版本总共提供了7个构造方法,其中2个已经不建议使用了。

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)

常见的参数就不再具体说明了。其中cacheData表示是否把节点内容缓存起来,如果为true,那么接收到节点列表变更的同时会将获得节点内容。

ExecutorService 和threadFactory提供了通过线程池的方式来处理监听事件。

PathChildrenCache使用PathChildrenCacheListener来处理监听事件。具体使用方法见代码实例:

package com.secbro.learn.curator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

/**
 * Created by zhuzs on 2017/4/15.
 */
public class CuratorPathChildrenCacheTest {

    public static void main(String[] args) throws Exception {

        CuratorFramework client = getClient();
        String parentPath = "/p1";

        PathChildrenCache pathChildrenCache = new PathChildrenCache(client,parentPath,true);
        pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
        pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                System.out.println("事件类型:"  + event.getType() + ";操作节点:" + event.getData().getPath());
            }
        });

        String path = "/p1/c1";
        client.create().withMode(CreateMode.PERSISTENT).forPath(path);
        Thread.sleep(1000); // 此处需留意,如果没有现成睡眠则无法触发监听事件
        client.delete().forPath(path);

        Thread.sleep(15000);

    }

    private static CuratorFramework getClient(){
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .retryPolicy(retryPolicy)
                .sessionTimeoutMs(6000)
                .connectionTimeoutMs(3000)
                .namespace("demo")
                .build();
        client.start();
        return client;
    }
}

打印结果为:

事件类型:CHILD_ADDED;操作节点:/p1/c1
事件类型:CHILD_REMOVED;操作节点:/p1/c1

PathChildrenCache不会对二级子节点进行监听,只会对子节点进行监听。看上面的实例会发现在创建子节点和删除子节点两个操作中间使用了线程睡眠,否则无法接收到监听事件,这也是在使用过程中需要留意的一点。

总结

本篇博客讲述了Curator针对事件监听使用的方法和实例,并点出了其中需要留意的地方。由于篇幅有限部分使用场景未逐一举例说明,可自行尝试。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Zookeeper开源客户端Curator之基本功能讲解

    简介 Curator是Netflix公司开源的一套Zookeeper客户端框架。了解过Zookeeper原生API都会清楚其复杂度。Curator帮助我们在其基...

    用户1161110
  • Zookeeper客户端API之节点检查和权限控制(十一)

    本篇博客介绍一下原生API的节点检查是否存在和节点操作权限控制。Zookeeper提供的方法的使用方式与前面增删改查操作类似,具体使用案例不再逐一用实例说明。 ...

    用户1161110
  • Zookeeper客户端API之修改删除节点(十)

    上篇博客《Zookeeper客户端API之读取子节点内容(九)》我们介绍了Zookeeper获得节点内容的方法使用,其中实例代码中已经用到了修改节点内容的方法。...

    用户1161110
  • 怎么设计高效的敏感词过滤系统(一)

    IM项目需要对上边传输的消息进行必要的过滤。如果总是对着某人输入f**k就显得不太文明了。

    普通程序员
  • h5群聊天室|h5仿微信聊天室|h5直播聊天

    今年的FIFA世界杯甚是精彩,最近兴致高涨就利用HTML5开发了一个手机端仿微信界面聊天室,该h5聊天室采用750px全新伸缩flex布局,以及使用rem响应式...

    andy2018
  • 【程序源代码】又一个小商城

    又一个小商城。Spring Boot后端 + Vue管理员前端 + 微信小程序用户前端 + Vue用户移动端

    程序源代码
  • 搭建Lotus测试网集群挖矿集群

    以下设置是在Lotus上密封32个GiB扇区的最小示例: 2 TB硬盘空间。 8核CPU 128 GiB的RAM

    莲花海
  • 基于JWS的游戏运维服务化平台实现

    经过一个多月的多方沟通和协调,这个平台(JAE,JWS App Engine)终于引来了多方合作的机会,本周也正是立项启动,进入开发者模式。虽然Q2还剩下短...

    用户1593318
  • 使用python批量编译Qt工程脚本

    Qt君
  • Java 程序的主类 ,小程序的主类

    ●在 Java 应用程序中,这个主类是指包含 main()方法的类。主类是 Java 程序执行的入口点。 ●在 Java 小程序中,这个主类是一个继承自系统类...

    赵哥窟

扫码关注云+社区

领取腾讯云代金券