前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >zookeeper源码分析(2)-客户端启动流程

zookeeper源码分析(2)-客户端启动流程

作者头像
Monica2333
发布2020-06-22 11:38:41
8140
发布2020-06-22 11:38:41
举报
文章被收录于专栏:码农知识点码农知识点
zookeeper原生客户端启动流程

客户端整体结构如下:

核心组件

ZooKeeper

客户端的入口,负责启动整个客户端。持有ClientCnxnZKWatchManager的实例,提供了客户端对节点操作的方法。

ZKWatchManager

watcher机制是zookeeper的一大特性,实现了分布式数据变化对客户端的实时通知功能,需要服务端和客户端的共同实现,可参考Zookeeper Watcher机制。包括对节点数据内容,子节点变化和默认none类型事件的三种watch.

ClientCnxn

管理客户端的socket i/o,包含两个核心线程。SendThread负责与服务端之间的读写网络通信和心跳维持。

EventThread主要对经过SendThread反序列化后的具体请求根据类型的不同,具体处理,同时也会负责对请求事件的watch回调。此外ClientCnxn含有HostProvider对象,能够在会话未过期前与当前服务端连接中断时,自动选择一个可用的服务端重建连接。将在初始化过程中详细介绍。

StaticHostProvider

zookeeper对HostProvider接口的简单实现,持有服务端的地址列表,可通过轮询的方式获得服务端的所有地址。HostProvider接口定义如下:

代码语言:javascript
复制
public interface HostProvider {
//返回服务器地址的总大小
    public int size();

    /**
     * The next host to try to connect to.
     * 
     * For a spinDelay of 0 there should be no wait.
     * 必须返回已被解析的InetSocketAddress对象

     */
    public InetSocketAddress next(long spinDelay);

    /**
     * Notify the HostProvider of a successful connection.
     * The HostProvider may use this notification to reset it's inner state.
     */
    public void onConnected();
}

在对StaticHostProvider的初始化时,会将解析出来的InetSocketAddress对象随机打散,防止多个客户端均连接到一台服务器上

代码语言:javascript
复制
 public StaticHostProvider(Collection<InetSocketAddress> serverAddresses)
            throws UnknownHostException {
      ·············省略List<InetSocketAddress> serverAddresses的构造过程·····················
        Collections.shuffle(this.serverAddresses);
    }

主要看下StaticHostProvider.next的实现

代码语言:javascript
复制
//记录当前客户端奕尝试连接完所有服务端的位置
private int lastIndex = -1;
//记录当前客户端尝试连接的服务端位置
private int currentIndex = -1;

 public InetSocketAddress next(long spinDelay) {
        ++currentIndex;
        if (currentIndex == serverAddresses.size()) {
            currentIndex = 0;
        }
        if (currentIndex == lastIndex && spinDelay > 0) {
            try {
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        } else if (lastIndex == -1) {
            // We don't want to sleep on the first ever connect attempt.
            lastIndex = 0;
        }

        return serverAddresses.get(currentIndex);
    }

实现逻辑就是从位于第一个的服务器开始,挨个向后获取下一个可尝试连接的服务器地址,如果走了一轮,则在下一轮获取第一个服务器前,需要睡眠spinDelayms,因为这种情况很可能服务器集群出现了某种故障。流程示意图如下:

环形地址列表队列

初始化过程

直接初始化一个ZooKeeper客户端对象,就是客户端的启动入口。如:

代码语言:javascript
复制
ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER, ZK_CONNECTION_TIMEOUT, null);

最终构造方法为:

代码语言:javascript
复制
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
//设置默认defaultWatcher 
//private final ZKWatchManager watchManager = new ZKWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

流程为:

1.设置默认defaultWatcher

主要是用来对事件类型为EventType.None类型的回调。

2.初始化StaticHostProvider

主要是将服务端地址列表字符串解析为List<InetSocketAddress> serverAddresses,可供socket连接使用

3.初始化ClientCnxn

首先看下ClientCnxn的主要成员变量

代码语言:javascript
复制
//Packet:封装了客户端一次请求或服务端一次响应的完整数据
//已经发送但是等待服务端响应的packet集合
    private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

    //需要发送的packet集合
    private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
//建立连接的超时时间
    private int connectTimeout;
 //服务端认为的下一次会话过期的具体时间
    private volatile int negotiatedSessionTimeout;
//客户端认为的最大会话超时时间,默认为sessionTimeout * 2 / 3
    private int readTimeout;
//会话的超时时间
    private final int sessionTimeout;
    private long sessionId;
    private byte sessionPasswd[] = new byte[16];
