前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >MIT 6.824 Lab2 - Raft 实现

MIT 6.824 Lab2 - Raft 实现

作者头像
s09g
发布2022-07-06 15:32:45
1K0
发布2022-07-06 15:32:45
举报
文章被收录于专栏:s09g的技术博客

本文将介绍6.824 Lab2(测试用例2021/2020版 2A + 2B + 2C部分)的具体实现,视频版的讲解将发在B站:s09g谷歌摸鱼 。代码通过5000次测试,大致上应该没有问题。2021版的测试还有一个2D的部分,并没有包含在本文中。2D部分是关于Raft Snapshot,过早的实现2D可能会掩盖一些隐藏的bug。比如2C的一些test其实会产生超长的歧义链,这个时候就需要实现fast rollback优化,但是如果过早实现了snapshot就可以通过发送snapshot的方式直接修正歧义链。

Raft的结构

代码语言:javascript
复制
type Raft struct {
    mu        sync.Mutex
    peers     []*labrpc.ClientEnd
    persister *Persister
    me        int
    dead      int32

    state         RaftState
    appendEntryCh chan *Entry
    heartBeat     time.Duration
    electionTime  time.Time

    currentTerm int
    votedFor    int
    log         Log

    commitIndex int
    lastApplied int

    nextIndex  []int
    matchIndex []int

    applyCh   chan ApplyMsg
    applyCond *sync.Cond
}

Raft的结构有一部分已经给出,剩下的大部分可以根据Figure 2补全。

代码语言:javascript
复制
func Make(peers []*labrpc.ClientEnd, me int,
    persister *Persister, applyCh chan ApplyMsg) *Raft {
    rf := &Raft{}
    rf.peers = peers
    rf.persister = persister
    rf.me = me

    rf.state = Follower
    rf.currentTerm = 0
    rf.votedFor = -1
    rf.heartBeat = 100 * time.Millisecond
    rf.resetElectionTimer()

    rf.log = makeEmptyLog()
    rf.log.append(Entry{-1, 0, 0})
    rf.commitIndex = 0
    rf.lastApplied = 0
    rf.nextIndex = make([]int, len(rf.peers))
    rf.matchIndex = make([]int, len(rf.peers))

    rf.applyCh = applyCh
    rf.applyCond = sync.NewCond(&rf.mu)

    rf.readPersist(persister.ReadRaftState())

    go rf.ticker()

    go rf.applier()
    return rf
}

初始化Raft的时候,除了给raft做基本的赋值之外,还要额外启动两个goroutine。作业要求中提到不要使用Go内置的timer,在2021版的代码里新增了一个ticker函数,作用也很简单,计时并且按时间触发leader election或者append entry。而applier则是负责将command应用到state machine,这一点和论文中一致

ticker

代码语言:javascript
复制
func (rf *Raft) ticker() {
    for rf.killed() == false {
        time.Sleep(rf.heartBeat)
        rf.mu.Lock()
        if rf.state == Leader {
            rf.appendEntries(true)
        }
        if time.Now().After(rf.electionTime) {
            rf.leaderElection()
        }
        rf.mu.Unlock()
    }
}

ticker会以心跳为周期不断检查状态。如果当前是Leader就会发送心跳包,而心跳包是靠appendEntries()发送空log,而不是额外的函数,这一点在论文和student guide都有强调。

如果发现选举超时,这时候就会出发新一轮leader election。

leader election

代码语言:javascript
复制
func (rf *Raft) leaderElection() {
    rf.currentTerm++
    rf.state = Candidate
    rf.votedFor = rf.me
    rf.persist()
    rf.resetElectionTimer()
    term := rf.currentTerm
    voteCounter := 1
    lastLog := rf.log.lastLog()
    args := RequestVoteArgs{
        Term:         term,
        CandidateId:  rf.me,
        LastLogIndex: lastLog.Index,
        LastLogTerm:  lastLog.Term,
    }

    var becomeLeader sync.Once
    for serverId, _ := range rf.peers {
        if serverId != rf.me {
            go rf.candidateRequestVote(serverId, &args, &voteCounter, &becomeLeader)
        }
    }
}

启动新一轮leader election时,首先要将自己转为candidate状态,并且给自己投一票。然后向所有peer请求投票。RequestVote RPC的参数和返回值需要按照Figure 2实现。

