raft 系列解读(3) 之 代码实现最小规则followercandidateleader规则RequestVote RPCAppendEntries RPC

首先,其实raft如果你不去看理论正确性的证明,光实现的话,只要按照raft里面给出的原则写代码就ok!如果代码写出来不正确,只能是你自己实现的问题。囧

最小规则

package raft

// 所有server的原则 Rules for Servers
// 1. 如果commitIndex > lastApplied:则递增lastApplied,应用 log[lastApplied] 到状态机之中
// 2. 如果Rpc请求或回复包括纪元T > currentTerm: 设置currentTerm = T,转换成 follower, 并且设置 votedFor=-1,表示未投票

// rules for Followers
// 回复 candidates与leaders的RPC请求
// 如果选举超时时间达到,并且没有收到来自当前leader或者要求投票的候选者的 AppendEnties RPC调 :转换角色为candidate

// rules for Candidates
// 转换成candidate时,开始一个选举:
// 1. 递增currentTerm;投票给自己;
// 2. 重置election timer;
// 3. 向所有的服务器发送 RequestVote RPC请求
// 如果获取服务器中多数投票:转换成Leader
// 如果收到从新Leader发送的AppendEnties RPC请求:转换成follower
// 如果选举超时时间达到:开始一次新的选举

// rules for Leaders
// 给每个服务器发送初始空的AppendEntires RPCs(heartbeat);指定空闲时间之 后重复该操作以防 election timeouts
// 如果收到来自客户端的命令:将条目插入到本地日志,在条目应用到状态机后回复给客户端
// 如果last log index >= nextIndex for a follower:发送包含开始于nextIndex的日志条目的AppendEnties RPC
// 如果成功:为follower更新nextIndex与matchIndex
// 如果失败是由于日志不一致:递减nextIndex然后重试
// 如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N

// AppendEntries RPC的实现:在回复给RPCs之前需要更新到持久化存储之上
// 有3类用途
// 1. candidate赢得选举的后,宣誓主权
// 2. 保持心跳
// 3. 让follower的日志和自己保持一致
// 接收者的处理逻辑:
// 1. 如果term < currentTerm 则返回false
// 2. 如果日志不包含一个在preLogIndex位置纪元为prevLogTerm的条目,则返回 false
//      该规则是需要保证follower已经包含了leader在PrevLogIndex之前所有的日志了
// 3. 如果一个已存在的条目与新条目冲突(同样的索引但是不同的纪元),则删除现存的该条目与其后的所有条
// 4. 将不在log中的新条目添加到日志之中
// 5. 如果leaderCommit > commitIndex,那么设置 commitIndex = min(leaderCommit,index of last new entry)

// RequestVote RPC 的实现: 由候选者发起用于收集选票
// 1. 如果term < currentTerm 则返回false
// 2. 如果本地的voteFor为空或者为candidateId,
//      并且候选者的日志至少与接受者的日志一样新,则投给其选票
// 怎么定义日志新
// 比较两份日志中最后一条日志条目的索引值和任期号定义谁的日志比较新
// 如果两份日志最后的条目的任期号不同,那么任期号大的日志更加新
// 如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。

// 以上所有的规则保证的下面的几个点:
// 1. Election Safety 在一个特定的纪元中最多只有一个Leader会被选举出来
// 2. Leader Append-Only Leader不会在他的日志中覆盖或删除条 ,他只执行添加新的条
// 3. Log Matching:如果两个日志包含了同样index和term的条 ,那么在该index之前的所有条目都是相同的
// 4. Leader Completeness:如果在一个特定的term上提交了一个日志条目,那么该条目将显示在编号较大的纪元的Leader的日志里
// 5. State Machine Safety:如果一个服务器在一个给定的index下应用一个日志条目到他的状态机上,没有其他服务器会在相同index上应用不同的日志条目

以上就是全部,我们接下去挨个来看看这些规则在代码中是怎么体现的

follower

规则有两条

  1. 回复 candidates与leaders的RPC请求
  2. 如果选举超时时间达到,并且没有收到来自当前leader或者要求投票的候选者的 AppendEnties RPC调 :转换角色为candidate

对于第一条follower处理来自candidate的RequestVote RPC和来自leader的AppendEntries RPC两种请求 对于第二条则是说follower在规定的选举时间内没有收到来自leader的心跳,则认为leader已经不存在了,自己开始竞选leader,因此此处选举的超时时间要大于心跳时间 看下代码实现

follower逻辑

candidate

候选者的规则有

  • 转换成candidate时,开始一个选举
    • 递增currentTerm;投票给自己;
    • 重置election timer;
    • 向所有的服务器发送 RequestVote RPC请求
  • 如果获取服务器中多数投票:转换成Leader
  • 如果收到从新Leader发送的AppendEnties RPC请求:转换成follower
  • 如果选举超时时间达到:开始一次新的选举

