基于mit的6.824课程,github代码地址:https://github.com/zhuanxuhit/distributed-system
测试中3个server,然后启动,验证在同一个任期(term)内是否只有一个leader,并且在2 * RaftElectionTimeout
后,由于心跳的存在,不会发生重选。
在代码实现中,主要有以下几点:
AppendEnties
和RequestVote
两个rpc部分功能Make
新建Raft我们来看下其中主要的关键点:
程序整体组织上是在Make
中启动了一个goroutine,是一个无限循环,根据不同的状态进行不同的处理,结构如下:
图片
先讲第一个状态follower
的处理
所有的server重启后第一个状态都是follower
,如果在election timeout
时间内,既没有收到leader的heartbeat
,也没有收到RequestVote
请求,那么开启选举过程,此时状态将转换为candidate
,代码如下:
rf.resetElectionTimeout()
// 等待心跳,如果心跳未到,但是选举超时了,则开始新一轮选举
select {
case <-rf.heartbeatChan:
case <-time.After(rf.randomizedElectionTimeout):
// 开始重新选举
log.Println("election timeout:", rf.randomizedElectionTimeout)
if rf.status != STATUS_FOLLOWER {
// panic
log.Fatal("status not right when in follower and after randomizedElectionTimeout:", rf.randomizedElectionTimeout)
}
rf.convertToCandidate()
}
接着开始第二个状态candidate
的处理:
结构大致如下:
图片
如果此时赢得了选举,则进入第3个状态leader
的处理:目前leader只实现了一个功能,周期性的发送心跳,功能非常简单,此处不再贴代码了。
剩下就是两个rpc的发送和接收处理了,其中需要特别注意的点如下:
在完成第一个测试的过程中:AppendEnties只需要处理心跳请求即可。
最后给出代码的地址:https://github.com/zhuanxuhit/distributed-system,tag是:lab3-raft-case1
有3个server,选举出来一个leader后,模拟leader故障,重新选举出一个leader,然后再模拟older leader故障恢复重新加入,此时也只会有一个leader,再模拟3个2个都故障了,那理论上就不会有leader出现了,此时再逐个加入故障的server,都只会有一个leader
直接运行测试
go test -v -run ReElection
先看第1个,出现的调试信息:
2016/10/10 18:44:46 follower: 0 election timeout: 1.287113937s
2016/10/10 18:44:46 now I begin to candidate,index: 0
2016/10/10 18:44:47 follower: 2 election timeout: 1.54916732s
2016/10/10 18:44:47 now I begin to candidate,index: 2
可以看到0开始选举后,不知道为什么2没有投票,去看代码,发现问题是:
修改后即可通过测试,接着马上又出现另一个问题:
2016/10/10 18:54:50 candidate: 0 'slog is not at least as up-to-date as receiver’s log
但是我们现在做的是没有日志的,查看代码发现问题是:
rf.commitIndex == 0
表示还没有日志,则没必要检查修改完后,再次运行case,这次是两个server故障,不会有新的leader出问题了,选举不出来,接着查原因:
在处理投票的时候,往heartbeatChan
写的时候阻塞了,rf.heartbeatChan = make(chan bool, 1)
是有一个缓冲的channel,那为什么会阻塞呢,我们看下有几个地方会写,几个地方会去读
有两个地方会去写:
读的地方也有两个
heartbeatChan
,如果选举超时内没收到心跳,则开始candidateheartbeatChan
,表示已经有新的leader产生了于是就发现了问题:
修改代码后,通过case2
这个case开始要做提交了,实现Start()
函数了,这个case主要测试是:有5个server,没提交前检查没有提交的log,然后提交后,测试该log是否已经被每个server都存储了。
在实现start中,其做的步骤是:
// 客户端的一次日志请求操作触发
// 1)Leader将该请求记录到自己的日志之中;
// 2)Leader将请求的日志以并发的形式,发送AppendEntries RCPs给所有的服务器;
// 3)Leader等待获取多数服务器的成功回应之后(如果总共5台,那么只要收到另外两台回应),
// 将该请求的命令应用到状态机(也就是提交),更新自己的commitIndex 和 lastApplied值;
// 4)Leader在与Follower的下一个AppendEntries RPCs通讯中,
// 就会使用更新后的commitIndex,Follower使用该值更新自己的commitIndex;
// 5)Follower发现自己的 commitIndex > lastApplied
// 则将日志commitIndex的条目应用到自己的状态机(这里就是Follower提交条目的时机)
实现的关键点:在Start函数中,一旦判断出当前server是leader,马上开启一个goroutine,开始异步进行agree工作,然后立即返回,代码如下:
图片
此处第4步和第5步需要在另外的地方完成,一个是heartbeat中,另一个是follower在处理AppendEntries过程中
还有就是在成为leader的时候,需要初始化nextIndex,matchIndex
图片
而在发送heartbeat中,判断log的最大index ≥ nextIndex,如果大于,需要发送从nextIndex开始的log,在发送完后需要判断成功与否,成功则更新nextIndex,matchIndex
,失败则减少nextIndex
,并重试
图片
还有最重要的一点:为了通过测试,记住要在日志提交后,发送消息ApplyMsg
给applymsg
,这样才能通过测试
好了到此为止,写的代码刚好通过第三个测试,继续下一关的!
测试的内容是:有3个server,其中一个follower故障,发的命令只有2个能收到,当恢复故障后,发的命令都能收到
出现的问题:由于每个command真正提交都是通过goroutine来执行的,因此每个goroutine之间并发执行,怎么保证前一个agree了,下一个才能agree成功呢? 现在出现的问题是: map[3:103 5:104 1:101 2:102],乱序,即4还没有提交了,5就提交成功了
现在的问题是:谁也不服谁,当follower恢复后,大家都竞选,但是没有一个成功,查明原因后发现是因为没有处理一个概念: >如果候选人的日志至少和大多数的服务器节点一样新
这个一样新通过:比较两份日志中最后一条日志条目的索引值和任期号定义谁的日志比较新。如果两份日志最后的条目的任期号不同,那么任期号大的日志更加新。如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。
进行到这,发现已经很难调试了,代码太乱,逻辑混乱,于是准备开始重构
现有代码的问题:
重构的代码最重要的一点是:抽象出了状态机,在里面去更新
测试内容是:5个server,3个follow故障,此时提交的命令将不会Committed,然后恢复3个follower,此时发送第3个命令,会忘记第2个没有确认的命令,此时第3个命令的index应该还是2
现在出现的问题是: follow的日志没更新,但是leader的nextIndex确更新了!
2016/10/13 10:44:20 leader is 4
2016/10/13 10:44:22 server:0,currentTerm:3,role:candidate
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:1,currentTerm:3,role:candidate
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:2,currentTerm:3,role:candidate
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:3,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:22 server:4,currentTerm:2,role:leader
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[2 2 2 3 3]
matchIndex is:[1 1 1 2 0]
2016/10/13 10:44:22 恢复3个server
2016/10/13 10:44:25 LeaderId: 4 has big term: 5 than follower: 3 currentTerm: 4
2016/10/13 10:44:25 server 3 len(rf.log) 3 args.PrevLogIndex 1
2016/10/13 10:44:26 重新选举后leader is 4
2016/10/13 10:44:26 server:0,currentTerm:5,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:1,currentTerm:5,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:2,currentTerm:5,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {2 10 1}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:3,currentTerm:5,role:follower
commitIndex:2,lastApplied:2
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[0 0 0 0 0]
matchIndex is:[0 0 0 0 0]
2016/10/13 10:44:26 server:4,currentTerm:5,role:leader
commitIndex:2,lastApplied:2
log is:[{0 <nil> 0} {2 10 1} {2 20 2}]
nextIndex is:[3 3 3 3 3]
matchIndex is:[2 2 2 2 0]
看重新选举后,leader4:matchIndex is:[2 2 2 2 0],但是其他的follower确没有收到新的日志,怎么回事呢?看代码什么情况下回去更新matchIndex呢?
问题在于发送心跳的时候返回了reply=true了,确没有去检查日志是否是最新的
此处记住appendEntries如果返回true,则一定表示是日志一样新了!
true if follower contained entry matching prevLogIndex and prevLogTerm
这个case测试的是: 同时发送5个命令,然后测试5个命令能够被顺序的提交 测试中的修改是:
图片
将红色框中的内容移动到了锁里面,为了防止并发访问的时候,index得到相同。
测试重新加入直接通过了,之前的代码就能实现 测试内容是:3个server,leader故障,然后向故障的leader发送命令,同时向新选举出来的leader发送命令,大致如下图,最后能统一
图片
类似case7:不同在于此处有5个server,然后命令更多,测试也是网络分区后出现多leader,然后恢复网络后,再重新同步数据 不用修改,直接通过
case9主要是性能测试,测试rpc的次数不能太多
持久化的逻辑一直没有加上,此处加上的
先看需要持久化哪些数据,然后持久化的时机是什么时候?
需要持久化哪些日志?
e.Encode(rf.currentTerm) // 当前任期
e.Encode(rf.log) // 收到的日志
e.Encode(rf.votedFor) // 投票的
e.Encode(rf.commitIndex) // 已经确认的一致性日志,之后的日志表示还没有确认是否可以同步,一旦确认的日志都不会改变了
既然这几个需要同步,那就是发生改变的时候把数据持久化下来就可以了
需要调用persist()
函数的地方有:
测试主要测试的是下面的这张图:
图片
描述的问题是:为什么领导人无法通过老的日志的任期号来判断其提交状态。
Raft采用计算副本数的方式,使得永远不会提交前前 面纪元的日志条目,
现在出现的问题是commit了不同的值? 即在没有达成一致的情况下就就行了提交!
Test: Figure 8 ...
2016/10/13 20:38:35 server:0,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 1752890841475247006 1}]
nextIndex is:[1 1 1 1 1]
matchIndex is:[0 0 0 0 0]
2016/10/13 20:38:35 server:2,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 1752890841475247006 1}]
nextIndex is:[1 1 1 1 1]
matchIndex is:[0 0 0 0 0]
2016/10/13 20:38:35 server:4,currentTerm:2,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 1752890841475247006 1}]
nextIndex is:[1 1 1 1 1]
matchIndex is:[0 0 0 0 0]
2016/10/13 20:38:35 apply error: commit index=2 server=1 4541014630978635374 != server=3 8558661384468427932
到这就得加上之前忘记的一个策略
如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N
主要是指:leader只会提交本纪元的日志
模拟网络不可靠,在不可靠的情况下cfg.setunreliable(false)
,则有概率还是丢弃请求,在这种情况下测试协议最后还能达成一致
通过设置cfg.setlongreordering(true)
,在labrpc中会直接睡眠一段时间,模拟这次情况下协议还是达成一致
ms := 200 + rand.Intn(1 + rand.Intn(2000))
time.Sleep(time.Duration(ms) * time.Millisecond)
2016/10/14 14:51:11 server:4,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:3,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:2,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:1,currentTerm:31,role:follower
commitIndex:3,lastApplied:3
2016/10/14 14:51:11 server:0,currentTerm:31,role:leader
commitIndex:3,lastApplied:3
nextIndex is:[186 53 58 51 62]
matchIndex is:[185 0 0 0 0]
2016/10/14 16:09:45 check log type: raft.AppendEntiesArgs value: {6 1 1 1 1 [{1 4411 2} {2 9540 3} {4 3863 4} {6 2769 5}]}
2016/10/14 16:09:45 error log indexserver:0,currentTerm:6,role:follower
commitIndex:1,lastApplied:1
log is:[{0 <nil> 0} {1 606 1} {1 4411 2} {4 3863 4} {6 2769 5}]
nextIndex is:[84 0 0 3 2]
matchIndex is:[83 1 1 2 1]
错误日志,由于没有很好的传递日志,代码bug
测试通过
下一篇的计划是结合代码再次看下关键实现