Zookeeper-watcher机制源码分析(二)

服务端有一个NettyServerCnxn类,用来处理客户端发送过来的请求
 NettyServerCnxn
 /**
  * Consumes the bytes of one inbound channel buffer and assembles them into
  * length-prefixed packets.
  *
  * <p>Two buffers are used: {@code bbLen} collects the length prefix and
  * {@code bb} collects the payload. While {@code bb} is {@code null} the
  * incoming bytes go to {@code bbLen}; once {@code bbLen} is full its int value
  * becomes the size of a freshly allocated {@code bb}. Once {@code bb} is full
  * it is handed to the server (as a connect request on the first packet, as a
  * regular packet afterwards) and reset to {@code null}.
  *
  * <p>The loop stops when {@code message} has no readable bytes left or when
  * {@code throttled} is set. Any {@link IOException} is logged and the
  * connection is closed.
  *
  * @param message the bytes just received from the client
  */
 public void receiveMessage(ChannelBuffer message) {
 try {
 while(message.readable() && !throttled) {
 if (bb != null) { // a payload buffer exists: keep filling it
 if (LOG.isTraceEnabled()) {
 LOG.trace("message readable " + message.readableBytes()
 + " bb len " + bb.remaining() + " " + bb);
 ByteBuffer dat = bb.duplicate();
 dat.flip();
 LOG.trace(Long.toHexString(sessionId)
 + " bb 0x"
 + ChannelBuffers.hexDump(
 ChannelBuffers.copiedBuffer(dat)));
 }
 // bb has more free space than message has readable bytes: shrink bb's
 // limit to exactly what is available (presumably so readBytes(bb) does
 // not ask for more than message holds -- confirm against the Netty API)
 if (bb.remaining() > message.readableBytes()) {
 int newLimit = bb.position() + message.readableBytes();
 bb.limit(newLimit);
 }
 // copy bytes from message into bb
 message.readBytes(bb);
 // restore the full limit so remaining() again reflects the missing payload bytes
 bb.limit(bb.capacity());
                if (LOG.isTraceEnabled()) {
                    LOG.trace("after readBytes message readable "
                            + message.readableBytes()
                            + " bb len " + bb.remaining() + " " + bb);
                    ByteBuffer dat = bb.duplicate();
                    dat.flip();
                    LOG.trace("after readbytes "
                            + Long.toHexString(sessionId)
                            + " bb 0x"
                            + ChannelBuffers.hexDump(
                                    ChannelBuffers.copiedBuffer(dat)));
                }
                if (bb.remaining() == 0) { // bb is full: the whole payload has been received
                    packetReceived(); // update the received-packet statistics
                    bb.flip();

                    ZooKeeperServer zks = this.zkServer;
                    if (zks == null || !zks.isRunning()) {// no server instance, or it is not running: treat the server as down
                        throw new IOException("ZK down");
                    }
                    if (initialized) {
                        // process the data packet sent by the client
                        zks.processPacket(this, bb);

                        // stop reading if the server says the outstanding count is too high
                        if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
                            disableRecvNoWait();
                        }
                    } else {
                        // the first complete packet on a connection is the connect request
                        LOG.debug("got conn req request from "
                                + getRemoteSocketAddress());
                        zks.processConnectRequest(this, bb);
                        initialized = true;
                    }
                    // payload consumed: the next bytes start a new length prefix
                    bb = null;
                }
            } else { // bb is null: read the length prefix into bbLen (the article leaves this branch to the reader)
                if (LOG.isTraceEnabled()) {
                    LOG.trace("message readable "
                            + message.readableBytes()
                            + " bblenrem " + bbLen.remaining());
                    ByteBuffer dat = bbLen.duplicate();
                    dat.flip();
                    LOG.trace(Long.toHexString(sessionId)
                            + " bbLen 0x"
                            + ChannelBuffers.hexDump(
                                    ChannelBuffers.copiedBuffer(dat)));
                }

                // same limit adjustment as for bb: only take what message can supply
                if (message.readableBytes() < bbLen.remaining()) {
                    bbLen.limit(bbLen.position() + message.readableBytes());
                }
                message.readBytes(bbLen);
                bbLen.limit(bbLen.capacity());
                if (bbLen.remaining() == 0) {
                    // the length prefix is complete
                    bbLen.flip();

                    if (LOG.isTraceEnabled()) {
                        LOG.trace(Long.toHexString(sessionId)
                                + " bbLen 0x"
                                + ChannelBuffers.hexDump(
                                        ChannelBuffers.copiedBuffer(bbLen)));
                    }
                    int len = bbLen.getInt();
                    if (LOG.isTraceEnabled()) {
                        LOG.trace(Long.toHexString(sessionId)
                                + " bbLen len is " + len);
                    }

                    // reset bbLen for the next packet
                    bbLen.clear();
                    if (!initialized) {
                        // before the connect request, the value may be a four-letter
                        // command rather than a length; if it was handled, stop here
                        if (checkFourLetterWord(channel, message, len)) {
                            return;
                        }
                    }
                    // reject negative or oversized lengths before allocating
                    if (len < 0 || len > BinaryInputArchive.maxBuffer) {
                        throw new IOException("Len error " + len);
                    }
                    bb = ByteBuffer.allocate(len);
                }
            }
        }
    } catch(IOException e) {
        LOG.warn("Closing connection to " + getRemoteSocketAddress(), e);
        close();
    }
}
ZookeeperServer-zks.processPacket(this, bb);
 处理客户端传送过来的数据包
 /**
  * Decodes one complete client packet and dispatches it by operation type.
  *
  * <ul>
  *   <li>{@code OpCode.auth}: runs the authentication provider registered for the
  *       packet's scheme, replies with OK or AUTHFAILED, and on failure also sends
  *       the close-connection buffer and disables further receives.</li>
  *   <li>{@code OpCode.sasl}: delegates to {@code processSasl} and replies with its
  *       result.</li>
  *   <li>anything else: wraps the payload in a {@link Request}, submits it, and
  *       increments the connection's outstanding-request counter.</li>
  * </ul>
  *
  * @param cnxn           the connection the packet arrived on
  * @param incomingBuffer the packet bytes, beginning with the request header
  * @throws IOException if deserialization or sending a response fails
  */
 public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
     // Deserialize the client's request header from the front of the buffer.
     InputStream headerStream = new ByteBufferInputStream(incomingBuffer);
     BinaryInputArchive headerArchive = BinaryInputArchive.getArchive(headerStream);
     RequestHeader header = new RequestHeader();
     header.deserialize(headerArchive, "header");

     // slice() produces a view starting at the buffer's current position,
     // presumably just past the header that was consumed above.
     incomingBuffer = incomingBuffer.slice();

     if (header.getType() == OpCode.auth) {
         LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
         AuthPacket authPacket = new AuthPacket();
         ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
         String scheme = authPacket.getScheme();
         ServerAuthenticationProvider provider = ProviderRegistry.getServerProvider(scheme);

         // Default to failure; only a provider can upgrade the result.
         Code authResult = KeeperException.Code.AUTHFAILED;
         if (provider != null) {
             try {
                 authResult = provider.handleAuthentication(
                         new ServerAuthenticationProvider.ServerObjs(this, cnxn),
                         authPacket.getAuth());
             } catch (RuntimeException e) {
                 LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
                 authResult = KeeperException.Code.AUTHFAILED;
             }
         }

         if (authResult != KeeperException.Code.OK) {
             if (provider == null) {
                 LOG.warn("No authentication provider for scheme: "
                         + scheme + " has "
                         + ProviderRegistry.listProviders());
             } else {
                 LOG.warn("Authentication failed for scheme: " + scheme);
             }
             // Tell the client it failed, then close the connection.
             ReplyHeader failureReply = new ReplyHeader(header.getXid(), 0,
                     KeeperException.Code.AUTHFAILED.intValue());
             cnxn.sendResponse(failureReply, null, null);
             cnxn.sendBuffer(ServerCnxnFactory.closeConn);
             cnxn.disableRecv();
             return;
         }

         if (LOG.isDebugEnabled()) {
             LOG.debug("Authentication succeeded for scheme: " + scheme);
         }
         LOG.info("auth success " + cnxn.getRemoteSocketAddress());
         ReplyHeader successReply = new ReplyHeader(header.getXid(), 0,
                 KeeperException.Code.OK.intValue());
         cnxn.sendResponse(successReply, null, null);
         return;
     }

     if (header.getType() == OpCode.sasl) {
         Record saslResponse = processSasl(incomingBuffer, cnxn);
         ReplyHeader saslReply = new ReplyHeader(header.getXid(), 0,
                 KeeperException.Code.OK.intValue());
         // The original author was unsure what the third argument is for.
         cnxn.sendResponse(saslReply, saslResponse, "response");
         return;
     }

     // Ordinary operation: wrap it in a Request and push it into the processor chain.
     Request request = new Request(cnxn, cnxn.getSessionId(), header.getXid(),
             header.getType(), incomingBuffer, cnxn.getAuthInfo());
     request.setOwner(ServerCnxn.me);
     // Always treat a packet from the client as a possible local request.
     setLocalSessionFlag(request);
     submitRequest(request);

     cnxn.incrOutstandingRequests(header);
 }
 submitRequest
 负责在服务端提交当前请求
 public void submitRequest(Request si) {
 if (firstProcessor == null) { //processor处理器,request过来以后会经历一系列处理器的处理过程
 synchronized (this) {
 try {
 // Since all requests are passed to the request
 // processor it should wait for setting up the request
 // processor chain. The state will be updated to RUNNING
 // after the setup.
 while (state == State.INITIAL) {
 wait(1000);
 }
 } catch (InterruptedException e) {
 LOG.warn("Unexpected interruption", e);
 }
 if (firstProcessor == null || state != State.RUNNING) {
 throw new RuntimeException("Not started");
 }
 }
 }
 try {
 touch(si.cnxn);
 boolean validpacket = Request.isValid(si.type); //判断是否合法
 if (validpacket) {
 firstProcessor.processRequest(si);  调用firstProcessor发起请求,而这个firstProcess是一个接口,有多个实现类,具体的调用链是怎么样的?往下看吧
 if (si.cnxn != null) {
 incInProcess();
 }
 } else {
 LOG.warn("Received packet at server of unknown type " + si.type);
 new UnimplementedRequestProcessor().processRequest(si);
 }
 } catch (MissingSessionException e) {
 if (LOG.isDebugEnabled()) {
 LOG.debug("Dropping request: " + e.getMessage());
 }
 } catch (RequestProcessorException e) {
 LOG.error("Unable to process request:" + e.getMessage(), e);
 }
 }
 firstProcessor的请求链组成
 1.firstProcessor的初始化是在ZooKeeperServer的setupRequestProcessors方法中完成的,代码如下
 /**
  * Builds the request-processor chain from the tail to the head and starts the
  * two threaded stages.
  *
  * <p>Resulting order: PrepRequestProcessor -> SyncRequestProcessor ->
  * FinalRequestProcessor, with {@code firstProcessor} pointing at the
  * PrepRequestProcessor.
  */
 protected void setupRequestProcessors() {
     // Tail of the chain.
     RequestProcessor finalProcessor = new FinalRequestProcessor(this);

     // Middle stage forwards to the tail; it is started before the head exists.
     SyncRequestProcessor syncStage = new SyncRequestProcessor(this, finalProcessor);
     syncStage.start();

     // Head stage forwards to the sync stage (not directly to the final one).
     PrepRequestProcessor prepStage = new PrepRequestProcessor(this, syncStage);
     firstProcessor = prepStage;
     prepStage.start();
 }
 从上面我们可以看到firstProcessor的实例是一个PrepRequestProcessor,而这个构造方法中又传递了一个Processor构成了一个调用链。
 RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
 而syncProcessor的构造方法传递的又是一个Processor,对应的是FinalRequestProcessor
 2.所以整个调用链是PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor
 PrepRequestProcessor.processRequest(si);
 通过上面了解到调用链关系以后,我们继续再看firstProcessor.processRequest(si); 会调用到PrepRequestProcessor
 /**
  * Accepts a request by appending it to {@code submittedRequests}.
  * No processing happens on the caller's thread; the queue is drained by
  * this class's {@code run()} loop.
  *
  * @param request the request to enqueue
  */
 public void processRequest(Request request) {
 submittedRequests.add(request);
 }
 唉,很奇怪,processRequest只是把request添加到submittedRequests中,根据前面的经验,很自然的想到这里又是一个异步操作。而submittedRequests又是一个阻塞队列
 LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
 而PrepRequestProcessor这个类又继承了线程类,因此我们直接找到当前类中的run方法如下
 /**
  * Worker loop: takes requests from {@code submittedRequests} one at a time and
  * pre-processes each with {@code pRequest}.
  *
  * <p>The loop ends when the {@code Request.requestOfDeath} sentinel is taken
  * from the queue, or when an exception escapes; exceptions are passed to
  * {@code handleException}.
  */
 public void run() {
     try {
         for (;;) {
             // Blocks until a request is available.
             Request request = submittedRequests.take();

             // Pings are traced under their own mask.
             long traceMask = (request.type == OpCode.ping)
                     ? ZooTrace.CLIENT_PING_TRACE_MASK
                     : ZooTrace.CLIENT_REQUEST_TRACE_MASK;
             if (LOG.isTraceEnabled()) {
                 ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
             }

             // Sentinel request: leave the loop.
             if (Request.requestOfDeath == request) {
                 break;
             }

             // Pre-process the request.
             pRequest(request);
         }
     } catch (RequestProcessorException e) {
         Throwable cause = e.getCause();
         if (cause instanceof XidRolloverException) {
             LOG.info(cause.getMessage());
         }
         handleException(this.getName(), e);
     } catch (Exception e) {
         handleException(this.getName(), e);
     }
     LOG.info("PrepRequestProcessor exited loop!");
 }
