Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册watcher和NodeExistsException 异常等,对于我们日常 ZooKeeper 服务开发进行了详细的封装,例如 Leader 选举、分布式计数器、分布式锁。这就减少了技术人员在使用 ZooKeeper 时的大部分底层细节开发工作。
下面我们常用的绘画创建以及节点的管理,一起来看看如何去使用 Curator 去代替原生的 ZooKeeper 开发。
Curator 主要又两个关键包,curator-framework 包和 curator-recipes 包。
<dependency>
通过上面 Maven 引入依赖之后,我们就具备了使用 Curator 去操作 zookeeper 的能力。Curator 框架提供了的 API 是相当于流式的编码风格,主要是按照逻辑的先后顺序,采用调用的方式,在代码方式以及逻辑上更清晰一些。
下面,我们先来感受下这种编码风格,例如,我们要在 zookeeper 服务中创建一个 “/test/path” 的节点,然后,节点内容为 testData ,使用 Curator 框架编码如下:
client.create().forPath(“/test/path”, testData)
通过上面,我们已经引入了 Curator 框架,并且已经知道了其编码风格,接下来,我们来看看该如何使用 Curator 去创建一个会话。
在这之前,我们先来复习下,使用zookeeper 原生客户端如何去创建会话:
try {
使用 Curator 来创建会话:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
在定义 CuratorFramework 对象实例的时候,我们使用了 CuratorFrameworkFactory 工厂方法,下面我们来看下其关键信息:
现在,我们已经完成了客户端与服务端会话的建立,即证明两端具备了通信的能力下面,我们就再来看看使用 Curator 框架该如何去创建、删除以及更新节点等。
我们知道在创建节点的时候,是需要描叙该节点是临时节点、持久节点等节点相关数据信息,使用 Curator 创建节点代码如下:
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes());
接下来,我们再来看看节点更新代码该如何去编写:
client.setData().forPath("path","data_update".getBytes());
上面我们已经知道了如何去创建会话、创建节点以及更新节点,下面,我们再来看看使用 Curator 如何去删除节点,代码如下:
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
我们在开发 zookeeper 服务的时候,经常会遇到这种情况,如果我们注册的节点异常断开或者是遇到其他网络问题导致的连接不可用,那这个时候我们怎么能立即感应呢?
在 Curator 中是通过 ConnectionStateListener 这个监听器去实现的,它主要是用来监控会话的连接状态,当状态发生改变的时候, zookeeper 服务就会启用不同的处理方式,其会话一共有六种基本状态:
接下来,我们来看看代码如何落地:
public class zkConnectionStateListener implements ConnectionStateListener {
private String zkRegPathPrefix;
private String regContent;
public zkConnectionStateListener(String zkRegPathPrefix, String regContent) {
this.zkRegPathPrefix = zkRegPathPrefix;
this.regContent = regContent;
}
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) {
logger.info("zk client: {} disConnected...",regContent);
while (true) {
try {
if (client.getZookeeperClient().blockUntilConnectedOrTimedOut()) {
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(zkRegPathPrefix, regContent.getBytes("UTF-8"));
break;
}
} catch (InterruptedException e) {
logger.error(">> [zkConnectionStateListener]-stateChanged InterruptedException,msg= {}", e);
break;
} catch (Exception e) {
logger.error(">> [zkConnectionStateListener]-stateChanged exception,msg= {}", e);
}
}
}
}
}
在公众号菜单中可自行获取专属架构视频资料,包括不限于 java架构、python系列、人工智能系列、架构系列,以及最新面试、小程序、大前端均无私奉献,你会感谢我的哈