前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Zookeeper入门(三)—使用CuratorFramework操作节点并添加监视器

Zookeeper入门(三)—使用CuratorFramework操作节点并添加监视器

作者头像
用户3587585
发布2022-11-22 13:37:43
2.2K0
发布2022-11-22 13:37:43
举报
文章被收录于专栏:阿福谈Web编程阿福谈Web编程

前言

在上一篇文章ZooKeeper入门(二)中笔者讲解了分布式协调中间件ZooKeeper的常用命令并使用Curator客户端实现了一个简单的配置中心功能。本文的目的就是带领读者朋友们一起学习如何在SpringBoot项目中使用Curator客户端对ZooKeeper节点进行简单的增删改查并对节点设置Watcher监视器等实践,让大家掌握使用Curator客户端对ZooKeeper进行基础的操作。

升级Curator版本

因为与我们使用的3.7.1版本的ZooKeeper对应的Curator客户端已升级到5.3.0版本,而且具备了幂等操作API,因此笔者也对Curator的版本由之前的4.0版本升级到了5.3.0版本

代码语言:javascript
复制
  <dependency>
		<groupId>org.apache.curator</groupId>
		<artifactId>curator-framework</artifactId>
		<version>5.3.0</version>
  </dependency>
  <dependency>
		<groupId>org.apache.curator</groupId>
		<artifactId>curator-recipes</artifactId>
		<version>5.3.0</version>
	</dependency>

升级后的TreeCache类已过时, 其官方Java API文档中提示我们已使用CuratorCache类代替了TreeCache

因此,我们需要对之前项目中的ZooKeeperConfig类进行修改,鉴于CuratorFramwork类实例作为客户端工具

在对ZooKeeper节点进行操作时需要经常用到,因此我们把他注册到Spring 的IOC容器使其成为一个bean

  • 首先新建一个ZooKeeperClientConfig类,实例化CuratorFramwork bean
代码语言:javascript
复制
package org.sang.config;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZooKeeperClientConfig {

    /**
     * 多参数构建ZooKeeper客户端连接
     * @return client
     */
    @Bean(name="zookeeperClient")
    public CuratorFramework createWithOptions(){
        // 连接串也可以从配置文件中取
        String connectString = "119.29.117.19:2181,119.29.117.19:2182,119.29.117.19:2183";
        ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString)
                .retryPolicy(backoffRetry)
                .sessionTimeoutMs(30*60*1000)   // 会话超时30分钟
                .connectionTimeoutMs(30*1000)   // 连接超时30s
                .build();
        client.start(); // 初始化后启动
        return client;
    }

}
  1. ZooKeeperConfig类中注入CuratorFramework bean 并使用CuratorCache类替换过时的TreeCache
代码语言:javascript
复制
@Component
public class ZooKeeperConfig {

    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperConfig.class);
    @Resource
    private CuratorFramework zkClient;

    private Properties configProperties = new Properties();


    public String getProperty(String key){
        return configProperties.getProperty(key);
    }

    // 初始化
    @PostConstruct
    public void init() throws Exception {
        List<String> configNames = zkClient.getChildren().forPath("/config");
        for(String key: configNames){
            // 获取每个路径下的值(即配置值)
            byte[] value = zkClient.getData().forPath("/config/"+key);
            configProperties.put(key, new String(value, "UTF-8"));
        }
        //保证实时性,利用zk的watch机制
        CuratorCache curatorCache = CuratorCache.build(zkClient, "/config");
        curatorCache.start();
        // 创建监听器
        curatorCache.listenable().addListener((type, oldData, data) -> {
            // oldData为修改前的数据;data为将要修改的新数据,类型均为ChildData
            switch (type){
                case NODE_CHANGED:
                    // 获取变更节点的路径名
                    String configName = data.getPath().replace("/config/", "");
                    // 监听到zk的zNode发生了数据变更
                    logger.info(configName + "的值发生了更新, 更新后的值为:" + new String(data.getData()));
                    // 获取变更的值
                    String configValue = new String(data.getData());
                    configProperties.put(configName, configValue);
                    break;
                default:
                    break;
            }
        });
    }

}

ZooKeeper节点基础CRUD操作

新建ZooKeeperService服务类,在该类中完成对ZooKeeper节点的操作

代码语言:javascript
复制
package org.sang.service;

import com.alibaba.fastjson.JSON;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;