根据这些规则,代码即:

candidate规则

其中广播后的结果是通过voteResultChan来传递的,而心跳则是通过heartbeatChan来传递,在这基础上,需要有个注意的地方,candidate在具体开始这些任务的时候,需要去读voteResultChanheartbeatChan,因为可能里面已经有通知了,对于voteResultChan的需要忽略它,而对于heartbeatChan则说明已经有leader产生了

candidate前置

leader规则

  1. 给每个服务器发送初始空的AppendEntires RPCs(heartbeat);指定空闲时间之后重复该操作以防 election timeouts
  2. 如果收到来自客户端的命令:将条目插入到本地日志,在条目应用到状态机后回复给客户端
  3. 如果last log index >= nextIndex for a follower:发送包含开始于nextIndex的日志条目的AppendEnties RPC
  4. 如果成功:为follower更新nextIndex与matchIndex
  5. 如果失败是由于日志不一致:递减nextIndex然后重试
  6. 如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N

其中最重要的一点是6,这个第六点主要是解决如下问题:

提交日志覆盖

  • (a) S1 是领导者,部分的复制了索引位置 2 的日志条目
  • (b) S1 崩溃了,然后 S5 在任期 3 里通过 S3、S4 和自己的选票赢得选举,然后从客户端接收了一条不一样的日志条目放在了索引2 处
  • (c) S5 又崩溃了;S1 重新启动,选举成功,开始复制日志。在这时,来自任期 2 的那条日志已经被复制到了集群中的大多数机器上,但是还没有被提交
  • (d) S1 又崩溃了,S5 可以重新被选举成功(通过来自 S2,S3 和 S4 的选票),然后覆盖了他们在索引 2 处的日志。但是,在崩溃之前,如果 S1 在自己的任期里复制了日志条目到大多数机器上
  • (e) 然后这个条目就会被提交(S5 就不可能选举成功)。 在这个时候,之前的所有日志就会被正常提交处理

该问题是因为:当一个新Leader当选时,由于所有成员的日志进度不同,很可能需要继续复制前面纪元的日志条目,因为即使为前面纪元的日志复制到多数服务器并且提交,如步骤C,但是在D中还是可能被覆盖,这就产生了不一致。解决的方法就是通过规则6:如果存在以个N满足 N>commitIndex,多数的matchIndex[i] >= N,并且 log[N].term == currentTerm:设置commitIndex = N,具体就是在c阶段,S1成为Leader,此时的纪元是4。S1一样向其他服务器发送日志2,当发送到多数服务器S1,S2,S3时,此时并不提交该日志,而是继续复制日志4,直到日志4到达多数服务器后,提交日志4,即leader只会提交当前纪元的日志。如果提交了4之后宕机,S5就不会被选举为新的 Leader,如果在提交4之前宕机,那么日志2,日志4还是可能被覆盖,但是由于没有提交,也就没有执行日志中的命令,即使被覆盖也无关系。

代码上就是:

leader规则

不间断的发送心跳

状态机的应用

RequestVote RPC

投票rpc的规则

// 1. 如果term < currentTerm 则返回false
// 2. 如果本地的voteFor为空或者为candidateId,
//      并且候选者的日志至少与接受者的日志一样新,则投给其选票
// 怎么定义日志新
// 比较两份日志中最后一条日志条目的索引值和任期号定义谁的日志比较新
// 如果两份日志最后的条目的任期号不同,那么任期号大的日志更加新
// 如果两份日志最后的条目任期号相同,那么日志比较长的那个就更加新。

需要注意的是日志新的处理,实现

图片

其中红色是通用规则

如果Rpc请求或回复包括纪元T > currentTerm: 设置currentTerm = T,转换成 follower, 并且设置 votedFor=-1,表示未投票

这个通用规则的保证了有新纪元开始的时候,所有server都转变为follower,开始新一轮选举

AppendEntries RPC

有3类用途

  1. candidate赢得选举的后,宣誓主权
  2. 保持心跳
  3. 让follower的日志和自己保持一致

接收者的处理逻辑:

  1. 如果term < currentTerm 则返回false
  2. 如果日志不包含一个在preLogIndex位置纪元为prevLogTerm的条目,则返回 false,该规则是需要保证follower已经包含了leader在PrevLogIndex之前所有的日志了
  3. 如果一个已存在的条目与新条目冲突(同样的索引但是不同的纪元),则删除现存的该条目与其后的所有条
  4. 将不在log中的新条目添加到日志之中
  5. 如果leaderCommit > commitIndex,那么设置 commitIndex =min(leaderCommit,index of last new entry)

在实现上也是完全按照上面的规则