//客户端的命名空间,客户端所有的数据节点的路径都会默认在这层路径下创建。可通过`connectString`参数
//传入,如`127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a`,则`chrootPath=/app/a
final String chrootPath;
//用于记录客户端请求发起的先后顺序,没发送一个packet出去加1
    private int xid = 1;
//客户端连接状态,面向与服务端的连接状态
    private volatile States state = States.NOT_CONNECTED;
    final SendThread sendThread;
    final EventThread eventThread;
  private final ZooKeeper zooKeeper;
    private final ClientWatchManager watcher;
private final HostProvider hostProvider;

构造方法为:

代码语言:javascript
复制
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
            throws IOException {
        this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
             clientCnxnSocket, 0, new byte[16], canBeReadOnly);
    }

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();

        readTimeout = sessionTimeout * 2 / 3;
//是否为只读客户端,默认不是
        readOnly = canBeReadOnly;

        sendThread = new SendThread(clientCnxnSocket);
        eventThread = new EventThread();

    }

初始化sendThread的构造方法为:

代码语言:javascript
复制
SendThread(ClientCnxnSocket clientCnxnSocket) {
            super(makeThreadName("-SendThread()"));
            state = States.CONNECTING;
            this.clientCnxnSocket = clientCnxnSocket;
            setDaemon(true);
        }

主要是将当前客户端连接状态置为States.CONNECTING,并初始化了ClientCnxnSocket

ClientCnxnSocket :负责实现更底层与服务端的socket io,默认实现为ClientCnxnSocketNIO,即NIO方式,它的成员变量为:

代码语言:javascript
复制
//socket是否未初始化
protected boolean initialized;
//已经发送的请求数量
    protected long sentCount = 0;
//已经接收的响应数量
    protected long recvCount = 0;
//最后一次接收响应的时间
    protected long lastHeard;
//最后一次发送请求的时间
    protected long lastSend;
//记录当下的时间
    protected long now;
    protected ClientCnxn.SendThread sendThread;

//输出日志和异常信息时使用
    protected long sessionId;
//NIO相关
private final Selector selector = Selector.open();
 private SelectionKey sockKey;
/**
     * This buffer is only used to read the length of the incoming message.
     */
    protected final ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);

    /**
     * After the length is read, a new incomingBuffer is allocated in
     * readLength() to receive the full message.
     */
    protected ByteBuffer incomingBuffer = lenBuffer;

ClientCnxnSocket底层梳理IO的方法为doTransport,将在后续介绍。

eventThread:事件处理线程,同样为守护线程,主要成员变量为:

代码语言:javascript
复制
//待处理事件的集合
private final LinkedBlockingQueue<Object> waitingEvents =
            new LinkedBlockingQueue<Object>();

        /** This is really the queued session state until the event
         * thread actually processes the event and hands it to the watcher.
         * But for all intents and purposes this is the state.
         */
//面向事件处理
        private volatile KeeperState sessionState = KeeperState.Disconnected;

       private volatile boolean wasKilled = false;
       private volatile boolean isRunning = false;

4.启动ClientCnxn

代码语言:javascript
复制
public void start() {
        sendThread.start();
        eventThread.start();
    }

可以看到分别启动了sendThread和eventThread线程

sendThread.run

代码语言:javascript
复制
public void run() {
//初始化clientCnxnSocket.sendThread和clientCnxnSocket.sessionId,后者此时为0
            clientCnxnSocket.introduce(this,sessionId);
//更新clientCnxnSocket.now为当前时间
            clientCnxnSocket.updateNow();
//更新clientCnxnSocket.lastSend和clientCnxnSocket.lastHeard为当前时间
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = System.currentTimeMillis();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            while (state.isAlive()) {
//连接状态不为States.CLOSED 或 States. AUTH_FAILED
                try {
//当clientCnxnSocket.sockKey == null时,说明底层连接已和服务端断开
                    if (!clientCnxnSocket.isConnected()) {
                        if(!isFirstConnect){
//不是第一次连接且连接断开
                            try {
                                Thread.sleep(r.nextInt(1000));
                            } catch (InterruptedException e) {
                                LOG.warn("Unexpected exception", e);
                            }
                        }
                        // don't re-establish connection if we are closing
                        if (closing || !state.isAlive()) {
                            break;
                        }
//非客户端主动关闭或认证失败,连接断开时客户端会重新连接服务端
//也是第一次连接的入口
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

                    if (state.isConnected()) {
                           ······················省略sasl认证··················
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
//重置连接可用时间
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    
                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
              
                  ·····················省略定时心跳发送和检测···················
                    clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        if (LOG.isDebugEnabled()) {
                            // closing so this is expected
                            LOG.debug("An exception was thrown while closing send thread for session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " : " + e.getMessage());
                        }
                        break;
                    } else {
                        // this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        } else {
                            LOG.warn(
                                    "Session 0x"
                                            + Long.toHexString(getSessionId())
                                            + " for server "
                                            + clientCnxnSocket.getRemoteSocketAddress()
                                            + ", unexpected error"
                                            + RETRY_CONN_MSG, e);
                        }
//说明线程出现异常,清空pendingQueue和outgoingQueue,需要等待客户端重新连接
                        cleanup();
                        if (state.isAlive()) {
                            eventThread.queueEvent(new WatchedEvent(
                                    Event.EventType.None,
                                    Event.KeeperState.Disconnected,
                                    null));
                        }
                        clientCnxnSocket.updateNow();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                }
            }
            cleanup();
            clientCnxnSocket.close();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
            }
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                    "SendThread exited loop for session: 0x"
                           + Long.toHexString(getSessionId()));
        }

