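ZK source reading 31: QuorumPeer source analysis for the cluster server
We need to look at two methods: start, which launches the thread, and run, which is the thread's body.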
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
This sets currentVote; the initial vote is cast for the server itself.
- this.electionAlg = createElectionAlgorithm(electionType);
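createElectionAlgorithm starts an Election thread (normally the FastLeaderElection thread), so it will:
1. Create a QuorumCnxManager and assign it to AtomicReference<QuorumCnxManager> qcmRef
2. Start the QuorumCnxManager.Listener thread, which listens on the election port for newly established Socket connections and, for each Socket, sets up the other data structures used for communication (streams, BlockingQueues, worker threads that move messages, and so on).
3. FastLeaderElection fle = new FastLeaderElection(this, qcm)
Create and start the FastLeaderElection thread
- Start the Messenger thread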
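The startup order listed above can be sketched roughly as follows. This is only an illustration of the sequencing: ElectionStartupSketch, startNamed, steps and the placeholder Object standing in for the connection manager are all hypothetical names, not ZooKeeper classes or methods, and the worker threads here do nothing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-ins only; none of these are the ZooKeeper classes themselves.
class ElectionStartupSketch {
    // Steps are recorded by the calling thread, so the order is deterministic.
    final List<String> steps = new ArrayList<>();
    // Mirrors the idea of publishing the connection manager through an AtomicReference.
    final AtomicReference<Object> cnxManagerRef = new AtomicReference<>();

    // Starts an empty daemon thread and records that it was started.
    Thread startNamed(String name) {
        Thread t = new Thread(() -> { /* a real worker would loop here */ }, name);
        t.setDaemon(true);
        t.start();
        steps.add("started " + name);
        return t;
    }

    void createElectionAlgorithm() {
        Object cnxManager = new Object();   // 1. placeholder for the connection manager
        cnxManagerRef.set(cnxManager);
        steps.add("created connection manager");
        startNamed("Listener");             // 2. would accept incoming election connections
        startNamed("Messenger");            // 3. would send and receive election messages
    }

    public static void main(String[] args) {
        ElectionStartupSketch sketch = new ElectionStartupSketch();
        sketch.createElectionAlgorithm();
        System.out.println(sketch.steps);
    }
}
```

The point of the ordering is that the listener must exist before the election logic begins exchanging votes; otherwise peers would have nothing to connect to.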
Normally, at this point the FastLeaderElection thread and the threads it depends on have all been started.
public void run() {
updateThreadName();
LOG.debug("Starting quorum peer");
try {
jmxQuorumBean = new QuorumBean(this);
MBeanRegistry.getInstance().register(jmxQuorumBean, null);
for(QuorumServer s: getView().values()){
ZKMBeanInfo p;
if (getId() == s.id) {
p = jmxLocalPeerBean = new LocalPeerBean(this);
try {
MBeanRegistry.getInstance().register(p, jmxQuorumBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxLocalPeerBean = null;
}
} else {
RemotePeerBean rBean = new RemotePeerBean(this, s);
try {
MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
jmxRemotePeerBean.put(s.id, rBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
}
}
}
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
jmxQuorumBean = null;
}
...
This code makes heavy use of MBeanRegistry.getInstance().register, so let's look at the MBeanRegistry source:
/**
* This class provides a unified interface for registering/unregistering of
* zookeeper MBeans with the platform MBean server. It builds a hierarchy of MBeans
* where each MBean represented by a filesystem-like path. Eventually, this hierarchy
* will be stored in the zookeeper data tree instance as a virtual data tree.
*/
public class MBeanRegistry {
...
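According to the comment, this class registers ZooKeeper MBeans with the platform MBean server and arranges them in a hierarchy addressed by filesystem-like paths. Since we do not use JMX, there is no need to study it here.
For the same reason, the JMX-related MBeanServer and ZKMBeanInfo can also be skipped.
In addition, getView() in the source uses the QuorumVerifier field quorumVerifier. QuorumVerifier holds references to all members, and its key method containsQuorum decides whether a cluster-wide operation has succeeded. The usual implementation is QuorumMaj, which treats an operation as successful when more than half of the members have ACKed.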
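The majority rule described above can be illustrated with a minimal sketch. MajorityQuorumSketch is a hypothetical stand-in written for this article, not the QuorumMaj class itself; it assumes that every member is a voting member and that a quorum means strictly more than half of them.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a majority-based verifier; not the ZooKeeper class itself.
class MajorityQuorumSketch {
    private final int half; // floor(n / 2) for n voting members

    MajorityQuorumSketch(int votingMembers) {
        this.half = votingMembers / 2;
    }

    // True when strictly more than half of the voting members have acknowledged.
    boolean containsQuorum(Set<Long> ackSet) {
        return ackSet.size() > half;
    }

    public static void main(String[] args) {
        MajorityQuorumSketch verifier = new MajorityQuorumSketch(5);
        Set<Long> acks = new HashSet<>();
        acks.add(1L);
        acks.add(2L);
        System.out.println(verifier.containsQuorum(acks)); // false: 2 of 5
        acks.add(3L);
        System.out.println(verifier.containsQuorum(acks)); // true: 3 of 5
    }
}
```

With five members, two ACKs are not enough and three are, which is why ensembles are normally sized with an odd number of servers.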
try {
/*
* Main loop
*/
while (running) {
switch (getPeerState()) {
case LOOKING:
LOG.info("LOOKING");
if (Boolean.getBoolean("readonlymode.enabled")) {...} else {
try {...} catch (Exception e) {...}
}
break;
case OBSERVING:
try {
LOG.info("OBSERVING");
setObserver(makeObserver(logFactory));
observer.observeLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
setObserver(null);
updateServerState();
}
break;
case FOLLOWING:
try {
LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
follower.shutdown();
setFollower(null);
updateServerState();
}
break;
case LEADING:
LOG.info("LEADING");
try {
setLeader(makeLeader(logFactory));
leader.lead();
setLeader(null);
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
} finally {
if (leader != null) {
leader.shutdown("Forcing shutdown");
setLeader(null);
}
updateServerState();
}
break;
}
start_fle = Time.currentElapsedTime();
}
} finally {
LOG.warn("QuorumPeer main thread exited");
... // unregister the JMX beans and release the references
}
}
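The shape of this main loop, a switch on the current state with each role wrapped in try/finally, can be sketched in isolation. Everything below is a simplified, hypothetical model: PeerLoopSketch, elect and trace are invented for illustration, the election is faked, and the sketch assumes a peer always goes back to LOOKING when its role ends, whereas the excerpt above delegates that decision to updateServerState().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the loop; names are illustrative only.
class PeerLoopSketch {
    enum State { LOOKING, FOLLOWING, LEADING }

    State state = State.LOOKING;
    final List<String> trace = new ArrayList<>();

    // Pretend election: the peer with id 1 leads, everyone else follows.
    State elect(long myId) {
        return myId == 1 ? State.LEADING : State.FOLLOWING;
    }

    // Runs a bounded number of iterations instead of `while (running)`.
    void run(long myId, int iterations) {
        for (int i = 0; i < iterations; i++) {
            switch (state) {
                case LOOKING:
                    trace.add("LOOKING");
                    state = elect(myId);
                    break; // each case ends with break so roles never run back to back
                case FOLLOWING:
                case LEADING:
                    try {
                        trace.add(state.name());
                        // a real role loop would block here until the role ends
                    } finally {
                        state = State.LOOKING; // assumption: always re-enter election
                    }
                    break;
            }
        }
    }

    public static void main(String[] args) {
        PeerLoopSketch peer = new PeerLoopSketch();
        peer.run(2, 4);
        System.out.println(peer.trace); // [LOOKING, FOLLOWING, LOOKING, FOLLOWING]
    }
}
```

The try/finally is the important design choice: whatever exception ends a role, the cleanup and the state reset still run, so the peer can take part in the next election.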
The implementation of QuorumCnxManager.haveDelivered appears to be problematic.