zookeeper curator使用caches实现各种监听

1、篇首语

curator是zookeeper的一个高级api开发包。封装了zookeeper众多的recipes,并且实现了一些新的recipes原语,最重要的是基于zookeeper提供的各种机制实现了更健壮的连接和异常处理。

本文将其中比较常用的一种recipe,就是cache。

2、各种Caches

cache是一种缓存机制,可以借助cache实现监听。

简单来说,cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。是不是很简单。

curator支持的cache种类有3种Path Cache,Node Cache,Tree Cache

1)Path Cache

Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。

Path Cache是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。

2)Node Cache

Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。

Node Cache是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。

3)Tree Cache

可以看做是上两种的合体,Tree Cache观察的是所有节点的所有数据。

3、下面给出一个例子。

1)这是在springboot中使用curator,先给出curator依赖pom

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.9.1</version>
        </dependency>

2)三种cache的实现

package com.dqa.prometheus.client.zookeeper;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

public class ZkClient {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private CuratorFramework client;
    private NodeCache nodeCache;
    private PathChildrenCache pathChildrenCache;
    private TreeCache treeCache;
    private String zookeeperServer;
    private int sessionTimeoutMs;
    private int connectionTimeoutMs;
    private int baseSleepTimeMs;
    private int maxRetries;

    public void setZookeeperServer(String zookeeperServer) {
        this.zookeeperServer = zookeeperServer;
    }
    public String getZookeeperServer() {
        return zookeeperServer;
    }
    public void setSessionTimeoutMs(int sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }
    public int getSessionTimeoutMs() {
        return sessionTimeoutMs;
    }
    public void setConnectionTimeoutMs(int connectionTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
    }
    public int getConnectionTimeoutMs() {
        return connectionTimeoutMs;
    }
    public void setBaseSleepTimeMs(int baseSleepTimeMs) {
        this.baseSleepTimeMs = baseSleepTimeMs;
    }
    public int getBaseSleepTimeMs() {
        return baseSleepTimeMs;
    }
    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }
    public int getMaxRetries() {
        return maxRetries;
    }

    public void init() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
        client = CuratorFrameworkFactory.builder().connectString(zookeeperServer).retryPolicy(retryPolicy)
                .sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).build();
        client.start();
    }

    public void stop() {
        if (client != null) CloseableUtils.closeQuietly(client);
        if (pathChildrenCache != null) CloseableUtils.closeQuietly(pathChildrenCache);
        if (nodeCache != null) CloseableUtils.closeQuietly(nodeCache);
        if (treeCache != null) CloseableUtils.closeQuietly(treeCache);
    }

    public CuratorFramework getClient() {
        return client;
    }

    /*
    *  设置Path Cache, 监控本节点的子节点被创建,更新或者删除,注意是子节点, 子节点下的子节点不能递归监控
    *  事件类型有3个, 可以根据不同的动作触发不同的动作
    *  本例子只是演示, 所以只是打印了状态改变的信息, 并没有在PathChildrenCacheListener中实现复杂的逻辑
    *  @Param path 监控的节点路径, cacheData 是否缓存data
    *  可重入监听
    * */
    public void setPathCacheListener(String path, boolean cacheData) {
        try {
            pathChildrenCache = new PathChildrenCache(client, path, cacheData);
            PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) {
                    ChildData data = event.getData();
                    switch (event.getType()) {
                        case CHILD_ADDED:
                            logger.info("子节点增加, path={}, data={}", data.getPath(), data.getData());
                            break;
                        case CHILD_UPDATED:
                            logger.info("子节点更新, path={}, data={}", data.getPath(), data.getData());
                            break;
                        case CHILD_REMOVED:
                            logger.info("子节点删除, path={}, data={}", data.getPath(), data.getData());
                            break;
                        default:
                            break;
                    }
                }
            };
            pathChildrenCache.getListenable().addListener(childrenCacheListener);
            pathChildrenCache.start(StartMode.POST_INITIALIZED_EVENT);
        } catch (Exception e) {
            logger.error("PathCache监听失败, path=", path);
        }

    }

    /*
    *  设置Node Cache, 监控本节点的新增,删除,更新
    *  节点的update可以监控到, 如果删除会自动再次创建空节点
    *  本例子只是演示, 所以只是打印了状态改变的信息, 并没有在NodeCacheListener中实现复杂的逻辑
    *  @Param path 监控的节点路径, dataIsCompressed 数据是否压缩
    *  不可重入监听
    * */
    public void setNodeCacheListener(String path, boolean dataIsCompressed) {
        try {
            nodeCache = new NodeCache(client, path, dataIsCompressed);
            NodeCacheListener nodeCacheListener = new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    ChildData childData = nodeCache.getCurrentData();
                    logger.info("ZNode节点状态改变, path={}", childData.getPath());
                    logger.info("ZNode节点状态改变, data={}", childData.getData());
                    logger.info("ZNode节点状态改变, stat={}", childData.getStat());
                }
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start();
        } catch (Exception e) {
            logger.error("创建NodeCache监听失败, path={}", path);
        }
    }


    /*
    *  设置Tree Cache, 监控本节点的新增,删除,更新
    *  节点的update可以监控到, 如果删除不会自动再次创建
    *  本例子只是演示, 所以只是打印了状态改变的信息, 并没有在NodeCacheListener中实现复杂的逻辑
    *  @Param path 监控的节点路径, dataIsCompressed 数据是否压缩
    *  可重入监听
    * */
    public void setTreeCacheListener(final String path) {
        try {
            treeCache = new TreeCache(client, path);
            TreeCacheListener treeCacheListener = new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                    ChildData data = event.getData();
                    if(data != null){
                        switch (event.getType()) {
                            case NODE_ADDED:
                                logger.info("[TreeCache]节点增加, path={}, data={}", data.getPath(), data.getData());
                                break;
                            case NODE_UPDATED:
                                logger.info("[TreeCache]节点更新, path={}, data={}", data.getPath(), data.getData());
                                break;
                            case NODE_REMOVED:
                                logger.info("[TreeCache]节点删除, path={}, data={}", data.getPath(), data.getData());
                                break;
                            default:
                                break;
                        }
                    }else{
                        logger.info("[TreeCache]节点数据为空, path={}", data.getPath());
                    }
                }
            };
            treeCache.getListenable().addListener(treeCacheListener);
            treeCache.start();
        } catch (Exception e) {
            logger.error("创建TreeCache监听失败, path={}", path);
        }

    }
}

