前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >zookeeper javaApi 事件监听

zookeeper javaApi 事件监听

作者头像
周杰伦本人
发布2022-10-25 16:11:28
1.1K0
发布2022-10-25 16:11:28
举报
文章被收录于专栏:同步文章同步文章

1. 数据存储

事务日志 快照日志 运行时日志 bin/zookeeper.out

2 基于 Java API 初探 zookeeper 的使用

2.1 zookeeper 增删改查

代码语言:javascript
复制
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * @author: xiepanpan
 * @Date: 2020/5/30
 * @Description: zookeeper 连接
 */
public class ConnectionDemo {
    public static void main(String[] args) throws Exception {
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper =
                    new ZooKeeper("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183", 4000, new Watcher() {
                        public void process(WatchedEvent watchedEvent) {
                            //如果收到了服务端的响应事件,连接成功
                            if (Event.KeeperState.SyncConnected==watchedEvent.getState()) {
                                countDownLatch.countDown();
                            }
                        }
                    });
            countDownLatch.await();
            //CONNECTED
            System.out.println(zooKeeper.getState());

            //创建节点 第三个参数是权限 OPEN_ACL_UNSAFE 所有人都可以访问 第四个参数 节点类型 持久节点
            zooKeeper.create("/zk-persistent-xpp","0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            Thread.sleep(1000);

            //得到节点
            Stat stat = new Stat();
            byte[] bytes = zooKeeper.getData("/zk-persistent-xpp", null, stat);
            System.out.println(new String(bytes));

            //修改节点
            zooKeeper.setData("/zk-persistent-xpp","1".getBytes(),stat.getVersion());

            byte[] data = zooKeeper.getData("/zk-persistent-xpp", null, stat);
            System.out.println(new String(data));

            //删除节点 使用乐观锁 比较版本号
            zooKeeper.delete("/zk-persistent-xpp",stat.getVersion());

            zooKeeper.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

2.2 事件机制

Watcher 监听机制是 Zookeeper 中非常重要的特性,我们基于 zookeeper 上创建的节点,可以对这些节点绑定监听 事件,比如可以监听节点数据变更、节点删除、子节点状态变更等事件,通过这个事件机制,可以基于 zookeeper 实现分布式锁、集群管理等功能 watcher 特性:当数据发生变化的时候, zookeeper 会产生一个 watcher 事件,并且会发送到客户端。但是客户端 只会收到一次通知。如果后续这个节点再次发生变化,那么之前设置 watcher 的客户端不会再次收到消息。(watcher 是一次性的操作)。 可以通过循环监听去达到永久监听效果

2.3 如何注册事件机制

通过这三个操作来绑定事件 :getData、Exists、getChildren

2.4 如何触发事件?

凡是事务类型的操作,都会触发监听事件。 create /delete /setData

2.5 watcher 事件类型

None (-1), 客户端链接状态发生变化的时候,会收到 none 的事件 NodeCreated (1), 创建节点的事件。 比如 zk-persis-mic NodeDeleted (2), 删除节点的事件 NodeDataChanged (3), 节点数据发生变更 NodeChildrenChanged (4); 子节点被创建、被删除、会发生事件触发

2.6 操作对应的water事件类型

zk-persis-mic ( 监听事件)

zk-persis-mic/child (子节点监听事件)

create(/ zk-persis-mic)

NodeCreated (exists getData)

delete(/ zk-persis-mic)

NodeDeleted (exists getData)

setData(/ zk-persis-mic)

NodeDataChanged (exists getData)

create(/ zk-persis-mic/child)

NodeChildrenChange(getChildren)

NodedCreated

delete(/ zk-persis-mic/child)

NodeChildrenChange(getChildren)

NodedDeleted

setData(/ zk-persis-mic/child)

NodeDataChanged

3 Curator 客户端的使用,简单高效

3.1使用Curator对zookeeper增删改查

代码语言:javascript
复制
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
 * @author: xiepanpan
 * @Date: 2020/5/31
 * @Description:  使用Curator框架来对zookeeper操作
 */
public class CuratorDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                //衰减的重试机制
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183")
                .sessionTimeoutMs(4000)
                //隔离命名空间  以下所有操作都是基于该相对目录进行的
                .namespace("curator")
                .build();
        curatorFramework.start();

        //创建节点
        //结果 /curator/xpp/mode1
        curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath("/xpp/node1","1".getBytes());

        //更改节点
        //保存节点状态
        Stat stat = new Stat();
        curatorFramework.getData().storingStatIn(stat).forPath("/xpp/node1");
        curatorFramework.setData().withVersion(stat.getVersion()).forPath("/xpp/node1","xx".getBytes());

        //删除节点
        curatorFramework.delete().deletingChildrenIfNeeded().forPath("/xpp/node1");
    }



}

3.2 监听

PathChildrenCache 监听一个节点下的子节点的创建删除和更新

NodeCache监听一个节点的更新和创建事件

TreeCache 综合PathChildrenCache 和NodeCache 的特性

代码语言:javascript
复制
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * @author: xiepanpan
 * @Date: 2020/5/31
 * @Description:  事件监听
 */
public class CuratorWatcherDemo {