第一次创建连接的方法为sendThread.startConnect

代码语言:javascript
复制
 private void startConnect() throws IOException {
            state = States.CONNECTING;

            InetSocketAddress addr;
            if (rwServerAddress != null) {
                addr = rwServerAddress;
                rwServerAddress = null;
            } else {
                addr = hostProvider.next(1000);
            }
··············省略zookeeper Sasl认证校验的过程,如果失败会发送EventType.None、KeeperState.AuthFailed的事件···············
            clientCnxnSocket.connect(addr);
        }

实际建立连接的方法为clientCnxnSocket.connect

代码语言:javascript
复制
 void connect(InetSocketAddress addr) throws IOException {
        SocketChannel sock = createSock();
        try {
           registerAndConnect(sock, addr);
        } catch (IOException e) {
            LOG.error("Unable to open socket to " + addr);
            sock.close();
            throw e;
        }
        initialized = false;
// Reset incomingBuffer
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }

void registerAndConnect(SocketChannel sock, InetSocketAddress addr) 
    throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
//因为SocketChannel sock是非阻塞的,此时并没有马上建立真正的连接就会返回true
        boolean immediateConnect = sock.connect(addr);
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }

当连接返回时,会调用sendThread.primeConnection构建发送队列的packet

代码语言:javascript
复制
 void primeConnection() throws IOException {
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            synchronized (outgoingQueue) {
                // We add backwards since we are pushing into the front
                // Only send if there's a pending watch
                // TODO: here we have the only remaining use of zooKeeper in
                // this class. It's to be eliminated!
                if (!disableAutoWatchReset) {
                    List<String> dataWatches = zooKeeper.getDataWatches();
                    List<String> existWatches = zooKeeper.getExistWatches();
                    List<String> childWatches = zooKeeper.getChildWatches();
                    if (!dataWatches.isEmpty()
                                || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                        Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                        Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                        Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                        long setWatchesLastZxid = lastZxid;

                        while (dataWatchesIter.hasNext()
                                       || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                            List<String> dataWatchesBatch = new ArrayList<String>();
                            List<String> existWatchesBatch = new ArrayList<String>();
                            List<String> childWatchesBatch = new ArrayList<String>();
                            int batchLength = 0;

                            // Note, we may exceed our max length by a bit when we add the last
                            // watch in the batch. This isn't ideal, but it makes the code simpler.
                            while (batchLength < SET_WATCHES_MAX_LENGTH) {
                                final String watch;
                                if (dataWatchesIter.hasNext()) {
                                    watch = dataWatchesIter.next();
                                    dataWatchesBatch.add(watch);
                                } else if (existWatchesIter.hasNext()) {
                                    watch = existWatchesIter.next();
                                    existWatchesBatch.add(watch);
                                } else if (childWatchesIter.hasNext()) {
                                    watch = childWatchesIter.next();
                                    childWatchesBatch.add(watch);
                                } else {
                                    break;
                                }
                                batchLength += watch.length();
                            }

                            SetWatches sw = new SetWatches(setWatchesLastZxid,
                                    dataWatchesBatch,
                                    existWatchesBatch,
                                    childWatchesBatch);
                            RequestHeader h = new RequestHeader();
                            h.setType(ZooDefs.OpCode.setWatches);
                            h.setXid(-8);
                            Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                            outgoingQueue.addFirst(packet);
                        }
                    }
                }

                for (AuthData id : authInfo) {
                    outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                            OpCode.auth), null, new AuthPacket(0, id.scheme,
                            id.data), null, null));
                }
                outgoingQueue.addFirst(new Packet(null, null, conReq,
                            null, null, readOnly));
            }
//只注册SelectionKey.OP_READ 和 SelectionKey.OP_WRITE事件
            clientCnxnSocket.enableReadWriteOnly();
           
        }