3)configuration

init方法是初始化zookeeper client的操作

stop是停止zookeeper是的清理动作

package com.dqa.prometheus.configuration;

import com.xiaoju.dqa.prometheus.client.zookeeper.ZkClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZkConfiguration {

    @Value("${zookeeper.server}")
    private String zookeeperServer;
    @Value(("${zookeeper.sessionTimeoutMs}"))
    private int sessionTimeoutMs;
    @Value("${zookeeper.connectionTimeoutMs}")
    private int connectionTimeoutMs;
    @Value("${zookeeper.maxRetries}")
    private int maxRetries;
    @Value("${zookeeper.baseSleepTimeMs}")
    private int baseSleepTimeMs;

    @Bean(initMethod = "init", destroyMethod = "stop")
    public ZkClient zkClient() {
        ZkClient zkClient = new ZkClient();
        zkClient.setZookeeperServer(zookeeperServer);
        zkClient.setSessionTimeoutMs(sessionTimeoutMs);
        zkClient.setConnectionTimeoutMs(connectionTimeoutMs);
        zkClient.setMaxRetries(maxRetries);
        zkClient.setBaseSleepTimeMs(baseSleepTimeMs);
        return zkClient;
    }

}

 3)zk配置文件

其中最重要的应该是会话超时和重试机制了。

============== zookeeper ===================
zookeeper.server=10.93.21.21:2181,10.93.18.34:2181,10.93.18.35:2181
zookeeper.sessionTimeoutMs=6000
zookeeper.connectionTimeoutMs=6000
zookeeper.maxRetries=3
zookeeper.baseSleepTimeMs=1000

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Android先生

这是一份很详细的 Retrofit 2.0 使用教程(含实例讲解) - 简书

步骤1:添加Retrofit库的依赖 步骤2:创建 接收服务器返回数据 的类 步骤3:创建 用于描述网络请求 的接口 步骤4:创建 Retrofit 实例 步骤...

40060
来自专栏爱撒谎的男孩

Spring MVC处理异常

注意:使用SimpleMappingExceptionResolver处理异常时,不可以使用@ExceptionHandler!

36250
来自专栏java思维导图

给你一份SpringBoot知识清单

在过去两三年的Spring生态圈,最让人兴奋的莫过于Spring Boot框架。或许从命名上就能看出这个框架的设计初衷:快速的启动Spring应用。因而Spri...

16340
来自专栏竹清助手

浅谈Linux磁盘修复e2fsck命令

检查 /dev/mapper/VolGroup00-LogVol02 是否有问题,如发现问题便自动修复:

37020
来自专栏Java成神之路

Spring_总结_02_依赖注入

在上一节中,我们了解了Spring的最根本使命、四大原则、六大模块以及Spring的生态。

8740
来自专栏冷冷

Spring 必知概念(二)

13、Spring框架中的单例Beans是线程安全的么? Spring框架并没有对单例bean进行任何多线程的封装处理。关于单例bean的线程安全和并发问题需要...

21790
来自专栏Android群英传

Andromeda:适用于多进程架构的组件通信框架(下)

11920
来自专栏CSDN技术头条

给你一份超详细 Spring Boot 知识清单

在过去两三年的 Spring 生态圈,最让人兴奋的莫过于 Spring Boot 框架。Spring Boot 应用本质上就是一个基于 Spring 框架的应用...

64120
来自专栏向治洪

android 之ndk开发

1、Android NDK简介 NDK全称为native development kit本地语言(C&C++)开发包。而对应的是经常接触的Android-SDK...

24760
来自专栏个人分享

Spark Netty与Jetty (源码阅读十一)

  spark呢,对Netty API又做了一层封装,那么Netty是什么呢~是个鬼。它基于NIO的服务端客户端框架,具体不再说了,下面开始。

18740

扫码关注云+社区

领取腾讯云代金券