代码语言:javascript
复制
func (rf *Raft) candidateRequestVote(serverId int, args *RequestVoteArgs,    voteCounter *int, becomeLeader *sync.Once) {
    reply := RequestVoteReply{}
    ok := rf.sendRequestVote(serverId, args, &reply)
    if !ok {
        return
    }
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if reply.Term > args.Term {
        rf.setNewTerm(reply.Term)
        return
    }
    if reply.Term < args.Term {
        return
    }
    if !reply.VoteGranted {
        return
    }

    *voteCounter++
    if *voteCounter > len(rf.peers)/2 &&
        rf.currentTerm == args.Term &&
        rf.state == Candidate {
        becomeLeader.Do(func() {
            rf.state = Leader
            lastLogIndex := rf.log.lastLog().Index
            for i, _ := range rf.peers {
                rf.nextIndex[i] = lastLogIndex + 1
                rf.matchIndex[i] = 0
            }
            rf.appendEntries(true)
        })
    }
}

除了要满足论文的Figure 2中Rules for Servers的要求之外,要注意当candidate收到半数以上投票之后就可以进入leader状态,而这个状态转变会更新nextIndex[]和matchIndex[],并且再成为leader之后要立刻发送一次心跳。我们希望状态转变只发生一次,这里我使用了go的sync.Once。简单的使用bool flag也同样可以达成目的,只不过可读性没有这么直观。

RequestVote

另一方面,任何服务器收到RequestVote RPC之后,要实现Figure 2中RequestVote RPC Receiver implementation的逻辑,同时也要满足Rules for Servers

代码语言:javascript
复制
func (rf *Raft) RequestVote(args *RequestVoteArgs, reply *RequestVoteReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    // rules for servers
    // all servers 2
    if args.Term > rf.currentTerm {
        rf.setNewTerm(args.Term)
    }

    // request vote rpc receiver 1
    if args.Term < rf.currentTerm {
        reply.Term = rf.currentTerm
        reply.VoteGranted = false
        return
    }

    // request vote rpc receiver 2
    myLastLog := rf.log.lastLog()
    upToDate := args.LastLogTerm > myLastLog.Term ||
        (args.LastLogTerm == myLastLog.Term && args.LastLogIndex >= myLastLog.Index)
    if (rf.votedFor == -1 || rf.votedFor == args.CandidateId) && upToDate {
        reply.VoteGranted = true
        rf.votedFor = args.CandidateId
        rf.persist()
        rf.resetElectionTimer()
    } else {
        reply.VoteGranted = false
    }
    reply.Term = rf.currentTerm
}

论文5.2 & 5.4节详细解释了这部分逻辑的来源。

AppendEntry

完成了leader election之后,leader会立刻触发一次心跳包,随后在每个心跳周期发送心跳包,来阻止新一轮leader election。 Figure 2中Rules for ServersLeaders部分将心跳称为initial empty AppendEntries RPCs (heartbeat),将包含log的RPC称为AppendEntries RPC with log entries starting at nextIndex。这种描述听起来像是用了两段不同的代码。 而实际上因为这里的心跳有两种理解:每个心跳周期,发送一次AppendEntries RPC,当这个RPC不包含log时,这个包被称为心跳包。所以也有可能发生这么一种情况:触发了一次心跳,但是带有log(即心跳周期到了,触发了一次AppendEntries RPC,但是由于follower落后了,所以这个RPC带有一段log,此时这个包就不能称为心跳包)。

实践中,我在每个心跳周期和收到新的command之后各会触发一次AppendEntries RPC。然而仔细读论文后发现,论文中并没有只说了心跳会触发AppendEntries RPC,并没有说收到客户端的指令之后应该触发AppendEntries RPC。

我甚至认为在理论上AppendEntries可以完全交给heartbeat周期来触发,即收到command后,并不立刻发送AppendEntries,而是等待下一个心跳。这种方法可以减少RPC的数量,并且通过了连续1000次测试。但是代价就是每条command的提交周期变长。

代码语言:javascript
复制
func (rf *Raft) appendEntries(heartbeat bool) {
    lastLog := rf.log.lastLog()
    for peer, _ := range rf.peers {
        if peer == rf.me {
            rf.resetElectionTimer()
            continue
        }
        // rules for leader 3
        if lastLog.Index > rf.nextIndex[peer] || heartbeat {
            nextIndex := rf.nextIndex[peer]
            if nextIndex <= 0 {
                nextIndex = 1
            }
            if lastLog.Index+1 < nextIndex {
                nextIndex = lastLog.Index
            }
            prevLog := rf.log.at(nextIndex - 1)
            args := AppendEntriesArgs{
                Term:         rf.currentTerm,
                LeaderId:     rf.me,
                PrevLogIndex: prevLog.Index,
                PrevLogTerm:  prevLog.Term,
                Entries:      make([]Entry, lastLog.Index-nextIndex+1),
                LeaderCommit: rf.commitIndex,
            }
            copy(args.Entries, rf.log.slice(nextIndex))
            go rf.leaderSendEntries(peer, &args)
        }
    }
}

