前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >ZooKeeper Watcher机制(源码分析)

ZooKeeper Watcher机制(源码分析)

作者头像
shysh95
发布2019-07-24 14:42:13
1.1K0
发布2019-07-24 14:42:13
举报
文章被收录于专栏:shysh95shysh95

客户端Watcher的注册

关键类:

  1. org.apache.zookeeper.ZooKeeper:客户端基础类、存储了ClientCnxn和ZkWatcherManager
  2. ZKWatchManager:ZooKeeper的内部类,实现了ClientWatchManager接口,主要用来存储各种类型的Watcher,主要有三种:dataWatches、existWatches、childWatches以及一个默认的defaultWatcher
  3. org.apache.zookeeper.ClientCnxn:与服务端的交互类,主要包含以下对象:LinkedListoutgoingQueue、SendThread和EventThread,其中outgoingQueue未待发送给服务端的Packet列表,SendThread线程负责和服务端进行请求交互,而EventThread线程则负责客户端Watcher事件的回调执行
  4. WatchRegistration:Zookeeper的内容类,包装了Watcher和clientPath,并且负责Watcher的注册
  5. Packet:ClientCnxn的内部类,与Zookeeper服务端通信的交互类

主要流程:

  1. 用户调用Zookeeper的getData方法,并将自定义的Watcher以参数形式传入,该方法的作用主要是封装请求,然后调用ClientCnxn的submitRequest方法提交请求
  2. ClientCnxn在调用submitRequest提交请求时,会将WatchRegistration(封装了我们传入的Watcher和clientPath)以参数的形式传入,submitRequest方法主要作用是将信息封装成Packet(ClientCnxn的内部类),并将封装好的Packet加入到ClientCnxn的待发送列表中(LinkedListoutgoingQueue)
  3. SendThread线程不断地从outgoingQueue取出未发送的Packet发送给客户端并且将该Packet加入pendingQueue(等待服务器响应的Packet列表)中,并通过自身的readResponse方法接收服务端的响应
  4. SendThread接收到客户端的响应以后,会调用ClientCnxn的finishPacket方法进行Watcher方法的注册
  5. 在finishPacket方法中,会取出Packet中的WatchRegistration对象,并调用其register方法,从ZKWatchManager取出对应的dataWatches、existWatches或者childWatches其中的一个Watcher集合,然后将自己的Watcher添加到该Watcher集合中。

Zookeeper客户端和服务端在交互时,并不会将Watcher实体发送给服务端,这样减少了服务端的内存消耗,并且提高了传输效率,这部分可以通过Packet的序列化代码(Packet中的createBB方法)可以看出,如下:

代码语言:javascript
复制
public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

服务端处理客户端的请求

关键类:

  1. org.apache.zookeeper.server.ZooKeeperServer:服务端基础类,初始化FinalRequestProcessor并将其包装在线程中运行,存储了ZKDatabase(ZooKeeper服务端的内存数据库)
  2. FinalRequestProcessor:用于处理来自客户端的请求
  3. ServerCnxn:代表了服务端与客户端一个连接,实现了Watcher接口,主要有两种实现:NIOServerCnxn和NettyServerCnxn
  4. ZKDatabase:ZooKeeper服务端的内存数据库,存储了DataTree
  5. DataTree:ZooKeeper服务端的树结构存储,另外还存储了两个WatcherManager,一个用来管理数据变更Watcher,另一个用来管理子节点变更Watcher
  6. WatcherManager:服务端管理Watcher的类,从两个维度来对Watcher进行存储,一个以目录为维度存储Watcher(HashMap<string, hashset

主要流程(以监听数据变更为例):

  1. FinalRequestProcessor收到来自客户端的请求,首先从ZooKeeperServer中取得ZKDatabase,然后调用其getData方法,并将ServerCnxn以参数的形式传入
  2. ZKDatabase的getData方法实际上会去调用DataTree的getData方法
  3. DataTree的getData方法会获取节点的内容,并将ServerCnxn添加到WatcherManager中
  4. 当ZooKeeper服务端为节点设置内容时,最终会调用DataTree中的Stat setData(String path, byte data[], int version, long zxid, long time)方法,该方法在设置完节点内容以后,将会触发Watcher事件,会调用WatcherManager的triggerWatch方法

服务端Watcher触发的主要步骤

  1. 封装WatchedEvent对象,取得该节点的所有Watcher列表,然后并将其从watchTable和watch2Paths中移除
  2. 循环遍历Watcher(其实就是ServerCnxn),调用其process方法,并将WatchedEvent以参数形式传入
  3. 在此处以NIOServerCnxn讲解,process方法中,主要是在请求头中标记xid为-1,代表这是一个通知事件,然后将WatchedEvent转换成WatcherEvent以便网络传输然后经序列化后发送给客户端

客户端如何接收Watcher事件和执行回调

主要逻辑:

  1. SendThread在收到服务端的请求后,会判断xid的类型,发现xid值为-1,然后调用Watcher的相关处理
  2. 首先将WatcherEvent从服务端的响应中反序列化出来,然后转换成WatchedEvent,并且调用EventThread的queueEvent方法去处理该WatchedEvent
  3. EventThread的queueEvent方法会调用ZKWatchManager的materialize的方法从ZKWatchManager的dataWatches取出所有相关的Watcher添加到WatcherSetEventPair中并将原集合中的移除,然后将WatcherSetEventPair添加到一个LinkedBlockingQueue waitingEvents集合,代表还未处理的Watche事件
  4. EventThread线程会不断轮询waitingEvents集合,取出还未处理的WatcherSetEventPair,并取出其中Watcher集合,循环处理调用Watcher的process方法从而实现了回调
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-05-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员修炼笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 客户端Watcher的注册
  • 服务端处理客户端的请求
  • 服务端Watcher触发的主要步骤
  • 客户端如何接收Watcher事件和执行回调
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档