前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Zookeeper源码分析:Watcher机制

Zookeeper源码分析:Watcher机制

作者头像
星哥玩云
发布2022-07-04 13:33:12
2210
发布2022-07-04 13:33:12
举报
文章被收录于专栏:开源部署

1. 设置Watcher 使用Watcher需要先实现Watcher接口,并将实现类对象传递到指定方法中,如getChildren, exist等。Zookeeper允许在构造Zookeeper对象时候指定一个默认Watcher对象.getChildren和exit方法可以使用这个默认的Watcher对象,也可以指定一个新Watcher对象。

Code 1: Watcher接口

public interface Watcher {

    /**     * Event的状态     */     public interface Event {         /**         * 在事件发生时,ZooKeeper的状态         */         public enum KeeperState {

            @Deprecated             Unknown (-1),

            Disconnected (0),

            @Deprecated             NoSyncConnected (1),

            SyncConnected (3),

            AuthFailed (4),

            ConnectedReadOnly (5),

            SaslAuthenticated(6),

            Expired (-112);

            private final int intValue; 

            KeeperState( int intValue) {                 this.intValue = intValue;             } 

            ......         }

        /**         * ZooKeeper中的事件         */         public enum EventType {             None (-1),             NodeCreated (1),             NodeDeleted (2),             NodeDataChanged (3),             NodeChildrenChanged (4);

            private final int intValue;    // Integer representation of value                                             // for sending over wire             EventType( int intValue) {                 this.intValue = intValue;             }             ......          }     }

    //Watcher的回调方法     abstract public void process(WatchedEvent event); }

Code 2: Zookeeper.getChildren(final String, Watcher)方法

public List<String> getChildren(final String path, Watcher watcher)     throws KeeperException, InterruptedException {     final String clientPath = path;     PathUtils. validatePath(clientPath);

    WatchRegistration wcb = null;     //如果watcher不等于null, 构建WatchRegistration对象,     //该对象描述了watcher和path之间的关系     if (watcher != null) {         wcb = new ChildWatchRegistration(watcher, clientPath);     }     //在传入的path加上root path前缀,构成服务器端的绝对路径     final String serverPath = prependChroot(clientPath);     //构建RequestHeader对象     RequestHeader h = new RequestHeader();     //设置操作类型为OpCode. getChildren     h.setType(ZooDefs.OpCode. getChildren);     //构建GetChildrenRequest对象     GetChildrenRequest request = new GetChildrenRequest();     //设置path     request.setPath(serverPath);     //设置是否使用watcher     request.setWatch(watcher != null);     //构建GetChildrenResponse对象     GetChildrenResponse response = new GetChildrenResponse();     //提交请求,并阻塞等待结果     ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);     if (r.getErr() != 0) {         throw KeeperException.create(KeeperException.Code. get(r.getErr()),                 clientPath);     }     return response.getChildren(); }

Follower的NIOServerCnxn类接到了Client的请求,会调用ZookeeperServer.processPacket()方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor。

由于我们的请求只是一个读操作,而不是一个Quorum请求或者sync请求,所以FollowerRequestProcessor不需要调用Follower.request()方法将请求转给Leader,只需要将请求传递到下一个处理器CommitProcessor。

处理器CommitProcessor线程发现请求是读请求后,直接将Requet对象加入到toProcess队列中,在接下的循环中会调用FinalRequestProcessor.processRequest方法进行处理。

FinalRequestProcessor.processRequest方法最终会调用ZKDatabase中的读操作方法(如statNode和getData方法), 而ZKDatabase的这些方法会最终调用DataTree类的方法来获取指定path的znode信息并返回给Client端,同时也会设置Watcher。

Code 3: FinalRequestProcessor对OpCode.getData请求的处理

case OpCode. getData: {               lastOp = "GETD";               GetDataRequest getDataRequest = new GetDataRequest();               ByteBufferInputStream. byteBuffer2Record(request.request,                       getDataRequest);               //获得znode对象               DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());               //n为null, 抛出NoNodeException异常               if (n == null) {                   throw new KeeperException.NoNodeException();               }               Long aclL;               synchronized(n) {                   aclL = n. acl;               }               //检查是否有读权限               PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),                       ZooDefs.Perms. READ,                       request. authInfo);               //构建状态对象stat               Stat stat = new Stat();               //获得指定path的znode数据,               //如果GetDataRequest.getWatcher()返回true, 将ServerCnxn类型对象cnxn传递进去。               //ServerCnxn是实现了Watcher接口               byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,                       getDataRequest. getWatch() ? cnxn : null);               //构建GetDataResponse对象               rsp = new GetDataResponse(b, stat);               break;           }

Code 4: DataTree.getData()方法

public byte[] getData(String path, Stat stat, Watcher watcher)         throws KeeperException.NoNodeException {     //从nodes map中获取指定path的DataNode对象     DataNode n = nodes.get(path);     //如果n为null, 则抛出NoNodeException异常     if (n == null) {         throw new KeeperException.NoNodeException();     }     synchronized (n) {         //将n的状态copy到stat中         n.copyStat(stat);         //如果watcher不会null, 则将(path, watcher)键值对放入dataWatchers Map里         if (watcher != null) {             dataWatches.addWatch(path, watcher);         }         //返回节点数据         return n.data ;     } }

2. 修改znode数据触发Watcher 在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。

如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。

其关SetData请求的时序图如下:

triggerWatcher

Code 5: DataTree.setData()方法

public Stat setData(String path, byte data[], int version, long zxid,         long time) throws KeeperException.NoNodeException {     Stat s = new Stat();     //根据path, 获得DataNode对象n     DataNode n = nodes.get(path);     //如果n为null, 则抛出NoNodeException异常     if (n == null) {         throw new KeeperException.NoNodeException();     }     byte lastdata[] = null;     synchronized (n) {         lastdata = n. data;         n. data = data;         n. stat.setMtime(time);         n. stat.setMzxid(zxid);         n. stat.setVersion(version);         n.copyStat(s);     }     // now update if the path is in a quota subtree.     String lastPrefix = getMaxPrefixWithQuota(path);     if(lastPrefix != null) {       this.updateBytes(lastPrefix, (data == null ? 0 : data.length)           - (lastdata == null ? 0 : lastdata.length ));     }     //触发Watcher     dataWatches.triggerWatch(path, EventType.NodeDataChanged);     return s; }

Code 6: WatchManage.triggerWatcher()方法,触发Watcher。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {     WatchedEvent e = new WatchedEvent(type,             KeeperState. SyncConnected, path);     HashSet<Watcher> watchers;     synchronized (this ) {         //从watchTable删除掉path对于的watcher         watchers = watchTable.remove(path);         if (watchers == null || watchers.isEmpty()) {             if (LOG .isTraceEnabled()) {                 ZooTrace. logTraceMessage(LOG,                         ZooTrace. EVENT_DELIVERY_TRACE_MASK,                         "No watchers for " + path);             }             return null;         }         for (Watcher w : watchers) {             HashSet<String> paths = watch2Paths.get(w);             if (paths != null) {                 paths.remove(path);             }         }     }     //循环处理所有关于path的Watcher, 这里Watcher对象实际上就是ServerCnxn类型对象     for (Watcher w : watchers) {         if (supress != null && supress.contains(w)) {             continue;         }         w.process(e);     }     return watchers; }

Code 7: NIOServerCnxn.process方法,发送notification给Client端

synchronized public void process (WatchedEvent event) {     ReplyHeader h = new ReplyHeader(-1, -1L, 0);     if (LOG .isTraceEnabled()) {         ZooTrace. logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK ,                                 "Deliver event " + event + " to 0x"                                 + Long. toHexString(this. sessionId)                                 + " through " + this );     }

    // Convert WatchedEvent to a type that can be sent over the wire     WatcherEvent e = event.getWrapper();     //发送notification给Client端     sendResponse(h, e, "notification"); }

3. 总结 Watcher具有one-time trigger的特性,在代码中我们也可以看到一个watcher被处理后会立即从watchTable中删掉。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档