@Service
public class ZooKeeperService {

    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperService.class);
    
    // 注入ZkClient bean
    @Resource
    private CuratorFramework curatorFramework;

    /**
     * 创建永久节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void createNode(String path, String data) throws Exception{
        curatorFramework.create().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 创建临时节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void createEphemeralNode(String path, String data) throws Exception {
        curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 创建临时有序节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void crateEphemeralSequentialNode(String path, String data) throws Exception {
       curatorFramework.create()
               .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
               .forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 往节点种设置数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setData(String path, String data) throws Exception{
         curatorFramework.setData().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }
    
    /**
     * 异步修改数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setDataAsync(String path, String data) throws Exception{
        // 添加回调监听器, set数据成功后会对节点进行监听
        CuratorListener listener = (client, event) -> {
            Stat stat = event.getStat();
            logger.info("stat=" + JSON.toJSONString(stat));
            CuratorEventType eventType = event.getType();
            logger.info("eventType="+eventType.name());
        };
        curatorFramework.getCuratorListenable().addListener(listener);
        curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }


    /**
     * 删除节点
     * @param path
     * @throws Exception
     */
    public void deleteData(String path) throws Exception{
        curatorFramework.delete().forPath(path);
    }

    /**
     * 安全删除节点
     * @param path
     * @throws Exception
     */
    public void guaranteedDeleteData(String path) throws Exception {
        curatorFramework.delete().guaranteed().forPath(path);
    }
    
     /**
     * 获取子节点下的全部子节点路径集合
     * @param path 指定节点路径
     * @return List<String> 子节点路径集合
     * @throws Exception
     */
    public List<String> watchedGetChildren(String path) throws Exception {
        List<String> children = curatorFramework.getChildren().watched().forPath(path);
        return children;
    }

  
   /**
     * 获取节点数据
     * @param path 节点路径
     * @param fullClassName 数据转换对象全类名
     * @return Object
     * @throws Exception
     */
    public Object getDataByPath(String path, String fullClassName) throws Exception {
        String jsonStr = new String(curatorFramework.getData().forPath(path), StandardCharsets.UTF_8);
        Class clazz = Class.forName(fullClassName);
        return JSON.parseObject(jsonStr, clazz);
    }

}

  1. 新建ZookeepeController类, 通过接口操作ZookeeperService
代码语言:javascript
复制
package org.sang.controller;

import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.sang.config.ZooKeeperConfig;
import org.sang.pojo.RespBean;
import org.sang.service.ZooKeeperService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/zookeeper")
public class ZooKeeperController {

    @Resource
    private ZooKeeperService zooKeeperService;
    @Resource
    private ZooKeeperConfig zooKeeperConfig;

    private final static Logger logger = LoggerFactory.getLogger(ZooKeeperController.class);
    
    /**
    * 获取配置变量接口
    */
    @GetMapping("/getConfigValueByKey")
    public RespBean<String> getConfigValueByKey(@RequestParam("configKey") String configKey){
        logger.info("configKey={}", configKey);
        String configValue = zooKeeperConfig.getProperty(configKey);
        RespBean<String> respBean = RespBean.success(configValue);
        return respBean;
    }
    
    /**
    * 创建持久节点接口
    */
    @PostMapping(value = "/create/persistent")
    public RespBean<String> createPersistentNode(@RequestBody Map<String, Object> postData) {
        RespBean respBean;
        try {
            checkPostData(postData);
            String path = (String) postData.get("path");
            Object data = postData.get("data");
            zooKeeperService.createNode(path, JSON.toJSONString(data));
            respBean = RespBean.success("create node " + path + " success");
        } catch (Exception e) {
            respBean = RespBean.error("create node failed");
            logger.error("create node error", e);
        }
        return respBean;
    }
    
    /**
    * 创建临时节点接口
    */
    @RequestMapping("/create/ephemeral")
    public  RespBean<String> createTempNode(@RequestBody Map<String, Object> postData) {
        RespBean<String> respBean;
        try {
                checkPostData(postData);
                String path = (String) postData.get("path");
                Object data = postData.get("data");
                zooKeeperService.createEphemeralNode(path, JSON.toJSONString(data));
                respBean = RespBean.success("create ephemeral node " + path + " success");
        } catch (Exception e) {
                respBean = RespBean.error("create ephemeral node failed");
                logger.error("create ephemeral node error", e);
        }
        return respBean;
    }
    
    /**
    * 创建临时有序节点接口
    */
    @PostMapping("/ephemeral/sequence")
    public RespBean<String> createEphemeralSequenceNode(@RequestBody Map<String, Object> postData){
        RespBean respBean;
        try{
            checkPostData(postData);
            String path = (String) postData.get("path");
            Object data = postData.get("data");
            zooKeeperService.crateEphemeralSequentialNode(path, JSON.toJSONString(data));
            respBean = RespBean.success("create ephemeral sequence node " + postData.get("path") + " success");
        } catch (Exception e) {
            respBean = RespBean.error("create ephemeral node failed");
            logger.error("create ephemeral sequence node error", e);
        }
        return respBean;
    }
    