Leader在AppendEntries中会并行地给所有server发送消息,然后根据返回的消息更新nextIndex和matchIndex,这部分需要按照论文5.3节来实现。 但是同样在5.3节,作者提到了fast rollback优化。Morris的讲座上,实现这种优化需要在返回消息中额外加入XTerm, XIndex, XLen三个字段。

代码语言:javascript
复制
type AppendEntriesReply struct {
    Term     int
    Success  bool
    Conflict bool
    XTerm    int
    XIndex   int
    XLen     int
}

原作的说法上,这种优化可能不是必须的,所以并不作为raft核心算法的一部分。实际上,我感觉如果直接在raft-core的代码上实现,有可能会引入一个小bug,不影响运行但可能会拖效率。然而这点我也不好证明,只能说里面多半有一部分冗余代码,但是我也不敢删,所以就留着……

代码语言:javascript
复制
func (rf *Raft) leaderSendEntries(serverId int, args *AppendEntriesArgs) {
    var reply AppendEntriesReply
    ok := rf.sendAppendEntries(serverId, args, &reply)
    if !ok {
        return
    }
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if reply.Term > rf.currentTerm {
        rf.setNewTerm(reply.Term)
        return
    }
    if args.Term == rf.currentTerm {
        // rules for leader 3.1
        if reply.Success {
            match := args.PrevLogIndex + len(args.Entries)
            next := match + 1
            rf.nextIndex[serverId] = max(rf.nextIndex[serverId], next)
            rf.matchIndex[serverId] = max(rf.matchIndex[serverId], match)
        } else if reply.Conflict {
            if reply.XTerm == -1 {
                rf.nextIndex[serverId] = reply.XLen
            } else {
                lastLogInXTerm := rf.findLastLogInTerm(reply.XTerm)
                if lastLogInXTerm > 0 {
                    rf.nextIndex[serverId] = lastLogInXTerm
                } else {
                    rf.nextIndex[serverId] = reply.XIndex
                }
            }
        } else if rf.nextIndex[serverId] > 1 {
            rf.nextIndex[serverId]--
        }
        rf.leaderCommitRule()
    }
}

当peer收到AppendEntry RPC的时候,需要实现Figure 2中AppendEntry RPC Receiver implementation + Rules for Servers。具体哪些相关,我已经加在注释里了。论文里的步骤必须严格遵守,不要自由发挥。这一点想必大家在debug的时候都深有体会……

代码语言:javascript
复制
func (rf *Raft) AppendEntries(args *AppendEntriesArgs, reply *AppendEntriesReply) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    // rules for servers
    // all servers 2
    reply.Success = false
    reply.Term = rf.currentTerm
    if args.Term > rf.currentTerm {
        rf.setNewTerm(args.Term)
        return
    }

    // append entries rpc 1
    if args.Term < rf.currentTerm {
        return
    }
    rf.resetElectionTimer()

    // candidate rule 3
    if rf.state == Candidate {
        rf.state = Follower
    }
    // append entries rpc 2
    if rf.log.lastLog().Index < args.PrevLogIndex {
        reply.Conflict = true
        reply.XTerm = -1
        reply.XIndex = -1
        reply.XLen = rf.log.len()
        return
    }
    if rf.log.at(args.PrevLogIndex).Term != args.PrevLogTerm {
        reply.Conflict = true
        xTerm := rf.log.at(args.PrevLogIndex).Term
        for xIndex := args.PrevLogIndex; xIndex > 0; xIndex-- {
            if rf.log.at(xIndex-1).Term != xTerm {
                reply.XIndex = xIndex
                break
            }
        }
        reply.XTerm = xTerm
        reply.XLen = rf.log.len()
        return
    }

    for idx, entry := range args.Entries {
        // append entries rpc 3
        if entry.Index <= rf.log.lastLog().Index && rf.log.at(entry.Index).Term != entry.Term {
            rf.log.truncate(entry.Index)
            rf.persist()
        }
        // append entries rpc 4
        if entry.Index > rf.log.lastLog().Index {
            rf.log.append(args.Entries[idx:]...)
            rf.persist()
            break
        }
    }

    // append entries rpc 5
    if args.LeaderCommit > rf.commitIndex {
        rf.commitIndex = min(args.LeaderCommit, rf.log.lastLog().Index)
        rf.apply()
    }
    reply.Success = true
}