pRequest
 预处理这块的代码太长,就不好贴了。前面的N行代码都是根据当前的OP类型进行判断和做相应的处理,在这个方法中的最后一行中,我们会看到如下代码
 nextProcessor.processRequest(request);
 很显然,nextProcessor对应的应该是SyncRequestProcessor
 SyncRequestProcessor. processRequest
 /**
  * Accepts a request by appending it to {@code queuedRequests}.
  * The request is handled later by this class's {@code run()} loop, which
  * drains that queue.
  *
  * @param request the request to enqueue
  */
 public void processRequest(Request request) {
 // request.addRQRec(">sync");
 queuedRequests.add(request);
 }
 这个方法的代码也是一样,基于异步化的操作,把请求添加到queuedRequests中,那么我们继续在当前类找到run方法
 /**
  * Worker loop: drains {@code queuedRequests}, appends each request to the
  * ZooKeeper database log, periodically rolls the log and starts a snapshot
  * thread, and batches requests in {@code toFlush} before flushing them.
  *
  * <p>The loop ends when the {@code requestOfDeath} sentinel is dequeued. Any
  * {@link Throwable} is passed to {@code handleException}; {@code running} is
  * set to {@code false} on every exit path.
  */
 public void run() {
 try {
 // number of requests appended to the log since the last roll
 int logCount = 0;
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount/2);
        while (true) {
            Request si = null;
            // Fetch the next request: block with take() when nothing is waiting to
            // be flushed; otherwise poll() so that an empty queue triggers a flush.
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();  
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            // sentinel request: leave the loop
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                // Roughly speaking, the code below triggers a snapshot by starting
                // a dedicated snapshot thread.
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        // pick a new random offset for the next threshold
                        randRoll = r.nextInt(snapCount/2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            // a previous snapshot thread is still running
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    // optimization for read heavy workloads
                    // iff this is a read, and there are no pending
                    // flushes (writes), then just pass this to the next
                    // processor
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si); // hand the request to the next processor in the chain
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable)nextProcessor).flush();
                        }
                    }
                    continue;
                }
                // queue the request for the next flush; flush once more than 1000 are pending
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally{
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
FinalRequestProcessor. processRequest
 这个方法就是我们在课堂上分析到的方法了。FinalRequestProcessor.processRequest方法会根据Request对象中的操作类型,更新内存中的Session信息或者znode数据。
 这块代码有小300多行,就不全部贴出来了,我们直接定位到关键代码,根据客户端的OP类型找到如下的代码
 // Handles an "exists" request: deserializes it, validates the path, looks up
 // the node's Stat (optionally registering the connection as a watcher) and
 // stores the result in rsp.
 case OpCode.exists: {
 lastOp = "EXIS";
 // TODO we need to figure out the security requirement for this!
 ExistsRequest existsRequest = new ExistsRequest();
 // Deserialize the request's ByteBuffer into the ExistsRequest; this is the
 // request object the client passed along when it issued the call.
 ByteBufferInputStream.byteBuffer2Record(request.request,
 existsRequest);
 String path = existsRequest.getPath(); // the path being queried
 // a NUL character in the path is rejected as a bad argument
 if (path.indexOf('\0') != -1) {
 throw new KeeperException.BadArgumentsException();
 }
 // Key line: if the request's watch flag is set, cnxn (the server connection)
 // is passed along as the watcher; otherwise null is passed.
 // For an exists request this is how the watcher for the path gets registered.
 Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
 rsp = new ExistsResponse(stat); // wrap the Stat found in the server's in-memory database in an ExistsResponse
 break;
 }
 statNode这个方法做了什么?
 /**
  * Returns the {@code Stat} for a path by delegating to
  * {@code dataTree.statNode} with the same arguments.
  *
  * @param path       the node path to look up
  * @param serverCnxn passed through unchanged to {@code dataTree.statNode}; may be null
  * @return whatever {@code dataTree.statNode} returns
  * @throws KeeperException.NoNodeException propagated from {@code dataTree.statNode}
  */
 public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
 return dataTree.statNode(path, serverCnxn);
 }
 一路向下,在下面这个方法中,将ServerCnxn向上转型为Watcher了。 因为ServerCnxn实现了Watcher接口
 /**
  * Returns a copy of the stat of the node at {@code path}, registering
  * {@code watcher} on that path first when one is supplied.
  *
  * <p>The watch is added before the existence check, so it is registered even
  * when the node is missing and {@code NoNodeException} is thrown.
  *
  * @param path    the node path to look up
  * @param watcher the watcher to bind to {@code path}, or {@code null} for none
  * @return a new {@code Stat} populated from the node
  * @throws KeeperException.NoNodeException if no node exists at {@code path}
  */
 public Stat statNode(String path, Watcher watcher)
         throws KeeperException.NoNodeException {
     // Look up the node for this path.
     DataNode node = nodes.get(path);

     // Bind the watcher to the path, whether or not the node exists.
     if (watcher != null) {
         dataWatches.addWatch(path, watcher);
     }

     if (node == null) {
         throw new KeeperException.NoNodeException();
     }

     Stat result = new Stat();
     // Copy under the node's monitor.
     synchronized (node) {
         node.copyStat(result);
     }
     return result;
 }
 WatchManager.addWatch(path, watcher);
 /**
  * Registers {@code watcher} for {@code path} in both lookup tables:
  * {@code watchTable} (path to watchers) and {@code watch2Paths}
  * (watcher to paths).
  *
  * @param path    the node path being watched
  * @param watcher the watcher to register
  */
 synchronized void addWatch(String path, Watcher watcher) {
     // Forward mapping: path -> watchers.
     HashSet<Watcher> watchersForPath = watchTable.get(path);
     if (watchersForPath == null) {
         // don't waste memory if there are few watches on a node
         // rehash when the 4th entry is added, doubling size thereafter
         // seems like a good compromise
         watchersForPath = new HashSet<Watcher>(4);
         watchTable.put(path, watchersForPath);
     }
     watchersForPath.add(watcher);

     // Reverse mapping: watcher -> paths.
     HashSet<String> pathsForWatcher = watch2Paths.get(watcher);
     if (pathsForWatcher == null) {
         // cnxns typically have many watches, so use default cap here
         pathsForWatcher = new HashSet<String>();
         watch2Paths.put(watcher, pathsForWatcher);
     }
     pathsForWatcher.add(path);
 }