    /**
    * 根据节点路径获取节点中的数据接口
    */
    @PostMapping("getDataByPath")
    public RespBean<Object> getDataByPath(@RequestBody Map<String, String> paramMap){
        RespBean respBean;
        try {
            String path = paramMap.get("path");
            String fullClassName = paramMap.get("fullClassName");
            Object data = zooKeeperService.getDataByPath(path, fullClassName);
            respBean = RespBean.success(data);
        } catch (Exception e) {
            respBean = RespBean.error("get data failed caused by " + e.getMessage());
            logger.error("get data error", e);
        }
        return respBean;
    }
    
    /**
    * 同步修改节点数据接口
    */
    @PostMapping("/setData/sync")
    public RespBean<String> setData(@RequestBody Map<String, Object> paramMap){
        checkPostData(paramMap);
        RespBean<String> respBean;
        try {
            zooKeeperService.setData((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
            respBean = RespBean.success("set data success");
        } catch (Exception e) {
            logger.error("set data failed", e);
            respBean = RespBean.error("set data failed, caused by " + e.getMessage());
        }
        return respBean;
    }

    @PostMapping("/setData/async")
    public RespBean<String> asyncSetData(@RequestBody Map<String, Object> paramMap){
        checkPostData(paramMap);
        RespBean<String> respBean;
        try {
            zooKeeperService.setDataAsync((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
            respBean = RespBean.success("async set data success");
        } catch (Exception e) {
            logger.error("async set data failed", e);
            respBean = RespBean.error("async set data failed, caused by " + e.getMessage());
        }
        return respBean;
    }

    /**
    * 获取被监听的子节点路径集合接口
    */
    @GetMapping("/getWatchedChildren")
    public RespBean<List<String>> getWatchedChildren(@RequestParam("path") String path){
        RespBean<List<String>> respBean;
        try {
            List<String> watchedChildren = zooKeeperService.watchedGetChildren(path);
            respBean = RespBean.success(watchedChildren);
        } catch (Exception e) {
            logger.error("getWatchedChildren error", e);
            respBean = RespBean.error("getWatchedChildren failed, caused by " + e.getMessage());
        }
        return respBean;
    }
    
    /**
    * 删除节点接口
    */
    @DeleteMapping("/deleteByPath")
    public RespBean<String> deleteDataByPath(@RequestParam("path") String path){
        logger.info("delete ZNode " + path);
        RespBean<String> respBean;
        try {
            zooKeeperService.deleteData(path);
            respBean = RespBean.success("delete ZNode success");
        } catch (Exception e) {
            logger.error("delete ZNode of " + path + "failed", e);
            respBean = RespBean.error("delete ZNode failed, caused by " + e.getMessage());
        }
        return respBean;
    }
    
    /**
    * 安全删除节点接口
    */
    @DeleteMapping("/guaranteedDeleteData")
    public RespBean<String> guaranteedDeleteData(@RequestParam("path") String path){
        logger.info("guaranteed delete ZNode " + path);
        RespBean<String> respBean;
        try {
            zooKeeperService.guaranteedDeleteData(path);
            respBean = RespBean.success("guaranteed delete data success");
        } catch (Exception e) {
            logger.error("guaranteed delete data failed", e);
            respBean = RespBean.error("guaranteed delete data failed, caused by " + e.getMessage());
        }
        return respBean;
    }

    /**
    * 校验POST请求入参数据
    */
    private void checkPostData(Map<String, Object> postData){
        String path = (String) postData.get("path");
        if(StringUtils.isEmpty(path)){
            throw new IllegalArgumentException("path cannot be null");
        }
        Object data = postData.get("data");
        if(data==null || "".equals(data)){
            throw new IllegalArgumentException("data cannot be null");
        }
    }

}

测试CRUD基础操作

首先我们在项目的SpringSecurity配置文件WebSecurityConfig.java中对操作zookeeper的接口放开认证要求

SpringSecurity#configure(HttpSecurity http)方法

代码语言:javascript
复制
 http.authorizeRequests()
                .antMatchers("/user/reg").anonymous()
                .antMatchers("/zookeeper/**").anonymous()

参考笔者之前发布的文章Zookeeepr入门(一)启动ZooKeeper集群服务,然后启动本地的Redis和MySql服务后

再在IDEA中启动blogserver服务,服务启动成功之后就可以通过在postman中调用接口进行验证了

创建持久节点

在postman中调用创建持久节点接口

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/create/persistent
{
    "path": "/test",
	"data": {
		"serviceName": "zooKeeperService",
		"serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
        "contextPath": "/zooKeeper",
        "apiList": []
	}
}

接口返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "create node /test success"
}

然后更换入参调用同一个接口创建子节点/test/oderService

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/create/persistent
{
    "path": "/test/oderService",
		"data": {
		"serviceName": "orderService",
		"serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
        "contextPath": "/orderService",
            "apiList": [
                {
                    "apiId": 1,
                    "apiPath": "/order/create",
                    "apiName": "创建订单接口API",
                    "requestType": "POST",
                    "argumentTypes": "java.lang.String, java.lang.Double",
                     "returnType": "org.sang.pojo.RespBean"
                },
                {
                    "apiId": 2,
                    "apiPath": "/order/get",
                    "apiName": "查询订单接口API",
                    "requestType": "GET",
                    "argumentTypes": "java.lang.Long",
                     "returnType": "org.sang.pojo.RespBean"
                }
            ]
	}
}

注意:为了创建路径为/test/orderService的持久节点成功,必须先创建/test节点,否则在没有父节点的情况下直接创建/test`的子节点zookeeper客户端会会报错

此时我们在Linux客户端连接ZooKeeper服务, 通过ls /test命令可查看到/test路径下的所有子节点

代码语言:javascript
复制
[zk: localhost:2181(CONNECTED) 19] ls /test
[orderService]

然后再通过get /test/orderService命令可以直接查看该节点下的数据

代码语言:javascript
复制
[zk: localhost:2181(CONNECTED) 21] get /test/orderService
{"serviceName":"orderService","serverAddress":"192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080","contextPath":"/orderService","apiList":[{"apiId":1,"apiPath":"/order/create","apiName":"创建订单接口API","requestType":"POST","argumentTypes":"java.lang.String, java.lang.Double","returnType":"org.sang.pojo.RespBean"},{"apiId":2,"apiPath":"/order/get","apiName":"查询订单接口API","requestType":"GET","argumentTypes":"java.lang.Long","returnType":"org.sang.pojo.RespBean"}]}
  1. 查看节点数据

为了将/test节点及其子节点中存放的数据在取数据时能反序列化为一个对象,我们新建了一两个实体类

ServiceInfo.javaApiInfo.java

代码语言:javascript
复制
public class ServiceInfo implements Serializable {
    // 服务名称
    private String serviceName;
    // 服务地址,IP+端口号,多个使用逗号分隔
    private String serverAddress;
    // 上下文
    private String contextPath;
    // 服务中的api列表
    private List<ApiInfo> apiList;
    // 省略setter和getter方法
}
代码语言:javascript
复制
public class ApiInfo implements Serializable {
    // APIID
    private Long apiId;
    // API 路径
    private String apiPath;
    // API名称
    private String apiName;
    // 请求类型:GET|POST|PUT|DELETE
    private String requestType;
    // 参数类型,参数全类名, 多个以逗号分隔
    private String argumentTypes;
    // 返回值类型
    private String returnType;
     // 省略setter和getter方法
}

在postman中调用查询节点数据接口

代码语言:javascript
复制
  POST http://localhost:8081/blog/zookeeper/getDataByPath
  {
  	"path": "/test",
  	"fullClassName": "org.sang.pojo.ServiceInfo"
  }
  // 第一参数为节点路径,第二个参数为实体类全类名

接口返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": {
        "serviceName": "zooKeeperService",
        "serverAddress": "192.110.119.201:2181,192.110.119.202:2181,192.110.119.203:2181",
        "contextPath": "/zooKeeper",
        "apiList": []
    }
}

再次调用相同接口获取/test/orderService节点中的数据

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/getDataByPath
{
	"path": "/test/orderService",
	"fullClassName": "org.sang.pojo.ServiceInfo"
}

接口返回结果:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": {
        "serviceName": "orderService",
        "serverAddress": "192.110.119.201:8080,192.110.119.202:8080,192.110.119.203:8080",
        "contextPath": "/orderService",
        "apiList": [
            {
                "apiId": 1,
                "apiPath": "/order/create",
                "apiName": "创建订单接口API",
                "requestType": "POST",
                "argumentTypes": "java.lang.String, java.lang.Double",
                "returnType": "org.sang.pojo.RespBean"
            },
            {
                "apiId": 2,
                "apiPath": "/order/get",
                "apiName": "查询订单接口API",
                "requestType": "GET",
                "argumentTypes": "java.lang.Long",
                "returnType": "org.sang.pojo.RespBean"
            }
        ]
    }
}

创建临时节点

调用创建临时节点接口

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/create/ephemeral
{
    "path": "/test/ephemralOne",
	"data": "ephemeral node1"
}

接口返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "create ephemeral node /test/ephemralOne success"
}

然后在连接ZooKeeper服务的Linux客户端中执行命令ls /test 查看临时节点

代码语言:javascript
复制
[zk: localhost:2181(CONNECTED) 22] ls /test
[ephemralOne, orderService]

我们发现/test节点下多了一个子节点ephemralOne

然后重启blogserver服务后我们再次执行ls /test命令会发现ephemralOne子节点消失不见了,证明了它是 一个临时节点,在会话关闭后就会消失。

创建临时有序节点

调用创建临时有序节点接口

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/ephemeral/sequence
{
	"path": "/test/ephemeralSequence",
	"data": "ephemeralSequence"
}

连续调用以上接口三次

每次调用都会返回以下响应信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "create ephemeral sequence node /test/ephemeralSequence success"
}

再次执行ls /test命令查看/test节点下的子节点

代码语言:javascript
复制
[zk: localhost:2181(CONNECTED) 23] ls /test
[ephemeralSequence0000000004, ephemeralSequence0000000005, ephemeralSequence0000000006, orderService]

可以看到创建的临时有序节点在指定的路径名ephemeralSequence后面都带有一个十位长度的数字字符串

在获取节点路径时需要将后面的数字字符也带上

代码语言:javascript
复制
  [zk: localhost:2181(CONNECTED) 24] get /test/ephemeralSequence0000000004
  "ephemeralSequence"

重置节点数据

调用同步设置数据接口

代码语言:javascript
复制
POST 
{
	"path": "/test",
	"data": {
		"serverAddress": "localhost:2181,localhost:2182,localhost:2183",
		"contextPath": "/cloudService",
		"serviceName": "registerService",
		"apiList":[]
	}
}

返回结果:

代码语言:javascript
复制
  {
      "status": 200,
      "msg": "success",
      "data": "set data success"
  }

然后通过客户端zkCli命令可以查看到test节点数据发送了变化

代码语言:javascript
复制
[zk: localhost:2181(CONNECTED) 0] get /test
{"serverAddress":"localhost:2181,localhost:2182,localhost:2183","contextPath":"/cloudService","serviceName":"registerService","apiList":[]}

删除节点

调用删除节点接口

代码语言:javascript
复制
DELETE http://localhost:8081/blog/zookeeper/deleteByPath?path=/test/ephemralOne

返回结果

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "delete ZNode success"
}

注意DeleteBuilder#forPath方法只能删除没有子节点的节点,不能用来删除有子节点的节点

另一种删除方式curatorFramework.delete().guaranteed().forPath(path) 也只能删除子节点,这种方式是显示地表示可删除节点位子节点

ZooKeeper中的回调、监听器和Watcher

ZooKeeper中间件之所以能作为一个分布式协调器的一个重要原因就在于它的Watch机制, 当节点创建、修改、删除以及重连和连接失效时都能通过watch机制得到通知

通过回调监听

这种方式是通过设置BackgroundCallback回调函数监听节点

BackgroundCallback是一个接口

代码语言:javascript
复制
public interface BackgroundCallback {
    void processResult(CuratorFramework zkClient, CuratorEvent event) throws Exception;
}

构造一个BackgroundCallback实例需要实现processResult抽象方法

第一个参数是CuratorFramework类型的zkClient, 通过这个客户端可以在回调种继续操作ZNode节点, 添加监视器等

第二个参数是CuratorEvent类型的事件对象, CuratorEvent也是一个接口,它的实现类是CuratorEventImpl

代码语言:javascript
复制
public interface CuratorEvent {
    CuratorEventType getType();

    int getResultCode();

    String getPath();

    Object getContext();

    Stat getStat();

    byte[] getData();

    String getName();

    List<String> getChildren();

    List<ACL> getACLList();

    List<CuratorTransactionResult> getOpResults();

    WatchedEvent getWatchedEvent();
}

通过CuratorEvent#getType方法可以获得事件类型CuratorEventType,是一个枚举类型可以看到有下面这些事件类型

代码语言:javascript
复制
public enum CuratorEventType {
    CREATE,  // 创建节点
    DELETE,  // 删除节点
    EXISTS,  // 存在节点
    GET_DATA,  // 获取节点数据
    SET_DATA,  // 重置节点数据
    CHILDREN,   // 子节点
    SYNC,   // 同步
    GET_ACL,  // 获取权限
    SET_ACL,  // 设置权限
    TRANSACTION,  // 事务
    GET_CONFIG, // 获取zookeeper配置信息
    RECONFIG,   // 重新配置zookeeper
    WATCHED,  // 监听
    REMOVE_WATCHES,  // 移除监听
    CLOSING,  会话关闭
    ADD_WATCH;  // 添加watch
    private CuratorEventType() {
    }
}

通过添加监听器监听

监听器为CuratorListener, 它是一个接口,有一个抽象方法

代码语言:javascript
复制
 public interface CuratorListener {
      void eventReceived(CuratorFramework zkClient, CuratorEvent event) throws Exception;
   }

可以看到CuratorListenerBackgroundCallback两个接口具有相同类型的入参,可以说二者实现监视节点的效果和底层原理都是一样的

给节点添加Watcher

watcher是一个接口

代码语言:javascript
复制
@Public
public interface Watcher {
    void process(WatchedEvent watchedEvent);
}

要构造一个Watcher实例需要实现process抽象方法,只有一个WatchedEvent类型的构造参数

代码语言:javascript
复制
@Public
public class WatchedEvent {
    private final KeeperState keeperState;
    private final EventType eventType;
    private String path;
    // 三个参数的构造函数
    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
        this.keeperState = keeperState;
        this.eventType = eventType;
        this.path = path;
    }
    // 一个参数的构造方法
    public WatchedEvent(WatcherEvent eventMessage) {
        this.keeperState = KeeperState.fromInt(eventMessage.getState());
        this.eventType = EventType.fromInt(eventMessage.getType());
        this.path = eventMessage.getPath();
    }
    
    public KeeperState getState() {
        return this.keeperState;
    }

    public EventType getType() {
        return this.eventType;
    }

    public String getPath() {
        return this.path;
    }

    public String toString() {
        return "WatchedEvent state:" + this.keeperState + " type:" + this.eventType + " path:" + this.path;
    }

    public WatcherEvent getWrapper() {
        return new WatcherEvent(this.eventType.getIntValue(), this.keeperState.getIntValue(), this.path);
    }
}

通过WatchedEvent#type方法可以获得事件类型参数EventType对象

EventType也是一个枚举类, 囊括了以下几种ZNode事件

代码语言:javascript
复制
None(-1),  // 无事件
      NodeCreated(1),  // 创建节点
      NodeDeleted(2),  // 节点被删除
      NodeDataChanged(3),  // 节点发送改变
      NodeChildrenChanged(4), // 子节点发生改变
      DataWatchRemoved(5),  // 数据监视器被删除
      ChildWatchRemoved(6), // 子节点监视器被删除
      PersistentWatchRemoved(7);  // 持久化监视器被删除

Watcher内部具有EventTypeKeeperState两个枚举类

BackgroundCallback与CuratorListener的用法

ZooKeeeprService类中定义一个全局变量callback, 这个回调可以在异步创建节点、异步修改节点数据以及异步删除节点时对节点进行监听处理

代码语言:javascript
复制
 // 定义回调函数,对节点进行监听
    private BackgroundCallback callback = ((zkClient, curatorEvent) -> {
        logger.info("event data="+ ((curatorEvent.getData()==null)?"null data": new String(curatorEvent.getData())));
        switch (curatorEvent.getType()){
            case SET_DATA:
                // 只会触发SET_DATA事件
                logger.info("node data changed");
                // 回调做其他事情
                break;
            case CREATE:
                logger.info("node created");
                break;
            case CHILDREN:
                logger.info("children");
                break;
            case DELETE:
                logger.info("node deleted");
                break;
            default:
                logger.info("eventType="+curatorEvent.getName());
                break;
        }
    });
// 定义监听器,对节点进行监听
    private CuratorListener listener = (client, event) -> {
        Stat stat = event.getStat();
        logger.info("stat=" + JSON.toJSONString(stat));
        CuratorEventType eventType = event.getType();
        logger.info("eventType="+eventType.name());
    };

这里我们只在回调里打印日志

然后在异步创建节点、异步修改节点数据及异步删除节点方法中使用

代码语言:javascript
复制
 /**
     * 异步创建节点
     * @param path
     * @param data
     * @throws Exception
     */
    public void asyncCreateNode(String path, String data) throws Exception {
        curatorFramework.create().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

/**
     * 异步修改节点数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setDataAsyncWithCallback(String path, String data) throws Exception {
        curatorFramework.setData().inBackground(callback).forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * 使用监听器异步修改数据
     * @param path
     * @param data
     * @throws Exception
     */
    public void setDataAsyncWithListener(String path, String data) throws Exception {
        // 通过监听器修节点改数据
        curatorFramework.getCuratorListenable().addListener(listener);
        curatorFramework.setData().inBackground().forPath(path, data.getBytes(StandardCharsets.UTF_8));
    }
/**
     * 异步删除数据
     * @param path
     * @throws Exception
     */
    public void asyncDeleteData(String path) throws Exception {
        curatorFramework.delete().inBackground(callback).forPath(path);
    }

然后在ZooKeeperController类中加上对应的控制器方法

代码语言:javascript
复制
@PostMapping("/create/async")
    public RespBean<String> asyncCreateNode(@RequestBody Map<String, Object> postData){
        RespBean respBean;
        try {
            checkPostData(postData);
            String path = (String) postData.get("path");
            Object data = postData.get("data");
            zooKeeperService.asyncCreateNode(path, JSON.toJSONString(data));
            respBean = RespBean.success(" async create node " + path + " success");
        } catch (Exception e){
            respBean = RespBean.error("async create node failed");
            logger.error("async create node error", e);
        }
        return respBean;
    }

     @PostMapping("/setData/async/callback")
    public RespBean<String> setDataAsyncWithCallback(@RequestBody Map<String, Object> paramMap){
        checkPostData(paramMap);
        RespBean<String> respBean;
        try {
            zooKeeperService.setDataAsyncWithCallback((String) paramMap.get("path"), JSON.toJSONString(paramMap.get("data")));
            respBean = RespBean.success("async set data with callback success");
        } catch (Exception e) {
            logger.error("set data with callback failed", e);
            respBean = RespBean.error("async set data with callback failed, caused by " + e.getMessage());
        }
        return respBean;
    }

@DeleteMapping("/deleteByPath/async")
    public RespBean<String> asyncDeleteByPath(@RequestParam("path") String path){
        logger.info(" async delete ZNode " + path);
        RespBean<String> respBean;
        try {
            zooKeeperService.asyncDeleteData(path);
            respBean = RespBean.success("delete ZNode success");
        } catch (Exception e) {
            logger.error("async delete ZNode of " + path + "failed", e);
            respBean = RespBean.error("async delete ZNode failed, caused by " + e.getMessage());
        }
        return respBean;
    }

然后重启服务开始测试效果

测试异步添加节点

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/create/async
{
	"path": "/test/person",
	"data": "personInfo collection"
}

返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": " async create node /test/person success"
}

可以看到控制台中打印出了节点创建的信息,这是我们在回调函数中根据事件类型判断打印的日志

代码语言:javascript
复制
2022-08-21 13:44:02.629  INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data
2022-08-21 13:44:02.630  INFO 20120 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node created

测试使用回调异步修改节点数据接口

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/setData/async/callback
{
	"path": "/test/person",
	"data": "person infomation collection"
}

返回结果:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "async set data with callback success"
}

控制台打印如下日志:

代码语言:javascript
复制
2022-08-21 22:55:02.011  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data
2022-08-21 22:55:02.012  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node data changed

测试使用监听器异步修改节点数据

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/setData/async/listener
{
	"path": "/test/person/2",
	"data": {
		"name": "李四",
		"age": 24,
		"height": 173.2,
		"salary": 15800.85
	}
}

返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "async set data with callback success"
}

控制台打印日志:

代码语言:javascript
复制
2022-08-21 23:03:54.778  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : stat=null
2022-08-21 23:03:54.779  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : eventType=SET_DATA

测试异步删除节点

代码语言:javascript
复制
DELETE http://localhost:8081/blog/zookeeper/deleteByPath/async?path=/test/person/2

返回结果:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "delete ZNode success"
}

控制台打印出日志:

代码语言:javascript
复制
2022-08-21 23:09:06.306  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : event data=null data2022-08-21 23:09:06.306  INFO 13724 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node deleted

Watcher的用法

ZooKeeperService类里定义个全局的CuratorWatcher, 并定义一个为节点添加这个watcher的方法

代码语言:javascript
复制
private CuratorWatcher watcher = watchedEvent -> {
        String eventName = watchedEvent.getType().name();
        // 监听的节点路径
        String watchedPath = watchedEvent.getPath();
        logger.info("watchedPath={}", watchedPath);
        switch (eventName){
            case "NodeCreated":
                logger.info("node created, add Lock success");
                break;
            case "NodeDeleted":
                logger.info("node deleted, release lock success");
                break;
            case "NodeDataChanged":
                logger.info("node data changed");
                break;
            case "NodeChildrenChanged":
                logger.info("node children changed");
                break;
            case "DataWatchRemoved":
                logger.info("data watcher removed");
                break;
            case "ChildWatchRemoved":
                logger.info("child watcher removed");
                break;
            case "PersistentWatchRemoved":
                logger.info("persistent watcher removed");
   
                
     /**
     * 给节点添加watcher
     * @param path
     * @throws Exception
     */
    public void addWatchByPath(String path) throws Exception {
        curatorFramework.getData().usingWatcher(watcher).forPath(path);
    }break;
            default:
                logger.info("none event");
                break;
        }
    };

/**
     * 给节点添加watcher
     * @param path
     * @throws Exception
     */
    public void addWatchByPath(String path) throws Exception {
        curatorFramework.getData().usingWatcher(watcher).forPath(path);
    }

然后再在ZooKeeperController类中添加对应的控制器方法

代码语言:javascript
复制
@PostMapping("/addWatcherByPath")
public RespBean<String> addWatcherByPath(@RequestParam("path") String path){
        RespBean<String> respBean;
        try {
            zooKeeperService.addWatchByPath(path);
            respBean = RespBean.success("add watcher success");
        } catch (Exception e) {
            logger.info("add watcher failed", e);
            respBean = RespBean.error("add watcher failed, caused by " + e.getMessage());
        }
        return respBean;
 }

重启应用后对/test/person/1这个节点进行监听

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/addWatcherByPath?path=/test/person/1

返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "add watcher success"
}

然后调用同步setData接口对这个节点数据进行修改

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/setData/sync
{
	"path": "/test/person/1",
	"data": {
		"name": "王五",
		"age": 32,
		"height": 173.6,
		"salary": 16800.58
	}
}

返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "set data success"
}

控制台打印出日志:

代码语言:javascript
复制
2022-08-21 23:29:19.349  INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService        : watchedPath=/test/person/1
2022-08-21 23:29:19.349  INFO 3732 --- [ain-EventThread] org.sang.service.ZooKeeperService        : node data changed

注意: Watcher还会监听一次,后面继续对节点进行操作就不会进入CuratorWatcher#process方法,如果需要继续监视节点的变化,则需要重新对节点添加Watcher

我们来测试一下效果,继续修改/test/person/1节点

代码语言:javascript
复制
POST http://localhost:8081/blog/zookeeper/setData/sync
{
	"path": "/test/person/1",
	"data": {
		"name": "赵六",
		"age": 33,
		"height": 174.6,
		"salary": 18800.58
	}
}

返回信息:

代码语言:javascript
复制
{
    "status": 200,
    "msg": "success",
    "data": "set data success"
}

但是控制台却看不到我们定义的watcherprocess方法中打印的日志,说明被调用了一次的watcher已经失效

不过要给节点添加持久类型的Watcher可通过下面这种链式调用方式实现

代码语言:javascript
复制
CuratorFramework.watchers()
    .add()
    .withMode(AddWatchMode.PERSISTENT)
    .usingWatcher(watcher)
    .forPath(path)

小结

本文主要详细讲解了使用CuratorFramework客户端在SpringBoot项目中对ZooKeeper节点实现增删改查以及对ZooKeeper节点添加BackgroundCallback回调、CuratorListener监听器和Watcher监视器等操作,既能实现对节点的异步操作,也能监听节点的变化。从而让我们并根据ZooKeeper节点事件类型作出响应的业务逻辑处理.

关于使用CuratorFramework客户端以非事务的方式操作ZooKeeper节点就介绍到这里,想要更深入的学习CuratorFramework的用法可通过阅读该类及其方法相关类的源码进一步掌握。下一篇文章,笔者将继续介绍

使用CuratorFramework客户端在一个事务中完成多个操作,并介绍使用ZooKeeper实现分布式事务锁。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-08-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 阿福谈Web编程 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 升级Curator版本
  • ZooKeeper节点基础CRUD操作
  • 测试CRUD基础操作
    • 创建持久节点
      • 创建临时节点
        • 创建临时有序节点
          • 重置节点数据
            • 删除节点
            • ZooKeeper中的回调、监听器和Watcher
              • 通过回调监听
                • 通过添加监听器监听
                  • 给节点添加Watcher
                    • BackgroundCallback与CuratorListener的用法
                      • 测试异步添加节点
                        • 测试使用回调异步修改节点数据接口
                          • 测试使用监听器异步修改节点数据
                            • 测试异步删除节点
                            • Watcher的用法
                            • 小结
                            相关产品与服务
                            消息队列 TDMQ
                            消息队列 TDMQ (Tencent Distributed Message Queue)是腾讯基于 Apache Pulsar 自研的一个云原生消息中间件系列,其中包含兼容Pulsar、RabbitMQ、RocketMQ 等协议的消息队列子产品,得益于其底层计算与存储分离的架构,TDMQ 具备良好的弹性伸缩以及故障恢复能力。
                            领券
                            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档