Tendermint的共识算法可以看成是POS+BFT,Tendermint在进行BFT共识算法确认区块前,首先使用POS算法从Validators中选举出Proposer。 然后由Proposer进行提案,最后使用BFT算法生成区块。Tendermint 的共识协议使用的gossip协议。
另外,源码分析部分由于代码篇幅太长,会省略部分源码,不影响阅读。
Tendermint 共识网络中有两个重要角色
+--------------------------------------+
v |(Wait til `CommmitTime+timeoutCommit`)
+-----------+ +-----+-----+
+----------> | Propose +--------------+ | NewHeight |
| +-----------+ | +-----------+
| | ^
|(Else, after timeoutPrecommit) v |
+-----+-----+ +-----------+ |
| Precommit | <------------------------+ Prevote | |
+-----+-----+ +-----------+ |
|(When +2/3 Precommits for block found) |
v |
+--------------------------------------------------------------------+
| Commit |
| |
| * Set CommitTime = now; |
| * Wait for block, then stage/save/commit block; |
+--------------------------------------------------------------------+
阶段:Propose阶段、Prevote阶段、Precommit阶段 投票种类:prevote、precommit、commit
整个Tendermint区块链网络需要通过Round-based协议来决定下一个区块,在区块链中共识的直接目的就是确定下一个区块内容、链接下一个区块
round-based 协议是一个状态机,主要有:
NewHeigh -> Propose -> Prevote -> Precommit -> Commit
上述每个状态都被称为一个 Step。首尾的 NewHeigh 和 Commit ,这两个 Steps 被称为特殊的 Step。
而中间循环三个 Steps则被称为一个 Round
(Propose
-> Prevote
-> Precommit
),是共识阶段
,也是算法的核心原理所在。
一个块的最终提交(Commit)可能需要多个 Round 过程,这是因为有许多原因可能会导致当前 Round 不成功(比如出块节点 Offline,提出的块是无效块,收到的 Prevote 或者 Precommit 票数不够 +2/3 等等)。
如果出块节点 Offline,提出的块是无效块,收到的 Prevote 或者 Precommit 票数不够 +2/3 出现这些情况的话,解决方案就是移步到下一轮,或者增加 timeout 时间。
服务刚启动时,节点进入第一轮状态共识,Tendenmint 称之为Round0
。
启动流程如下
主要是通过监听消息,来处理对应消息类型携带的事件。
round0
的事件,事件类型:receiveRoutine
receiveRoutine 核心方法 这个函数就比较重要了,它处理了可能导致状态转换的消息。 其中超时消息、完成一个提案和超过2/3的投票都会导致状态转换。 通过监听各种 Queue 的消息类型来处理
state.go 源码分析
// OnStart loads the latest state via the WAL, and starts the timeout and
// receive routines.
// OnStart通过WAL加载最新状态,并启动超时和接收程序。
func (cs *State) OnStart() error {
// We may set the WAL in testing before calling Start, so only OpenWAL if its
// still the nilWAL.
// 在测试中,我们可能会在调用Start之前设置WAL,所以只有在其仍然是nilWAL的情况下才会打开WAL。
if _, ok := cs.wal.(nilWAL); ok {
if err := cs.loadWalFile(); err != nil {
return err
}
}
// We may have lost some votes if the process crashed reload from consensus
// log to catchup.
// 如果从共识日志到追赶的过程中崩溃重新加载,我们可能会失去一些票数。
if cs.doWALCatchup {
repairAttempted := false
LOOP:
for {
err := cs.catchupReplay(cs.Height)
switch {
case err == nil:
break LOOP
case !IsDataCorruptionError(err):
cs.Logger.Error("error on catchup replay; proceeding to start state anyway", "err", err)
break LOOP
case repairAttempted:
return err
}
cs.Logger.Error("the WAL file is corrupted; attempting repair", "err", err)
// 1) prep work
if err := cs.wal.Stop(); err != nil {
return err
}
repairAttempted = true
// 2) backup original WAL file
corruptedFile := fmt.Sprintf("%s.CORRUPTED", cs.config.WalFile())
if err := tmos.CopyFile(cs.config.WalFile(), corruptedFile); err != nil {
return err
}
cs.Logger.Debug("backed up WAL file", "src", cs.config.WalFile(), "dst", corruptedFile)
// 3) try to repair (WAL file will be overwritten!)
if err := repairWalFile(corruptedFile, cs.config.WalFile()); err != nil {
cs.Logger.Error("the WAL repair failed", "err", err)
return err
}
cs.Logger.Info("successful WAL repair")
// reload WAL file
if err := cs.loadWalFile(); err != nil {
return err
}
}
}
// EventSwitch 只监听 EventNewRoundStep、EventValidBlock和EventVote 这三种事件
if err := cs.evsw.Start(); err != nil {
return err
}
// we need the timeoutRoutine for replay so
// we don't block on the tick chan.
// NOTE: we will get a build up of garbage go routines
// firing on the tockChan until the receiveRoutine is started
// to deal with them (by that point, at most one will be valid)
// 我们需要重放的timeoutRoutine,这样我们就不会在tick chan上阻塞。
// 注意:我们将得到大量的垃圾程序
// 直到receiveRoutine开始处理它们(到那时,最多只有一个是有效的)来处理它们(到那时,最多只有一个是有效的)。
if err := cs.timeoutTicker.Start(); err != nil {
return err
}
// Double Signing Risk Reduction
// 检查双重验签
if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
return err
}
// now start the receiveRoutine
// 启动接收程序
go cs.receiveRoutine(0)
// schedule the first round!
// use GetRoundState so we don't race the receiveRoutine for access
// 安排第一轮!
// 使用GetRoundState,这样我们就不会和receiveRoutine争夺访问权了。
cs.scheduleRound0(cs.GetRoundState())
return nil
}
scheduleRound0 的作用是将消息发送到内部的 chan 当中,receiveRoutine 负责监听不同类型事件,会监听到这个事件。
// enterNewRound(height, 0) at cs.StartTime.
func (cs *State) scheduleRound0(rs *cstypes.RoundState) {
// cs.Logger.Info("scheduleRound0", "now", tmtime.Now(), "startTime", cs.StartTime)
sleepDuration := rs.StartTime.Sub(tmtime.Now())
// 这一轮是发送了 cstypes.RoundStepNewHeight 事件类型
cs.scheduleTimeout(sleepDuration, rs.Height, 0, cstypes.RoundStepNewHeight)
}
发送内部消息,最终将消息发送到 chan
// ScheduleTimeout schedules a new timeout by sending on the internal tickChan.
// The timeoutRoutine is always available to read from tickChan, so this won't block.
// The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step.
// ScheduleTimeout通过在内部tickChan上发送来安排一个新的超时。
// timeoutRoutine总是可以从tickChan中读取,所以这不会阻塞。
// 如果timeoutRoutine已经为以后的高度/轮次/步长安排了一个超时,则调度可能会失败。
func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) {
t.tickChan <- ti
}
receiveRoutine 处理消息类型
scheduleRound0
发出的消息RoundStepNewHeight
事件
case cstypes.RoundStepNewHeight://-----------------------------------------
// the main go routines
// receiveRoutine handles messages which may cause state transitions.
// it's argument (n) is the number of messages to process before exiting - use 0 to run forever
// It keeps the RoundState and is the only thing that updates it.
// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities.
// State must be locked before any internal state is updated.
// receiveRoutine处理可能导致状态转换的消息。
// 它的参数(n)是退出前要处理的消息的数量--用0表示永远运行。
// 它保持RoundState,并且是唯一能更新它的东西。
// 更新(状态转换)发生在超时、完整提案和2/3多数的情况下。
// 在任何内部状态被更新之前,状态必须被锁定。
func (cs *State) receiveRoutine(maxSteps int) {
...
// 拿到当前链状态
rs := cs.RoundState
// 注意,这个是接收的 reactor 的消息
var mi msgInfo
// 处理三种类型的消息
// 1.peerMsgQueue 来自节点的消息
// 2.internalMsgQueue 内部消息
// 3.timeoutTicker 超时的消息
select {
case <-cs.txNotifier.TxsAvailable():
cs.handleTxsAvailable()
// peer 节点消息
case mi = <-cs.peerMsgQueue:
if err := cs.wal.Write(mi); err != nil {
cs.Logger.Error("failed writing to WAL", "err", err)
}
// handles proposals, block parts, votes
// may generate internal events (votes, complete proposals, 2/3 majorities)
cs.handleMsg(mi)
//监听内部队列消息
case mi = <-cs.internalMsgQueue:
err := cs.wal.WriteSync(mi) // NOTE: fsync
if err != nil {
panic(fmt.Sprintf(
"failed to write %v msg to consensus WAL due to %v; check your file system and restart the node",
mi, err,
))
}
// handles proposals, block parts, votes
// 核心的状态逻辑处理,处理 proposals, block parts, votes
cs.handleMsg(mi)
// 注意这个监听,ScheduleTimeout 的 channel
case ti := <-cs.timeoutTicker.Chan(): // tockChan:
if err := cs.wal.Write(ti); err != nil {
cs.Logger.Error("failed writing to WAL", "err", err)
}
// if the timeout is relevant to the rs
// go to the next step
cs.handleTimeout(ti, rs)
case <-cs.Quit():
onExit(cs)
return
}
}
}
// 进入新一轮
func (cs *State) enterNewRound(height int64, round int32) {
logger := cs.Logger.With("height", height, "round", round)
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
logger.Debug(
"entering new round with invalid args",
"current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step),
)
return
}
if now := tmtime.Now(); cs.StartTime.After(now) {
logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)
}
logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
// increment validators if necessary
// 必要时增加 validator
validators := cs.Validators
if cs.Round < round {
validators = validators.Copy()
validators.IncrementProposerPriority(tmmath.SafeSubInt32(round, cs.Round))
}
// Setup new round
// we don't fire newStep for this step,
// but we fire an event, so update the round step first
// 只是 set 值,并没有接口调用
cs.updateRoundStep(round, cstypes.RoundStepNewRound)
...//省略部分代码
cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping
cs.TriggeredTimeoutPrecommit = false
// 发布事件??
if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil {
cs.Logger.Error("failed publishing new round", "err", err)
}
cs.metrics.Rounds.Set(float64(round))
// Wait for txs to be available in the mempool
// before we enterPropose in round 0. If the last block changed the app hash,
// we may need an empty "proof" block, and enterPropose immediately.
// 进入 round0 之前,等待mempool中的txs可用。
// 如果最后一个区块改变了应用程序的哈希值,我们可能需要一个空的 "证明 "区块,并立即输入Propose。
waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
if waitForTxs {
if cs.config.CreateEmptyBlocksInterval > 0 {
// 构建空块证明,进入下一个阶段
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
cstypes.RoundStepNewRound)
}
} else {
// 进入 propose 阶段
cs.enterPropose(height, round)
}
}