其大致流程如下   ① 通过传入的path(节点路径)从watchTable获取相应的watcher集合,进入②

② 判断①中获取到的watcher集合是否为空(null),若为空,则进入③,否则,进入④

③ 新生成watcher集合,并将路径path和此集合添加至watchTable中,进入④

④ 将传入的watcher添加至watcher集合,即完成了path和watcher添加至watchTable的步骤,进入⑤

⑤ 通过传入的watcher从watch2Paths中获取相应的path集合,进入⑥

⑥ 判断path集合是否为空,若为空,则进入⑦,否则,进入⑧

⑦ 新生成path集合,并将watcher和paths添加至watch2Paths中,进入⑧

⑧ 将传入的path(节点路径)添加至path集合,即完成了path和watcher添加至watch2Paths的步骤

总结:调用关系链如下

图片1.png

图文里的技术如何学习,有没有免费资料?

对Java技术,架构技术感兴趣的同学,欢迎加QQ群619881427,一起学习,相互讨论。

群内已经有小伙伴将知识体系整理好(源码,笔记,PPT,学习视频),欢迎加群免费领取。

分享给喜欢Java,喜欢编程,有梦想成为架构师的程序员们,希望能够帮助到你们。

不是Java程序员也没关系,帮忙转发给更多朋友!谢谢。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏技术博客

