前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >zk QuorumPeer分析1 启动与运行

zk QuorumPeer分析1 启动与运行

作者头像
平凡的学生族
发布2019-12-02 21:57:35
5540
发布2019-12-02 21:57:35
举报
文章被收录于专栏:后端技术后端技术

参考

zk源码阅读31:集群server中QuorumPeer源码解析

我们需要观察线程启动的start方法,和线程运行的run方法。

1. QuorumPeer.start

  1. loadDataBase(); 加载DataTree和Sessions
  2. startServerCnxnFactory(); 运行ServerCnxnFactory和secureCnxnFactory的start方法。这两个都
  3. secureCnxnFactory的用途暂时没分析,应该充当"守护线程"的作用(它不是线程,但却会启动一系列能接替同样工作的线程)
  4. ServerCnxnFactory还会启动:
    1. workerPool线程
    2. selectorThread, acceptThread, expirerThread线程
  5. adminServer.start(); (一个新功能,便于在本地通过web来调用命令,不是核心功能。)
  6. synchronized startLeaderElection();启动投票线程 - currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());设置currentVote,当前投票投自己 - this.electionAlg = createElectionAlgorithm(electionType);createElectionAlgorithm启动一个Election线程(一般是FastLeaderElection线程),所以会: 1. 创建QuorumCnxManager,赋给AtomicReference<QuorumCnxManager> qcmRef 2. 启动QuorumCnxManager.Listener线程,该线程用来监听竞选端口的新建连接Socket,并根据Socket加载其它用于交流的数据结构(流、BlockingQueue、搬运线程等)。 3. FastLeaderElection fle = new FastLeaderElection(this, qcm)创建启动FastLeaderElection线程 - 启动Messenger线程 通常至此,FastLeaderElection线程和它需要的线程都已启动

2. QuorumPeer.run

2.1 注册jmx

代码语言:txt
复制
public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                        jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }
...

其中大量用到MBeanRegistry.getInstance().register,我们看下MBeanRegistry的源码:

代码语言:txt
复制
/**
 * This class provides a unified interface for registering/unregistering of
 * zookeeper MBeans with the platform MBean server. It builds a hierarchy of MBeans
 * where each MBean represented by a filesystem-like path. Eventually, this hierarchy
 * will be stored in the zookeeper data tree instance as a virtual data tree.
 */
public class MBeanRegistry {
   ...

根据注释,该类是用于提供jmx服务的,并且会将zookeeper目录树根据路径存储在其中。我们不需要使用jmx,此处不必理解。

同理,jmx相关的MBeanServer和ZKMBeanInfo也不必理解。

另外,源码getView()用到了QuorumVerifier类变量quorumVerifier。QuorumVerifier包含了所有成员的引用,关键方法containsQuorum用来判断是否集群操作成功。其实现类一般用QuorumMaj,根据是否ACK数过半来判断操作是否成功。

2.2 main loop

代码语言:txt
复制
try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {...} else {
                        try {...} catch (Exception e) {...}
                    }
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );
                    } finally {
                        observer.shutdown();
                        setObserver(null);  
                       updateServerState();
                    }
                    break;
                case FOLLOWING:
                    try {
                       LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                    }
                    break;
                }
                start_fle = Time.currentElapsedTime();
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            ... // 卸载jmx,回收变量内存
        }
    }
  1. 观察switch case可见,在大循环里,每次都会根据当前状态进行相应操作。
  2. 观察OBSERVING、FOLLOWING、LEADING的try-catch-finally格式可见,在这三种状态中,
    • 在try里都会设置对应角色,调用角色对应方法,并在
    • 在finally里都会调用shutdown方法,进一步观察三者该方法的实现:
      • 三者都会取消角色下的线程、关闭所有连接、回收各种变量
      • 不用担心关闭连接会影响之后的运行,如果之后要用到,会重新建立的

quorumcxnmanager.haveDelivered似乎实现有问题。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考
  • 1. QuorumPeer.start
  • 2. QuorumPeer.run
    • 2.1 注册jmx
      • 2.2 main loop
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档