注册SelectionKey.OP_READ 和 SelectionKey.OP_WRITE事件到selector上

发送队列outgoingQueue从头到尾添加的packet的请求内容依次为:ConnectRequest ->AuthPacket(如果需要认证的话)->SetWatches (ps:当前watch信息,是需要服务端响应的请求,所以每次成功建立连接都需要将当前客户端的watch信息发送给连接的服务端)

之后run方法会调用clientCnxnSocket.doTransport,也是处理整个IO的方法

代码语言:javascript
复制
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }

此时selector上注册的事件为读写事件,将会调用doIO

代码语言:javascript
复制
 void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
//第一次连接调到此处
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue,
                            cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        if (sockKey.isWritable()) {
            synchronized(outgoingQueue) {
                Packet p = findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress());

                if (p != null) {
                    updateLastSend();
                    // If we already started writing p, p.bb will already exist
                    if (p.bb == null) {
                        if ((p.requestHeader != null) &&
                                (p.requestHeader.getType() != OpCode.ping) &&
                                (p.requestHeader.getType() != OpCode.auth)) {
                            p.requestHeader.setXid(cnxn.getXid());
                        }
                        p.createBB();
                    }

                    sock.write(p.bb);
                    if (!p.bb.hasRemaining()) {
                        sentCount++;
                        outgoingQueue.removeFirstOccurrence(p);
                        if (p.requestHeader != null
                                && p.requestHeader.getType() != OpCode.ping
                                && p.requestHeader.getType() != OpCode.auth) {
                            synchronized (pendingQueue) {
                                pendingQueue.add(p);
                            }
                        }
                    }
                }
                if (outgoingQueue.isEmpty()) {
                    // No more packets to send: turn off write interest flag.
                    // Will be turned on later by a later call to enableWrite(),
                    // from within ZooKeeperSaslClient (if client is configured
                    // to attempt SASL authentication), or in either doIO() or
                    // in doTransport() if not.
                    disableWrite();
                } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                    // On initial connection, write the complete connect request
                    // packet, but then disable further writes until after
                    // receiving a successful connection response.  If the
                    // session is expired, then the server sends the expiration
                    // response and immediately closes its end of the socket.  If
                    // the client is simultaneously writing on its end, then the
                    // TCP stack may choose to abort with RST, in which case the
                    // client would never receive the session expired event.  See
                    // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                    disableWrite();
                } else {
                    // Just in case
                    enableWrite();
                }
            }
        }
        }

首先会触发写事件sockKey.isWritable(),(因为写缓冲区可写但此时服务端并不会发送数据到客户端)将从outgoingQueue取出第一个ConnectRequest 的packet发送出去,然后取消写事件,等待服务端的响应

当除发读事件sockKey.isReadable()时,读取服务端的连接响应数据readConnectResult

代码语言:javascript
复制
void readConnectResult() throws IOException {

        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }

会将响应数据反序列化为ConnectResponse conRsp,从中读取sessionId ,并调用连接回调函数sendThread.onConnected

代码语言:javascript
复制
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();

                throw new SessionExpiredException(warnInfo);
            }
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
//lastIndex = currentIndex
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            seenRwServerBefore |= !isRO;
            
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));
        }

1.根据服务端认可的下次会话过期时间,重置客户端的negotiatedSessionTimeout,readTimeoutconnectTimeout

2.回调hostProvider,重置环形地址列表的指针lastIndex

3.设置sessionId ,sessionPasswd

4.根据是否只读,设置连接状态state= States.CONNECTEDREADONLY或 States.CONNECTED

5.发送连接状态事件给eventThread

接下来当一直是连接状态的时候,sendThread就开始不停的处理读写请求数据和定时发送心跳,可参考会话管理

eventThread.run

代码语言:javascript
复制
 public void run() {
           try {
              isRunning = true;
              while (true) {
                 Object event = waitingEvents.take();
                 if (event == eventOfDeath) {
                    wasKilled = true;
                 } else {
                    processEvent(event);
                 }
                 if (wasKilled)
                    synchronized (waitingEvents) {
                       if (waitingEvents.isEmpty()) {
                          isRunning = false;
                          break;
                       }
                    }
              }
           } catch (InterruptedException e) {
              LOG.error("Event thread exiting due to interruption", e);
           }

            LOG.info("EventThread shut down for session: 0x{}",
                     Long.toHexString(getSessionId()));
        }

主要流程为:

1.读取请求事件Object event = waitingEvents.take()

2.处理请求事件processEvent(event),再此先不展开事件具体处理。

参考资料:ZooKeeper的Java客户端使用

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • zookeeper原生客户端启动流程
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档