MVC Html.RenderPartial和Html.partial

①Html.Partial是HtmlHelper的扩展方法,而Html.RenderPartial是HtmlHelper自带方法,两者功能相近。但

18540
来自专栏Golang语言社区

50. RESTful API的简单实现 | 厚土Go学习笔记

RESTfull API是现在很流行的 API 设计风格。众所周知的 HTTP 1.1规范正是基于 REST 架构风格的指导原理来设计的。需要注意的是,REST...

29140
来自专栏逸鹏说道

我这么玩Web Api(二)

数据验证,全局数据验证与单元测试 目录 一、模型状态 - ModelState 二、数据注解 - Data Annotations 三、自定义数据注解 四、全局...

54060
来自专栏ascii0x03的安全笔记

C/C++网络编程时注意的问题小结

1.网络编程在自己定义结构体实现协议的时候,一定要注意字节对齐这个问题。否则sizeof和强制转换指针的时候都会出现很难发现的bug。 什么是字节对齐自行百度。...

36390
来自专栏何俊林

LeakCanary的原理,你知道么?

32320
来自专栏跟着阿笨一起玩NET

C#FTP下载文件出现远程服务器返回错误: (500) 语法错误,无法识别命令

如果下载多个文件的时候,有时候莫名其妙的出现500服务器错误,很有可能是没有设置KeepAlive 属性导致的。

33310
来自专栏GreenLeaves

oracle 层次化查询(生成菜单树等)

1、简介:Oracle层次化查询是Oracle特有的功能实现,主要用于返回一个数据集,这个数据集存在树的关系(数据集中存在一个Pid记录着当前数据集某一条记录的...

26580
来自专栏chenssy

【追光者系列】HikariCP源码分析之evict、时钟回拨、连接创建生命周期

evict定义在com.zaxxer.hikari.pool.PoolEntry中,evict的汉语意思是驱逐、逐出,用来标记连接池中的连接不可用。

47440
来自专栏编码小白

tomcat请求处理分析(一) 启动container实例

1.1.1  启动container实例 其主要是进行了生命周期中一系列的操作之后调用StandardEngine中的 startInternal方法,不难看出...

38760
来自专栏何俊林

MediaCodec进行编解码AAC(文件格式转换)

本文来自eric原创授权发布,eric,音视频开发爱好者,简书地址:https://www.jianshu.com/u/1502591a1753。欢迎大家关注。...

74270

扫码关注云+社区

领取腾讯云代金券