完成AppendEntry RPC之后,Leader需要提交已有的日志条目,这一点在论文5.3 & 5.4有文字叙述。但是具体在什么时候提交,需要自己去把握。仔细看Figure 2的话,其实这部分对应Rules for Servers中Leader部分的最后一小段

代码语言:javascript
复制
func (rf *Raft) leaderCommitRule() {
    // leader rule 4
    if rf.state != Leader {
        return
    }

    for n := rf.commitIndex + 1; n <= rf.log.lastLog().Index; n++ {
        if rf.log.at(n).Term != rf.currentTerm {
            continue
        }
        counter := 1
        for serverId := 0; serverId < len(rf.peers); serverId++ {
            if serverId != rf.me && rf.matchIndex[serverId] >= n {
                counter++
            }
            if counter > len(rf.peers)/2 {
                rf.commitIndex = n
                rf.apply()
                break
            }
        }
    }
}

applier

student guide中提到应该使用一个a dedicated “applier”来专门处理日志commit的事情。所以按TA说的来,并且按照作业要求使用applyCond。这里可能会触发student guide所说的The four-way deadlock,不过guide中也给出来解决方案。不重复赘述,文末有中文版的链接,自己去读。

代码语言:javascript
复制
func (rf *Raft) apply() {
    rf.applyCond.Broadcast()
}

func (rf *Raft) applier() {
    rf.mu.Lock()
    defer rf.mu.Unlock()

    for !rf.killed() {
        if rf.commitIndex > rf.lastApplied && rf.log.lastLog().Index > rf.lastApplied {
            rf.lastApplied++
            applyMsg := ApplyMsg{
                CommandValid: true,
                Command:      rf.log.at(rf.lastApplied).Command,
                CommandIndex: rf.lastApplied,
            }
            rf.mu.Unlock()
            rf.applyCh <- applyMsg
            rf.mu.Lock()
        } else {
            rf.applyCond.Wait()
        }
    }
}

Start

最后是start函数,它会接受客户端的command,并且应用raft算法。前面也说来,每次start并不一定要立刻触发AppendEntry。理论上如果每次都触发AppendEntry,而start被调用的频率又超高,Leader就会疯狂发送RPC。如果不主动触发,而被动的依赖心跳周期,反而可以造成batch operation的效果,将QPS固定成一个相对较小的值。当中的trade-off需要根据使用场景自己衡量。

代码语言:javascript
复制
func (rf *Raft) Start(command interface{}) (int, int, bool) {
    rf.mu.Lock()
    defer rf.mu.Unlock()
    if rf.state != Leader {
        return -1, rf.currentTerm, false
    }
    index := rf.log.lastLog().Index + 1
    term := rf.currentTerm

    log := Entry{
        Command: command,
        Index:   index,
        Term:    term,
    }
    rf.log.append(log)
    rf.persist()
    rf.appendEntries(false)

    return index, term, true
}

总结

  1. 一定要按照论文+student guide来实现,完全按照论文确实可以完美复现。但是话说回来,都做到这份上了,为啥不直接给个伪代码版本。。。
  2. 千万不要过早优化。直接使用函数粒度的锁,细粒度的锁在提升性能的同时,会增加复杂度,尤其debug的难度,并且这个难度在复杂的高并发+不可靠的网络背景下可以无限上升。等待debug难度过大,就只能删掉重构了。
  3. 通过单次测试只是第一步,真正的考验才刚刚开始。很多bug出现的概率不高(话说统计课上将概率低于5%叫做小概率事件,然而这种bug到处都是……
  4. 所以debug的log一定要写详细点,像我单跑一次TestFigure8Unreliable2C能打出两万条log
  5. 接上条,早点写个log可视化的脚本来处理。Python写了一下,大约30多行,可以把45s左右的test过程,变成一个5分钟左右的动画,能看到每个server的append、commit等过程
  6. 论文+student guide需要反复看,所以早点把重点摘出来写成笔记放在手边。我也发过中文版的翻译Raft: 寻找可理解的共识算法(2) MIT 6.824 - Raft学生指南
  7. 之后会在B站发个视频版,过一遍整个代码
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 s09g的技术博客 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Raft的结构
  • ticker
  • leader election
  • RequestVote
  • AppendEntry
  • applier
  • Start
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档