As mentioned, the implementation up to version 3.3.3 has not included the epoch variables acceptedEpoch and currentEpoch. This omission has generated problems [5] (issue ZOOKEEPER-335 in Apache's issue tracking system) in a production version and was noticed by many ZooKeeper clients. The origin of this problem is at the beginning of the Recovery Phase (Algorithm 4 line 2), when the leader increments its epoch (contained in lastZxid) even before acquiring a quorum of successfully connected followers (such a leader is called a false leader). Since a follower goes back to FLE if its epoch is larger than the leader's epoch (line 25), when a false leader drops leadership and becomes a follower of a leader from a previous epoch, it finds a smaller epoch (line 25) and goes back to FLE. This behavior can loop, switching between the Recovery Phase and FLE. Consequently, when lastZxid is used to store the epoch number, there is no distinction between a tried epoch and a joined epoch in the implementation. Those are the respective purposes of acceptedEpoch and currentEpoch, hence their omission leads to such problems. These variables have been properly inserted in recent (unstable) ZooKeeper versions to fix the problems mentioned above.
In other words, earlier versions did not distinguish acceptedEpoch from currentEpoch; the epoch was extracted directly from the upper 32 bits of the zxid. This causes a problem. Suppose three servers s1, s2, s3: s1 and s2 establish a cluster with s1 as leader, while s3 is LOOKING:
After that, s3 keeps oscillating between steps 4 and 5, looping between FLE and the Recovery Phase. Everything else makes sense, but one key question remains: isn't it said that "a leader falls back to FLE when it cannot reach a quorum of followers"? Then when follower s2 of the old cluster restarts, why would s1 still consider itself LEADER?
I ran an experiment to check: in a 4-server ensemble I started 3 servers, with s3 as leader and s1, s2 as followers, and added a print statement at the entry of the election algorithm. After quickly stopping and restarting s2, s3 did not enter election mode; it simply accepted s2 back.
My guess is that this relies on a heartbeat mechanism: every self.tickTime / 2, the leader checks whether it still holds a quorum of connected followers.
Let's look at the source of Leader::lead; I've added some comments with my own understanding:
```java
while (true) {
    synchronized (this) {
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.tickTime / 2;
        while (cur < end) {
            // I take end - cur to be the heartbeat period: after each such
            // interval the cluster's connection status is checked,
            // which decides whether to keep the leadership.
            // Also, while debugging I saw self.tickTime == 200000, i.e. 200 seconds.
            wait(end - cur);
            cur = Time.currentElapsedTime();
        }

        if (!tickSkip) {
            self.tick.incrementAndGet();
        }

        // We use an instance of SyncedLearnerTracker to
        // track synced learners to make sure we still have a
        // quorum of current (and potentially next pending) view.
        SyncedLearnerTracker syncedAckSet = new SyncedLearnerTracker();
        syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        syncedAckSet.addAck(self.getId());

        // check whether each follower still maintains its connection
        for (LearnerHandler f : getLearners()) {
            if (f.synced()) {
                syncedAckSet.addAck(f.getSid());
            }
        }

        // check leader running status
        if (!this.isRunning()) {
            // set shutdown flag
            shutdownMessage = "Unexpected internal error";
            break;
        }

        if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
            // Lost quorum of last committed and/or last proposed
            // config, set shutdown flag
            // without a quorum of connections, stop maintaining leadership
            shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
                    + syncedAckSet.ackSetsToString() + " ]";
            break;
        }
        tickSkip = !tickSkip;
    }

    // per the logic above, the code below runs twice per tickTime:
    // send a ping to each follower
    for (LearnerHandler f : getLearners()) {
        f.ping();
    }
}

if (shutdownMessage != null) {
    shutdown(shutdownMessage);
    // leader goes in looking state
}
```
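The tickSkip toggling is easy to misread, so here is a toy model of the loop's timing (my own sketch, not ZooKeeper code; it assumes tickSkip starts out true, as the real lead() appears to initialize it). The quorum check guarded by !tickSkip runs on every other half-tick wait, i.e. once per tickTime, while pings go out every tickTime / 2:

```java
// Toy model of the lead() loop timing. Each iteration stands for one
// wait(tickTime / 2); the counters show how often each action happens.
public class TickLoopSketch {
    // returns {pings, quorumChecks} after the given number of half-tick waits
    static int[] simulate(int halfTickWaits) {
        boolean tickSkip = true;  // assumption: initial value in the real loop
        int pings = 0;
        int quorumChecks = 0;
        for (int i = 0; i < halfTickWaits; i++) {
            if (!tickSkip) {
                quorumChecks++;   // stands in for the hasAllQuorums() check
            }
            tickSkip = !tickSkip;
            pings++;              // each LearnerHandler is pinged every half tick
        }
        return new int[] { pings, quorumChecks };
    }

    public static void main(String[] args) {
        int[] r = simulate(8);    // 8 half-tick waits = 4 full ticks
        System.out.println(r[0] + " pings, " + r[1] + " quorum checks");
    }
}
```

Over 4 ticks the followers are pinged 8 times but the quorum is only verified 4 times, which matches the observation above that the ping code runs twice per tickTime.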
In the while (true) loop of Leader::lead, each LearnerHandler thread is checked for sync:
```java
for (LearnerHandler f : getLearners()) {
    if (f.synced()) {
        syncedAckSet.addAck(f.getSid());
    }
}
...
if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
    // Lost quorum of last committed and/or last proposed
    // config, set shutdown flag
    shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
            + syncedAckSet.ackSetsToString() + " ]";
    break;
}
```
Leader::getLearners returns a copy of the learners field:
```java
public List<LearnerHandler> getLearners() {
    synchronized (learners) {
        return new ArrayList<LearnerHandler>(learners);
    }
}
```
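Returning a copy here matters: handlers can be added or removed concurrently, and a handler may even remove itself while the leader is iterating. A small self-contained demo (my own, with a plain Set of sids standing in for the handler set) shows why iterating a snapshot is safe where iterating the live HashSet would throw ConcurrentModificationException:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SnapshotIterationDemo {
    static final Set<String> learners = new HashSet<>();

    // mirrors Leader::getLearners: snapshot taken under the same lock writers use
    static List<String> getLearners() {
        synchronized (learners) {
            return new ArrayList<>(learners);
        }
    }

    // remove every learner while iterating; safe only because we iterate a snapshot
    static boolean drainViaSnapshot() {
        for (String sid : getLearners()) {
            synchronized (learners) {
                learners.remove(sid);   // mutates the live set mid-iteration
            }
        }
        return learners.isEmpty();
    }

    public static void main(String[] args) {
        learners.add("s1");
        learners.add("s2");
        System.out.println(drainViaSnapshot()); // prints true
    }
}
```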
LearnerHandler::synced mainly checks whether the handler thread is still alive (and whether its ack deadline has passed):
```java
public boolean synced() {
    return isAlive()
            && leader.self.tick.get() <= tickOfNextAckDeadline;
}
```
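As far as I can tell, tickOfNextAckDeadline is pushed forward whenever the follower acks, so the second condition means "the leader's tick has not yet passed the last deadline this follower earned". A toy model of that condition (my simplification, not the real bookkeeping; SYNC_LIMIT is an assumed small constant standing in for the syncLimit setting):

```java
// Toy model of the synced() condition: each ack grants the follower
// SYNC_LIMIT more ticks; it counts as synced until the leader's tick
// passes that deadline.
public class SyncedSketch {
    static final int SYNC_LIMIT = 2;  // assumed value, stands in for syncLimit

    static int deadlineAfterAck(int leaderTickAtAck) {
        return leaderTickAtAck + SYNC_LIMIT;
    }

    static boolean synced(int leaderTickNow, int tickOfNextAckDeadline) {
        // mirrors: leader.self.tick.get() <= tickOfNextAckDeadline
        return leaderTickNow <= tickOfNextAckDeadline;
    }

    public static void main(String[] args) {
        int deadline = deadlineAfterAck(2);       // follower last acked at tick 2
        System.out.println(synced(4, deadline));  // tick 4: still within the deadline
        System.out.println(synced(5, deadline));  // tick 5: treated as out of sync
    }
}
```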
From this we can conclude that whether syncedAckSet deems a quorum to exist depends mainly on the liveness of each LearnerHandler thread in Leader.learners. The health of the cluster therefore comes down to when LearnerHandler threads exit, and when elements are added to or removed from Leader.learners.
In LearnerCnxAcceptor::run, the LearnerCnxAcceptor thread keeps accepting new connection sockets and starts a LearnerHandler with each socket as a parameter. The handler then registers itself at the top of its run():
```java
@Override
public void run() {
    try {
        leader.addLearnerHandler(this);
        ...
```

Leader::addLearnerHandler then puts it into the set:

```java
void addLearnerHandler(LearnerHandler learner) {
    synchronized (learners) {
        learners.add(learner);
    }
}
```

where learners is:

```java
private final HashSet<LearnerHandler> learners =
        new HashSet<LearnerHandler>();
```
So a LearnerHandler adds itself to Leader.learners as soon as it starts.
(Looking at the source, LearnerHandler does not override hashCode or equals. I find that not quite rigorous: when a follower restarts, two LearnerHandler objects can represent the same follower at once, although one of them should eventually shut down.)
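To illustrate that concern, here is a minimal hypothetical handler class (mine, not ZooKeeper's): without equals/hashCode, a HashSet happily keeps two handler objects for the same sid, which is exactly the restarted-follower situation:

```java
import java.util.HashSet;
import java.util.Set;

public class IdentityEqualityDemo {
    // hypothetical stand-in for LearnerHandler: no equals/hashCode overrides,
    // so HashSet falls back to object identity
    static class Handler {
        final long sid;
        Handler(long sid) { this.sid = sid; }
    }

    static int handlersForRestartedFollower() {
        Set<Handler> learners = new HashSet<>();
        learners.add(new Handler(2)); // handler for s2's old connection
        learners.add(new Handler(2)); // s2 restarts; a new handler is created
        return learners.size();
    }

    public static void main(String[] args) {
        // both objects coexist until the old one shuts down and removes itself
        System.out.println(handlersForRestartedFollower()); // prints 2
    }
}
```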
LearnerHandler::shutdown:

```java
public void shutdown() {
    ...
    this.interrupt();
    leader.removeLearnerHandler(this);
}
```
This method has two call sites. The first is the finally block of LearnerHandler::run:
```java
@Override
public void run() {
    try {
        leader.addLearnerHandler(this);
        ...
    } catch (IOException e) {
        ...
    } finally {
        LOG.warn("******* GOODBYE "
                + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                + " ********");
        shutdown();
    }
}
```
The second is LearnerHandler::ping, which shuts the handler down when the sync check times out:
```java
public void ping() {
    // If learner hasn't sync properly yet, don't send ping packet
    // otherwise, the learner will crash
    if (!sendingThreadStarted) {
        return;
    }
    long id;
    if (syncLimitCheck.check(System.nanoTime())) {
        synchronized (leader) {
            id = leader.lastProposed;
        }
        QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
        queuePacket(ping);
    } else {
        LOG.warn("Closing connection to peer due to transaction timeout.");
        shutdown();
    }
}
```
This shows two things. In my own follower disconnect-and-reconnect experiment, it was the first call site's shutdown that fired: generally speaking, as soon as the connection breaks, the corresponding LearnerHandler shuts down.
To summarize what we have seen:

- A LearnerHandler is removed from Leader.learners when its socket closes, so a live LearnerHandler can be taken to represent one follower connection.
- Every self.tickTime, the leader checks whether the live LearnerHandlers still form a quorum (if (!tickSkip && !syncedAckSet.hasAllQuorums())); if not, it gives up leadership.
- Every self.tickTime / 2, the leader pings all followers, and this itself can cause a LearnerHandler to die.
- self.tickTime was 200 seconds here; it should be set in QuorumPeerMain::runFromConfig via quorumPeer.setTickTime(config.getTickTime());.

In short, the leader does maintain its leadership with a heartbeat strategy: at each 200-second check it only needs a quorum of live followers (counting itself).