    public static void main(String[] args) throws Exception {
        CuratorFramework curatorFramework = CuratorFrameworkFactory
                .builder()
                //衰减的重试机制
                .retryPolicy(new ExponentialBackoffRetry(1000,3))
                .connectString("192.168.217.130:2181,192.168.217.130:2183,192.168.217.130:2183")
                .sessionTimeoutMs(4000)
                //隔离命名空间  以下所有操作都是基于该相对目录进行的
                .namespace("curator")
                .build();
        curatorFramework.start();

//        addListenerWithNodeCache(curatorFramework,"/xpp");
//        addListenerWithPathChildCache(curatorFramework,"/xpp");
        addListenerWithTreeCache(curatorFramework,"/xpp");
        System.in.read();
    }

    /**
     * 监听一个节点的更新和创建事件
     * @param curatorFramework
     * @param path
     * @throws Exception
     */
    public static void addListenerWithNodeCache(CuratorFramework curatorFramework,String path) throws Exception {
        //第三个参数 对详细内容数据的压缩
        final NodeCache nodeCache = new NodeCache(curatorFramework,path,false);
        NodeCacheListener nodeCacheListener = new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                System.out.println("Receive Event:"+nodeCache.getCurrentData().getPath());
            }
        };
        nodeCache.getListenable().addListener(nodeCacheListener);
        nodeCache.start();
    }

    /**
     *  监听一个节点子节点的创建删除和更新
     * @param curatorFramework
     * @param path
     * @throws Exception
     */
    public static void addListenerWithPathChildCache(CuratorFramework curatorFramework,String path) throws Exception {
        PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
        PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                System.out.println("Receive Event:"+pathChildrenCacheEvent.getType());
            }
        };
        pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
        pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
    }

    /**
     * 综合节点监听事件 监听当前节点和子节点  节点上任何一个事件都能收到
     * @param curatorFramework
     * @param path
     * @throws Exception
     */
    public static void addListenerWithTreeCache(CuratorFramework curatorFramework,String path) throws Exception {
        TreeCache treeCache = new TreeCache(curatorFramework,path);
        TreeCacheListener treeCacheListener = new TreeCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                System.out.println(treeCacheEvent.getType()+"->"+treeCacheEvent.getData().getPath());
            }
        };
        treeCache.getListenable().addListener(treeCacheListener);
        treeCache.start();
    }
}

我的pom文件

代码语言:javascript
复制
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
       <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.5.4-beta</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework -->
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-framework</artifactId>
           <version>2.7.1</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
       <dependency>
           <groupId>org.apache.curator</groupId>
           <artifactId>curator-recipes</artifactId>
           <version>2.7.1</version>
       </dependency>

zookeeper 我是用的3.4.14 版本 注意 Curator 版本 和zookeeper版本有对应关系 我的Curator版本是2.7.1

https://blog.csdn.net/glory1234work2115/article/details/51967507

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-05-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 数据存储
  • 2 基于 Java API 初探 zookeeper 的使用
    • 2.1 zookeeper 增删改查
      • 2.2 事件机制
        • 2.3 如何注册事件机制
          • 2.4 如何触发事件?
            • 2.5 watcher 事件类型
              • 2.6 操作对应的water事件类型
              • 3 Curator 客户端的使用,简单高效
                • 3.1使用Curator对zookeeper增删改查
                  • 3.2 监听
                  相关产品与服务
                  数据保险箱
                  数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档