本文内容参考网络,侵删
本系列文章将整理到我在GitHub上的《Java面试指南》仓库
https://github.com/h2pl/Java-Tutorial
文章将同步到我的个人博客:
www.how2playlife.com
该系列博文会告诉你什么是分布式系统,这对后端工程师来说是很重要的一门学问,我们会逐步了解常见的分布式技术、以及一些较为常见的分布式系统概念,同时也需要进一步了解zookeeper、分布式事务、分布式锁、负载均衡等技术,以便让你更完整地了解分布式技术的具体实战方法,为真正应用分布式技术做好准备。
如果对本系列文章有什么建议,或者是有什么疑问的话,也可以关注公众号【Java技术江湖】联系作者,欢迎你参与本系列博文的创作和修订。
Zab与Paxos Zab的作者认为Zab与paxos并不相同,只所以没有采用Paxos是因为Paxos保证不了全序顺序:Because multiple leaders can propose a value for a given instance two problems arise. First, proposals can conflict. Paxos uses ballots to detect and resolve conflicting proposals. Second, it is not enough to know that a given instance number has been committed, processes must also be able to fi gure out which value has been committed. Paxos算法的确是不关心请求之间的逻辑顺序,而只考虑数据之间的全序,但很少有人直接使用paxos算法,都会经过一定的简化、优化。
Paxos算法优化 Paxos算法在出现竞争的情况下,其收敛速度很慢,甚至可能出现活锁的情况,例如当有三个及三个以上的proposer在发送prepare请求后,很难有一个proposer收到半数以上的回复而不断地执行第一阶段的协议。因此,为了避免竞争,加快收敛的速度,在算法中引入了一个Leader这个角色,在正常情况下同时应该最多只能有一个参与者扮演Leader角色,而其它的参与者则扮演Acceptor的角色。在这种优化算法中,只有Leader可以提出议案,从而避免了竞争使得算法能够快速地收敛而趋于一致;而为了保证Leader的健壮性,又引入了Leader选举,再考虑到同步的阶段,渐渐的你会发现对Paxos算法的简化和优化已经和上面介绍的ZAB协议很相似了。
总结 Google的粗粒度锁服务Chubby的设计开发者Burrows曾经说过:“所有一致性协议本质上要么是Paxos要么是其变体”。这句话还是有一定道理的,ZAB本质上就是Paxos的一种简化形式。
这篇主要分析leader的选主机制,zookeeper提供了三种方式:
默认的算法是FastLeaderElection,所以这篇主要分析它的选举机制。
比如有三台服务器,编号分别是1,2,3。
编号越大在选择算法中的权重越大。
服务器中存放的最大数据ID.
值越大说明数据越新,在选举算法中数据越新权重越大。
或者叫投票的次数,同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断。
在投票完成后,需要将投票信息发送给集群中的所有服务器,它包含如下内容。
因为每个服务器都是独立的,在启动时均从初始状态开始参与选举,下面是简易流程图。
下面详细解释一下这个流程:
首先给出几个名词定义:
(1)Serverid:在配置server时,给定的服务器的标示id。
(2)Zxid:服务器在运行时产生的数据id,zxid越大,表示数据越新。
(3)Epoch:选举的轮数,即逻辑时钟。随着选举的轮数++
(4)Server状态:LOOKING,FOLLOWING,OBSERVING,LEADING
步骤:
一、 Server刚启动(宕机恢复或者刚启动)准备加入集群,此时读取自身的zxid等信息。
二、 所有Server加入集群时都会推荐自己为leader,然后将(leader id 、 zixd 、 epoch)作为广播信息,广播到集群中所有的服务器(Server)。然后等待集群中的服务器返回信息。
三、 收到集群中其他服务器返回的信息,此时要分为两类:该服务器处于looking状态,或者其他状态。
(1) 服务器处于looking状态
首先判断逻辑时钟 Epoch:
a) 如果接收到Epoch大于自己目前的逻辑时钟(说明自己所保存的逻辑时钟落伍了)。更新本机逻辑时钟Epoch,同时 Clear其他服务发送来的选举数据(这些数据已经OUT了)。然后判断是否需要更新当前自己的选举情况(一开始选择的leader id 是自己)
判断规则rules judging:保存的zxid最大值和leader Serverid来进行判断的。先看数据zxid,数据zxid大者胜出;其次再判断leaderServerid, leader Serverid大者胜出;然后再将自身最新的选举结果(也就是上面提到的三种数据(leader Serverid,Zxid,Epoch)广播给其他server)
b) 如果接收到的Epoch小于目前的逻辑时钟。说明对方处于一个比较OUT的选举轮数,这时只需要将自己的 (leader Serverid,Zxid,Epoch)发送给他即可。
c) 如果接收到的Epoch等于目前的逻辑时钟。再根据a)中的判断规则,将自身的最新选举结果广播给其他 server。
同时Server还要处理2种情况:
a) 如果Server接收到了其他所有服务器的选举信息,那么则根据这些选举信息确定自己的状态(Following,Leading),结束Looking,退出选举。
b) 即使没有收到所有服务器的选举信息,也可以判断一下根据以上过程之后最新的选举leader是不是得到了超过半数以上服务器的支持,如果是则尝试接受最新数据,倘若没有最新的数据到来,说明大家都已经默认了这个结果,同样也设置角色退出选举过程。
(2) 服务器处于其他状态(Following, Leading)
a) 如果逻辑时钟Epoch相同,将该数据保存到recvset,如果所接收服务器宣称自己是leader,那么将判断是不是有半数以上的服务器选举它,如果是则设置选举状态退出选举过程
b) 否则这是一条与当前逻辑时钟不符合的消息,那么说明在另一个选举过程中已经有了选举结果,于是将该选举结果加入到outofelection集合中,再根据outofelection来判断是否可以结束选举,如果可以也是保存逻辑时钟,设置选举状态,退出选举过程。
以上就是FAST选举过程。
以上就是我自己配置的Zookeeper选主日志,从一开始LOOKING,然后new election, my id = 1, proposedzxid=0x0 也就是选自己为Leader,之后广播选举并重复之前Fast选主算法,最终确定Leader。
主要看这个类,只有LOOKING状态才会去执行选举算法。每个服务器在启动时都会选择自己做为领导,然后将投票信息发送出去,循环一直到选举出领导为止。
public void run() {
//.......
try {
while (running) {
switch (getPeerState()) {
case LOOKING:
if (Boolean.getBoolean("readonlymode.enabled")) {
//...
try {
//投票给自己...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
} finally {
//...
}
} else {
try {
//...
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
//...
}
}
break;
case OBSERVING:
//...
break;
case FOLLOWING:
//...
break;
case LEADING:
//...
break;
}
}
} finally {
//...
}
}
它是zookeeper默认提供的选举算法,核心方法如下:具体的可以与本文上面的流程图对照。
public Vote lookForLeader() throws InterruptedException {
//...
try {
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = finalizeWait;
synchronized(this){
//给自己投票
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//将投票信息发送给集群中的每个服务器
sendNotifications();
//循环,如果是竞选状态一直到选举出结果
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
//没有收到投票信息
if(n == null){
if(manager.haveDelivered()){
sendNotifications();
} else {
manager.connectAll();
}
//...
}
//收到投票信息
else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
switch (n.state) {
case LOOKING:
// 判断投票是否过时,如果过时就清除之前已经接收到的信息
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
//更新投票信息
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
//发送投票信息
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
//忽略
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
//更新投票信息
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判断是否投票结束
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
//忽略
break;
case FOLLOWING:
case LEADING:
//如果是同一轮投票
if(n.electionEpoch == logicalclock.get()){
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
//判断是否投票结束
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
//记录投票已经完成
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
//忽略
break;
}
} else {
LOG.warn("Ignoring notification from non-cluster member " + n.sid);
}
}
return null;
} finally {
//...
}
}
默认是采用投票数大于半数则胜出的逻辑。
目前有5台服务器,每台服务器均没有数据,它们的编号分别是1,2,3,4,5,按编号依次启动,它们的选择举过程如下:
初始投票给自己 集群刚启动时,所有服务器的logicClock都为1,zxid都为0。
各服务器初始化后,都投票给自己,并将自己的一票存入自己的票箱,如下图所示。
在上图中,(1, 1, 0)第一位数代表投出该选票的服务器的logicClock,第二位数代表被推荐的服务器的myid,第三位代表被推荐的服务器的最大的zxid。由于该步骤中所有选票都投给自己,所以第二位的myid即是自己的myid,第三位的zxid即是自己的zxid。
此时各自的票箱中只有自己投给自己的一票。
更新选票 服务器收到外部投票后,进行选票PK,相应更新自己的选票并广播出去,并将合适的选票存入自己的票箱,如下图所示。
服务器1收到服务器2的选票(1, 2, 0)和服务器3的选票(1, 3, 0)后,由于所有的logicClock都相等,所有的zxid都相等,因此根据myid判断应该将自己的选票按照服务器3的选票更新为(1, 3, 0),并将自己的票箱全部清空,再将服务器3的选票与自己的选票存入自己的票箱,接着将自己更新后的选票广播出去。此时服务器1票箱内的选票为(1, 3),(3, 3)。
同理,服务器2收到服务器3的选票后也将自己的选票更新为(1, 3, 0)并存入票箱然后广播。此时服务器2票箱内的选票为(2, 3),(3, ,3)。
服务器3根据上述规则,无须更新选票,自身的票箱内选票仍为(3, 3)。
服务器1与服务器2更新后的选票广播出去后,由于三个服务器最新选票都相同,最后三者的票箱内都包含三张投给服务器3的选票。
根据选票确定角色 根据上述选票,三个服务器一致认为此时服务器3应该是Leader。因此服务器1和2都进入FOLLOWING状态,而服务器3进入LEADING状态。之后Leader发起并维护与Follower间的心跳。
Follower重启投票给自己 Follower重启,或者发生网络分区后找不到Leader,会进入LOOKING状态并发起新的一轮投票。
发现已有Leader后成为Follower 服务器3收到服务器1的投票后,将自己的状态LEADING以及选票返回给服务器1。服务器2收到服务器1的投票后,将自己的状态FOLLOWING及选票返回给服务器1。此时服务器1知道服务器3是Leader,并且通过服务器2与服务器3的选票可以确定服务器3确实得到了超过半数的选票。因此服务器1进入FOLLOWING状态。
Follower发起新投票 Leader(服务器3)宕机后,Follower(服务器1和2)发现Leader不工作了,因此进入LOOKING状态并发起新的一轮投票,并且都将票投给自己。
广播更新选票 服务器1和2根据外部投票确定是否要更新自身的选票。这里有两种情况
在上图中,服务器1的zxid为11,而服务器2的zxid为10,因此服务器2将自身选票更新为(3, 1, 11),如下图所示。
选出新Leader 经过上一步选票更新后,服务器1与服务器2均将选票投给服务器1,因此服务器2成为Follower,而服务器1成为新的Leader并维护与服务器2的心跳。
旧Leader恢复后发起选举 旧的Leader恢复后,进入LOOKING状态并发起新一轮领导选举,并将选票投给自己。此时服务器1会将自己的LEADING状态及选票(3, 1, 11)返回给服务器3,而服务器2将自己的FOLLOWING状态及选票(3, 1, 11)返回给服务器3。如下图所示。
旧Leader成为Follower 服务器3了解到Leader为服务器1,且根据选票了解到服务器1确实得到过半服务器的选票,因此自己进入FOLLOWING状态。
ZAB协议保证了在Leader选举的过程中,已经被Commit的数据不会丢失,未被Commit的数据对客户端不可见。
Failover前状态 为更好演示Leader Failover过程,本例中共使用5个Zookeeper服务器。A作为Leader,共收到P1、P2、P3三条消息,并且Commit了1和2,且总体顺序为P1、P2、C1、P3、C2。根据顺序性原则,其它Follower收到的消息的顺序肯定与之相同。其中B与A完全同步,C收到P1、P2、C1,D收到P1、P2,E收到P1,如下图所示。
这里要注意
选出新Leader 旧Leader也即A宕机后,其它服务器根据上述FastLeaderElection算法选出B作为新的Leader。C、D和E成为Follower且以B为Leader后,会主动将自己最大的zxid发送给B,B会将Follower的zxid与自身zxid间的所有被Commit过的消息同步给Follower,如下图所示。
在上图中
通知Follower可对外服务 同步完数据后,B会向D、C和E发送NEWLEADER命令并等待大多数服务器的ACK(下图中D和E已返回ACK,加上B自身,已经占集群的大多数),然后向所有服务器广播UPTODATE命令。收到该命令后的服务器即可对外提供服务。
在上例中,P3未被A Commit过,同时因为没有过半的服务器收到P3,因此B也未Commit P3(如果有过半服务器收到P3,即使A未Commit P3,B会主动Commit P3,即C3),所以它不会将P3广播出去。
具体做法是,B在成为Leader后,先判断自身未Commit的消息(本例中即P3)是否存在于大多数服务器中从而决定是否要将其Commit。然后B可得出自身所包含的被Commit过的消息中的最小zxid(记为min_zxid)与最大zxid(记为max_zxid)。C、D和E向B发送自身Commit过的最大消息zxid(记为max_zxid)以及未被Commit过的所有消息(记为zxid_set)。B根据这些信息作出如下操作
上述操作保证了未被Commit过的消息不会被Commit从而对外不可见。
上述例子中Follower上并不存在未被Commit的消息。但可考虑这种情况,如果将上述例子中的服务器数量从五增加到七,服务器F包含P1、P2、C1、P3,服务器G包含P1、P2。此时服务器F、A和B都包含P3,但是因为票数未过半,因此B作为Leader不会Commit P3,而会通过TRUNC命令通知F删除P3。如下图所示。