你好,我是 aoho,大家周末快乐。今天我和你分享的主题是:etcd-raft 模块如何实现分布式一致性?
我们在上一篇介绍了 etcd 读写操作的底层实现,但至于 etcd 集群如何实现分布式数据一致性并没有详细介绍。在分布式环境中,常用数据复制来避免单点故障,实现多副本,提高服务的高可用性以及系统的吞吐量。etcd 集群中的多个节点不可避免地会出现相互之间数据不一致的情况。但不管是同步复制、异步复制还是半同步复制,会存在可用性或者一致性的问题。解决多个节点数据一致性的方案其实就是共识算法,常见的共识算法有 Paxos 和 Raft。Zookeeper 使用的 Zab 协议,etcd 使用的共识算法就是 Raft。
本课时将会首先介绍如何使用 raftexample,接着介绍 etcd-raft 模块的实现。etcd-raft 模块是 etcd 中解决分布式一致性的模块,我们结合源码分析下 raft 在 etcd 中的实现。
etcd 项目中包含了 Raft 库使用的示例。raftexample 基于 etcd-raft 库实现了键值对存储服务器。
raftexample 的入口方法实现代码如下所示:
func main() {
cluster := flag.String("cluster", "http://127.0.0.1:9021", "comma separated cluster peers")
id := flag.Int("id", 1, "node ID")
kvport := flag.Int("port", 9121, "key-value server port")
join := flag.Bool("join", false, "join an existing cluster")
flag.Parse()
// 构建 propose
proposeC := make(chan string)
defer close(proposeC)
confChangeC := make(chan raftpb.ConfChange)
defer close(confChangeC)
// raft 为来自http api的提案提供 commit 流
var kvs *kvstore
getSnapshot := func() ([]byte, error) { return kvs.getSnapshot() }
commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)
kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)
// 键值对的处理器将会向 raft 发起提案来更新
serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}
在入口函数中创建了两个 channel:proposeC 用于提交写入的数据;confChangeC 用于提交配置改动数据。
然后分别启动如下核心的 goroutine:
到了这里,已经对 raft 的使用有一个基本的概念了,即通过 node 结构体实现的 Node 接口与 raft 库进行交互,涉及数据变更的核心数据结构就是 Ready 结构体,接下来可以进一步来分析该库的实现了。
raft 库对外提供一个 Node 的 interface,由 raft/node.go 中的 node 结构体实现,这也是应用层唯一需要与这个 raft 库直接打交道的结构体, Node 接口需要实现的函数包括:Tick、Propose、Ready、Step 等。
我们重点需要了解 Ready,这是一个核心函数,将返回 Ready 对应的 channel,该通道表示当前时间点的 channel。应用层需要关注该 channel,当发生变更时,其中的数据也将会进行相应的操作。其他的函数对应的功能如下:
接着是 raft 算法的实现,node 结构体实现了 Node 接口,其定义如下:
type node struct {
propc chan msgWithResult
recvc chan pb.Message
confc chan pb.ConfChangeV2
confstatec chan pb.ConfState
readyc chan Ready
advancec chan struct{}
tickc chan struct{}
done chan struct{}
stop chan struct{}
status chan chan Status
rn *RawNode
}
这个结构体会在后面经常用到。在 raft/raft.go 中还有两个核心数据结构:
我们来看看 raft StateMachine 的状态机转换,实际上就是 raft 算法中各种角色的转换。每个 raft 节点,可能具有以下三种状态中的一种:
每一个状态都有其对应的状态机,每次收到一条提交的数据时,都会根据其不同的状态将消息输入到不同状态的状态机中。同时,在进行 tick 操作时,每种状态对应的处理函数也是不一样的。
因此 raft 结构体中将不同的状态及其不同的处理函数,独立出来几个成员变量:
etcd-raft StateMachine 封装在 raft 机构体中,其状态转换如下图:
raft-StateMachine.png
raft state 转换的调用接口都在 raft.go 中,定义如下:
func (r *raft) becomeFollower(term uint64, lead uint64)
func (r *raft) becomePreCandidate()
func (r *raft) becomeCandidate()
func (r *raft) becomeLeader()
raft 在各种状态下,如何驱动 raft StateMachine 状态机运转?etcd 将 raft 相关的所有处理都抽象为了 Msg,通过 Step 接口处理:
func (r *raft) Step(m pb.Message) error {
r.step(r, m)
}
其中 step 是一个回调函数,在不同的 state 会设置不同的回调函数来驱动 raft,这个回调函数 stepFunc 就是在 becomeXX()
函数完成的设置
type raft struct {
...
step stepFunc
}
step 回调函数有如下几个值,其中 stepCandidate 会处理 PreCandidate 和 Candidate 两种状态:
func stepFollower(r *raft, m pb.Message) error
func stepCandidate(r *raft, m pb.Message) error
func stepLeader(r *raft, m pb.Message) error
这里以 stepCandidate 为例说明:
func stepCandidate(r *raft, m pb.Message) error {
...
switch m.Type {
case pb.MsgProp:
r.logger.Infof("%x no Leader at term %d; dropping proposal", r.id, r.Term)
return ErrProposalDropped
case pb.MsgApp:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
...
case pb.MsgTimeoutNow:
r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
}
return nil
}
即对各种 Msg 进行处理,这里就不展开详细展开。我们来看下 raft 消息的类型及其定义。
raft 算法本质上是一个大的状态机,任何的操作例如选举、提交数据等,最后封装成一个消息结构体,输入到 raft 算法库的状态机中。
在 raft/raftpb/raft.proto 文件中,定义了 raft 算法中传输消息的结构体。熟悉 raft 论文的都知道,raft 算法其实由好几个协议组成,但是在这里,统一定义在了 Message 这个结构体之中,以下总结了该结构体的成员用途。
// 位于 raft/raftpb/raft.pb.go:295
type Message struct {
Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"` // 消息类型
To uint64 `protobuf:"varint,2,opt,name=to" json:"to"` // 消息接收者的节点ID
From uint64 `protobuf:"varint,3,opt,name=from" json:"from"` // 消息发送者的节点ID
Term uint64 `protobuf:"varint,4,opt,name=term" json:"term"` // 任期ID
LogTerm uint64 `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"` // 日志所处的任期ID
Index uint64 `protobuf:"varint,6,opt,name=index" json:"index"` // 日志索引ID,用于节点向Leader汇报自己已经commit的日志数据ID
Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"` // 日志条目数组
Commit uint64 `protobuf:"varint,8,opt,name=commit" json:"commit"` // 提交日志索引
Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"` // 快照数据
Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"` // 是否拒绝
RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"` // 拒绝同步日志请求时返回的当前节点日志ID,用于被拒绝方快速定位到下一次合适的同步日志位置
Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"` // 上下文数据
XXX_unrecognized []byte `json:"-"`
}
Message 结构体相关的数据类型为 MessageType,MessageType 有十九种。当然,并不是所有的消息类型都会用到上面定义的 Message 结构体中的所有字段,因此其中有些字段是 optinal 的,我其中常用的协议(即不同的消息类型)的用途总结成如下的表格:
type | 功能 | to | from |
---|---|---|---|
MsgHup | 不用于节点间通信,仅用于发送给本节点让本节点进行选举 | 消息接收者的节点ID | 本节点 ID |
MsgBeat | 不用于节点间通信,仅用于 Leader 节点在 heartbeat 定时器到期时向集群中其他节点发送心跳消息 | 消息接收者的节点 ID | 本节点 ID |
MsgProp | raft 库使用者提议(propose)数据 | 消息接收者的节点 ID | 本节点 ID |
MsgApp | 用于 Leader 向集群中其他节点同步数据的消息 | 消息接收者的节点 ID | 本节点 ID |
MsgSnap | 用于 Leader 向 Follower 同步数据用的快照消息 | 消息接收者的节点 ID | 本节点 ID |
MsgAppResp | 集群中其他节点针对 Leader 的 MsgApp/MsgSnap 消息的应答消息 | 消息接收者的节点 ID | 本节点 ID |
MsgVote/MsgPreVote 消息 | 节点投票给自己以进行新一轮的选举 | 消息接收者的节点 ID | 本节点 ID |
MsgVoteResp/MsgPreVoteResp 消息 | 投票应答消息 | 消息接收者的节点ID | 本节点 ID |
MsgUnreachable | 用于应用层向 raft 库汇报某个节点当前已不可达 | 消息接收者的节点 ID | 节点 ID |
MsgSnapStatus | 用于应用层向 raft 库汇报某个节点当前接收快照状态 | 消息接收者的节点 ID | 本节点 ID |
MsgTransferLeader | 用于迁移 Leader | 消息接收者的节点 ID | 注意这里不是发送者的 ID 了,而是准备迁移过去成为新 Leader 的节点 ID |
MsgCheckQuorum | 消息接收者的节点 ID | 消息接收者的节点 ID | 节点 ID |
MsgTimeoutNow | Leader 迁移时,当新旧 Leader 的日志数据同步后,旧 Leader 向新 Leader 发送该消息通知可以进行迁移了 | 新的 Leader ID | 旧的 Leader 的节点 ID |
MsgReadIndex 和 MsgReadIndexResp 消息 | 用于读一致性的消息 | 接收者节点 ID | 发送者节点 ID |
上表列出了消息的类型对应的功能、消息接收者的节点 ID 和 消息发送者的节点 ID。在收到消息之后,根据消息类型检索本表,帮助我们理解 raft 算法的操作。
raft 一致性算法实现的关键有 Leader 选举、日志复制和安全性限制。Leader 故障后集群能快速选出新 Leader;日志复制, 集群只有 Leader 能写入日志, Leader 负责复制日志到 Follower 节点,并强制 Follower 节点与自己保持相同;安全性
raft 算法的第一步是首先选举出 Leader 出来,在 Leader 出现故障后也需要快速选出新 Leader,所以我们来关注下选举的流程。
只有在 Candidate 或者 Follower 状态下的节点,才有可能发起一个选举流程,而这两种状态的节点,其对应的 tick 函数为 raft.tickElection 函数,用来发起选举和选举超时控制。选举流程如下所示:
当收到任期号大于当前节点任期号的消息,同时该消息类型如果是选举类的消息(类型为 prevote 或者 vote)时,会做以下判断:
如果不是强制要求选举,同时又在租约期以内,那么就忽略该选举消息返回不进行处理,这么做是为了避免出现那些离开集群的节点,频繁发起新的选举请求。
只有在同时满足以上两个条件的情况下,才能同意该节点的选举,否则都会被拒绝。这么做的原因是:保证最后能胜出来当新的 Leader 的节点,它上面的日志都是最新的。
选举好 Leader 之后,Leader 在收到 put 提案时,如何将提案复制给其他 Follower 呢?
我们回顾下前面课时所讲的 etcd 读写请求的处理流程。以下面的图示来说明日志复制的流程。
follower-state-2.png
put foo bar
。这里涉及到两个索引值,committedIndex 存储的最后一条提交(commit)日志的索引,appliedIndex 存储的是最后一条应用到状态机中的日志索引值,一条日志只有被提交了才能应用到状态机中,因此总有 committedIndex >= appliedIndex 不等式成立。在这里只是添加一条日志还并没有提交,两个索引值还指向上一条日志。接着看看 Leader 怎么将日志数据复制到 Follower 节点。
follower-state-3.png
put foo bar
命令成功复制,可以进行提交,于是修改了本地 committed 日志的索引指向最新的存储 put foo bar
的日志,而 appliedIndex 还是保持着上一次的值,因为还没有应用该命令到状态机中。当这个命令提交完成了之后,命令就可以提交给应用层了。
本文主要介绍了 etcd-raft 模块实现分布式一致性的原理,通过 raftexample 了解 raft 模块的使用方式和过程。接着重点介绍了选举流程和日志复制的过程。除此之外,etcd 还有安全性限制保证日志选举和日志复制的正确性,比如 raft 算法中,并不是所有节点都能成为 Leader。一个节点要成为 Leader,需要得到集群中半数以上节点的投票,而一个节点会投票给一个节点,其中一个充分条件是:这个进行选举的节点的日志,比本节点的日志更新。其他还有判断日志的新旧以及提交前面任期的日志条目等措施。
最后,留一个问题,什么情况会选举超时到来时没有任何一个节点成为 Leader,后续会怎么处理呢?欢迎你在留言区提出。