前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >你还在使用复杂的 zkclient 开发 zookeeper 么?是时候用 Curator 了 !

你还在使用复杂的 zkclient 开发 zookeeper 么?是时候用 Curator 了 !

作者头像
架构师修炼
发布2020-09-22 10:34:04
6570
发布2020-09-22 10:34:04
举报
文章被收录于专栏:架构师修炼
什么是 Curator

Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册watcher和NodeExistsException 异常等,对于我们日常 ZooKeeper 服务开发进行了详细的封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。

下面我们常用的绘画创建以及节点的管理,一起来看看如何去使用 Curator 去代替原生的 ZooKeeper 开发。

引入依赖

Curator 主要又两个关键包,curator-framework 包和 curator-recipes 包。

  • curator-framework,该包是对 ZooKeeper 底层 API 的一些封装,基础功能 API 均在这个包下;
  • curator-recipes,该包封装了一些 ZooKeeper 服务的高级特性,如:Cache 事件监听、选举、分布式锁、分布式 Barrier。
代码语言:javascript
复制
<dependency>

通过上面 Maven 引入依赖之后,我们就具备了使用 Curator 去操作 zookeeper 的能力。Curator 框架提供了的 API 是相当于流式的编码风格,主要是按照逻辑的先后顺序,采用调用的方式,在代码方式以及逻辑上更清晰一些。

下面,我们先来感受下这种编码风格,例如,我们要在 zookeeper 服务中创建一个 “/test/path” 的节点,然后,节点内容为 testData ,使用 Curator 框架编码如下:

client.create().forPath(“/test/path”, testData)

创建会话

通过上面,我们已经引入了 Curator 框架,并且已经知道了其编码风格,接下来,我们来看看该如何使用 Curator 去创建一个会话。

在这之前,我们先来复习下,使用zookeeper 原生客户端如何去创建会话:

代码语言:javascript
复制
try {

使用 Curator 来创建会话:

代码语言:javascript
复制
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);

在定义 CuratorFramework 对象实例的时候,我们使用了 CuratorFrameworkFactory 工厂方法,下面我们来看下其关键信息:

  • connectString,zookeeper 服务地址列表,如果是多个地址则用逗号分隔,如:192.168.1.1:2181,192.168.1.2:2181 ;
  • retryPolicy,重试策略,当客户端发生异常退出或者与服务端失去连接的时候,可以通过设置客户端重新连接 zookeeper 服务端;
  • sessionTimeoutMs,会话超时时间,作用在服务端,用来设置该条会话在 zookeeper 服务端的失效时间;
  • connectionTimeoutMs,客户端连接超时时间,作用在客户端,用来限制客户端发起一个会话连接到接收 zookeeper服务端应答的时间。

现在,我们已经完成了客户端与服务端会话的建立,即证明两端具备了通信的能力下面,我们就再来看看使用 Curator 框架该如何去创建、删除以及更新节点等。

创建节点

我们知道在创建节点的时候,是需要描叙该节点是临时节点、持久节点等节点相关数据信息,使用 Curator 创建节点代码如下:

代码语言:javascript
复制
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes());
  • 通过 create 函数创建数据节点
  • 通过 withMode 函数指定节点类型,是临时节点还是持久节点
  • 通过 forPath 函数指定节点路径以及所存放的数据

更新节点

接下来,我们再来看看节点更新代码该如何去编写:

代码语言:javascript
复制
client.setData().forPath("path","data_update".getBytes());
  • 更新节点是使用 setData 方法
  • 然后通过 forPath 函数指定所需要更新的路径以及要更新的数据信息

删除节点

上面我们已经知道了如何去创建会话、创建节点以及更新节点,下面,我们再来看看使用 Curator 如何去删除节点,代码如下:

代码语言:javascript
复制
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
  • 删除节点是通过 delete 函数来操作的;
  • guaranteed,主要是为了保障成功删除的,只要可河段会话有效,就会在后台持续发起删除请求,知道节点被成功删除;
  • deletingChildrenIfNeeded,递归删除节点以及子节点;
  • withVersion,指定删除节点的版本;
  • forPath,指定删除节点的位置

高级特性

我们在开发 zookeeper 服务的时候,经常会遇到这种情况,如果我们注册的节点异常断开或者是遇到其他网络问题导致的连接不可用,那这个时候我们怎么能立即感应呢?

在 Curator 中是通过 ConnectionStateListener 这个监听器去实现的,它主要是用来监控会话的连接状态,当状态发生改变的时候, zookeeper 服务就会启用不同的处理方式,其会话一共有六种基本状态:

  • CONNECTED,已连接,当客户端发起的会话成功连接到服务端后,该条会话的状态变为 CONNECTED 已连接状态;
  • SUSPENDED,会话连接挂起,当进行 Leader 选举和 lock 锁等操作时,需要先挂起客户端的连接。注意这里的会话挂起并不等于关闭会话,也不会触发诸如删除临时节点等操作;
  • RECONNECTED,重连,当已经与服务端成功连接的客户端断开后,尝试再次连接服务端后,该条会话的状态为 RECONNECTED,也就是重新连接;
  • LOST,会话丢失,客户端与服务器端因为异常或超时,导致会话关闭时,该条会话的状态就变为 LOST;
  • READONLY,只读,一个客户端会话调用 CuratorFrameworkFactory.Builder.canBeReadOnly() 的时候,该会话会一直处于只读模式,直到重新设置该条会话的状态类型。

接下来,我们来看看代码如何落地:

代码语言:javascript
复制
public class zkConnectionStateListener implements ConnectionStateListener {

        private String zkRegPathPrefix;
        private String regContent;

        public zkConnectionStateListener(String zkRegPathPrefix, String regContent) {
            this.zkRegPathPrefix = zkRegPathPrefix;
            this.regContent = regContent;
        }

        @Override
        public void stateChanged(CuratorFramework client, ConnectionState newState) {
            if (newState == ConnectionState.LOST) {
                logger.info("zk client: {} disConnected...",regContent);
                while (true) {
                    try {
                        if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
                            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                                    .forPath(zkRegPathPrefix, regContent.getBytes("UTF-8"));
                            break;
                        }
                    } catch (InterruptedException e) {
                        logger.error(">> [zkConnectionStateListener]-stateChanged InterruptedException,msg= {}", e);
                        break;
                    } catch (Exception e) {
                        logger.error(">> [zkConnectionStateListener]-stateChanged exception,msg= {}", e);
                    }
                }
            }
        }
    }

在公众号菜单中可自行获取专属架构视频资料,包括不限于 java架构、python系列、人工智能系列、架构系列,以及最新面试、小程序、大前端均无私奉献,你会感谢我的哈

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-09-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 架构师修炼 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引入依赖
  • 创建会话
  • 创建节点
  • 更新节点
  • 删除节点
  • 高级特性
相关产品与服务
云服务器
云服务器(Cloud Virtual Machine,CVM)提供安全可靠的弹性计算服务。 您可以实时扩展或缩减计算资源,适应变化的业务需求,并只需按实际使用的资源计费。使用 CVM 可以极大降低您的软硬件采购成本,简化 IT 运维工作。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档