func (rf *Raft) AppendEnties(args AppendEntiesArgs, reply *AppendEntiesReply) {
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.Success = false
        return
    }
    // 本身自己是leader,有可能收到别人的请求吗,实验中是可能的,会收到的一个大的term
    //if rf.status == STATUS_LEADER {
    //  log.Println("I am leader, but get AppendEnties",rf.Detail())
    //  log.Println("args.term",args.Term,"currentTerm",rf.currentTerm)
    //}
    // 心跳一定来自leader
    rf.heartbeatChan <- true

    rf.mu.Lock()
    defer rf.mu.Unlock()
    rf.rpcRuleForAllServer(args.Term)
    // 如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false
    // len(rf.log) >= args.PrevLogIndex + 1,说明本地日志长度 >= leader日志长度
    if len(rf.log) >= args.PrevLogIndex + 1 && rf.log[args.PrevLogIndex].Term == args.PrevLogTerm {
        // 该规则是需要保证follower已经包含了leader在PrevLogIndex之前所有的日志了
        for i:=0;i<len(args.Entries);i++ {
            if args.PrevLogIndex+1+i < len(rf.log){
                if rf.log[args.PrevLogIndex+1+i] != args.Entries[i]{
                    // index相同,但是纪元不同
                    rf.log = rf.log[:args.PrevLogIndex+1+i]// 之前的还是相同的,再加上本条之后的
                    rf.log = append(rf.log,args.Entries[i:]...)
                    break
                }
            }else {
                // 本条目不存在
                rf.log = append(rf.log,args.Entries[i:]...)
                break
            }
        }
        if args.LeaderCommit > rf.commitIndex {
            rf.commitIndex = min(args.LeaderCommit,len(rf.log)-1)
        }
        // !!为了查明为什么日志错了
        rf.checkLog(args)
        //在回复给RPCs之前需要更新到持久化存储之上
        rf.persist()
        reply.Term = rf.currentTerm
        reply.Success = true
        reply.NextIndex = len(rf.log)
        return
    }else {
        reply.Term = rf.currentTerm
        reply.Success = false
        reply.NextIndex = min(rf.commitIndex+1,args.PrevLogIndex-1)//
        return
    }
}

以上就是所有规则和代码对应的实现了,github地址

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏jojo的技术小屋

原 web安全、XSS、CSRF、注入攻击

作者:汪娇娇 时间:2017年8月15日 当时也是看了一本书《白帽子讲web安全》,简单的摘录然后做了个技术分享,文章不是很详细,建议大家结合着这本书看哈。 w...

4687
来自专栏向治洪

百度地图android studio导入开发插件

百度地图SDK v3.5.0开发包下载地址:http://lbsyun.baidu.com/sdk/download?selected=location 开...

1.1K8
来自专栏技术记录

netty同端口监听tcp和websocket协议

在netty编程中,对于不同的消息肯定需要不同的编解码来处理,所以我们需要利用netty具有动态增删处理器handle的功能。 

1883
来自专栏熊二哥

快速入门系列--WCF--01基础概念

转眼微软的WCF已走过十个年头,它是微软通信框架的集大成者,将之前微软所有的通信框架进行了整合,提供了统一的应用方式。记得从自己最开始做MFC时,就使用过Nam...

21710
来自专栏君赏技术博客

【已解决】怎么快速检索 Localizable.strings 文件里面格式化错误的地方

我们 APP 支持几十种语言切换 这就需要一个工具提供分析 CSV 文件自动生成 Localizable.strings 的文件 导致里面有的双引号什么或者翻译...

1613
来自专栏lulianqi

超高性能管线式HTTP请求(实践·原理·实现)

这里的高性能指的就是网卡有多快请求发送就能有多快,基本上一般的服务器在一台客户端的压力下就会出现明显延时。

1351
来自专栏码匠的流水账

聊聊kafka的group coordinator

本文主要来讲一个kafka的group coordinator。在kafka0.9.0版本的时候,开始启用了新的consumer config,这个新的cons...

2231
来自专栏安恒网络空间安全讲武堂

护网杯easy laravel ——Web菜鸡的详细复盘学习

复现让我发现了很多读wp以为懂了动手做的时候却想不通的漏掉的知识点(还是太菜orz),也让我对这道题解题逻辑更加理解。所以不要怂,就是干23333!

1753
来自专栏Seebug漏洞平台

CVE-2017-16943 Exim UAF漏洞分析——后续

上一篇分析出来后,经过@orange的提点,得知了meh公布的PoC是需要特殊配置才能触发,所以我上一篇分析文章最后的结论应该改成,在默认配置情况下,meh提供...

4158
来自专栏后台全栈之路

DNS 报文结构和个人 DNS 解析代码实现——解决 getaddrinfo() 阻塞问题

实际应用中发现一个问题,在某些国家/ 地区的某些 ISP 提供的网络中,程序在请求 DNS 以连接一些服务器的时候,有时候会因为 ISP 的 DNS 递归查询太...

6026

扫码关注云+社区