专栏首页后台全栈之路用 etcd/raft 组建能够选举的最简集群 demo
原创

用 etcd/raft 组建能够选举的最简集群 demo

https://cloud.tencent.com/developer/article/1644111当今互联网行业中,对于分布式一致性算法,个人觉得实用性最高并且应用最广泛的就是 Raft 算法了。Raft 非常适合用于所有的节点均为可信节点时的必要数据同步场景中。Raft 的基本原理理解起来并不难,网上很多文字简介,都不如一个很生动的动画来得直观。

etcd/raft

在 Kubenetes 中广泛使用的分布式 KV 存储系统 etcd 使用的就是 Raft 算法。算法的实现就直接作为 etcd 的子 package(用 Go 编写),路径为:github.com/etcd-io/etcd/raft

官方提供了一个 demo。这个 demo 其实已经非常完整了,它包含了网络通信、快照压缩、数据同步等完整的功能。而对于 etcd/raft 的初见者而言,还是稍微有点门槛了。本文的目的是尽量抽丝剥茧,首先从 raft 最基本的功能——选举来入手,构建一个小的集群 demo,一步一步说明 etcd/raft 的用法。

Demo 功能

这个小 demo 只实现一个功能:已知数量的集群节点,能够进行 leader 的选举。更多的功能(比如数据的存储)在以后的文章陆续解析。

为此,我们需要研究 etcd/raft 的相关函数的用法。

Raft 节点数据结构

raft 使用接口 Node 来描述一个 Raft 节点。该接口的函数中,本文(或者说本阶段)涉及的有四个:

type Node interface {
    Tick()
    Step(ctx context.Context, msg raftpb.Message) error
    Ready() <-chan Ready
    Status() Status
}

启动节点

Raft 节点数量建议是一个素数。这里我采用 3 个。在节点数量已知的情况下,我们首先要告知 Raft node 节点的列表。每个节点应该有唯一的一个 uint64 类型的 ID:

    peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}

应用程序需要自己实现节点与节点之间的网络通信。这里我就在本地单进程运行三个协程,模拟三个节点,给三个节点分配三个 channel 用来通信:

var (
    bcChans = []chan raftpb.Message{
        make(chan raftpb.Message),
        make(chan raftpb.Message),
        make(chan raftpb.Message),
    }
)

// ......

func main() {
    peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
    go startNode(0x01, peers)
    go startNode(0x02, peers)
    go startNode(0x03, peers)

    time.Sleep(2 * time.Second)
    return
}

节点程序中要完成的功能

在 etcd/raft 中对 Raft 算法的逻辑实现是尽量地轻量化,只实现算法的核心功能。但与此相对的,需要调用 Raft 的应用程序实现较多的额外逻辑来实现完整的节点功能。在本文中,我们只关心节点的选举,该场景下我们需要实现的功能有以下两个:

节点内部心跳机制

Raft 节点依赖定期的心跳来进行周期性的状态机流转,应用程序需要给 raft 节点提供。在 demo 中,我用了一个带随机抖动的 ticker 来实现——而这也是 Raft 算法中建议的方案,也就是带有一点随机因素。当每一次 tick 到来时,就可以调用 raft node 的 Tick() 方法,推动内部状态机的更新:

func startNode(id uint64, peers []raft.Peer) {
    // ......

    for {
        select {
        case <-n.tick.Elapsed():    // 相当于 time 包 Ticker 的 tick.C
            n.node.Tick()           // n.node 是 raft.Node 对象,下同

        // ......
    }

    // ......
}

转发节点之间的 raft 通信

前文说到,Raft 节点之间的网络通信需要应用程序来实现。应用程序通过 etcd/raft 节点的 Ready() 方法接收节点需要对其他发出的的信息。Ready() 函数返回 raft.Ready 结构体,在这一阶段中,我们需要使用的是 Ready 结构体的 Messages 成员,这是一个 []raftpb.Message 类型。应用程序需要负责的,就是将这些 message 发送出去。

Message 的定义并不长,如下所示:

type Message struct {
    Type             MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
    To               uint64      `protobuf:"varint,2,opt,name=to" json:"to"`
    From             uint64      `protobuf:"varint,3,opt,name=from" json:"from"`
    Term             uint64      `protobuf:"varint,4,opt,name=term" json:"term"`
    LogTerm          uint64      `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
    Index            uint64      `protobuf:"varint,6,opt,name=index" json:"index"`
    Entries          []Entry     `protobuf:"bytes,7,rep,name=entries" json:"entries"`
    Commit           uint64      `protobuf:"varint,8,opt,name=commit" json:"commit"`
    Snapshot         Snapshot    `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
    Reject           bool        `protobuf:"varint,10,opt,name=reject" json:"reject"`
    RejectHint       uint64      `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
    Context          []byte      `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
    XXX_unrecognized []byte      `json:"-"`
}

可以看到,这个结构体分别按照 protobuf 和 json 进行了定义,这就非常方便应用程序根据不同的通信模式对数据进行序列化和反序列化后在网络中传输。而 To 则告诉了应用程序应该将这个消息发送给哪一个节点。在 demo 则是根据 To 发到对应的 channel 里。

接收其他节点发来的 raft 通信

在 demo 中,节点从 channel 中获取到 Message 对象之后,调用本节点的 Step() 函数:

func startNode(id uint64, peers []raft.Peer) {
    // ......

    for {
        select {
        // ......

        case m := <-n.recv:
            n.node.Step(context.TODO(), m)
    }

    // ......
}

完整 demo 代码

完整代码九十来行,可以直接运行之后观察 shell 输出,了解 raft 的选举过程。

package main

import (
    "context"
    "log"
    "strings"
    "time"

    "github.com/coreos/etcd/raft/raftpb"
    "github.com/etcd-io/etcd/raft"
    "github.com/influxdata/telegraf/agent"
)

func init() {
    log.SetFlags(log.Lshortfile | log.LstdFlags)
}

var (
    infof  = log.Printf
    errorf = log.Printf

    bcChans = []chan raftpb.Message{
        make(chan raftpb.Message),
        make(chan raftpb.Message),
        make(chan raftpb.Message),
    }
)

const (
    tickInterval      = 100 * time.Millisecond
    jitterMillisecond = 15 * time.Millisecond
)

func main() {
    infof("hello, raft!")
    defer infof("end of raft")

    peers := []raft.Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}}
    go startNode(0x01, peers)
    go startNode(0x02, peers)
    go startNode(0x03, peers)

    time.Sleep(2 * time.Second)
    return
}

func startNode(id uint64, peers []raft.Peer) {
    ctx := context.TODO()
    storage := raft.NewMemoryStorage()
    c := raft.Config{
        ID:              id,
        ElectionTick:    10,
        HeartbeatTick:   1,
        Storage:         storage,
        MaxSizePerMsg:   4096,
        MaxInflightMsgs: 256,
    }

    n := &node{
        id:          id,
        prefix:      strings.Repeat("\t\t\t", int(id)) + "| ",
        node:        raft.StartNode(&c, peers),
        tick:        agent.NewRollingTicker(tickInterval-jitterMillisecond, tickInterval+jitterMillisecond),
        recv:        bcChans[id-1],
        raftStorage: storage,
    }

    for {
        select {
        case <-n.tick.Elapsed():
            n.node.Tick()

        case rd := <-n.node.Ready():
            n.raftStorage.Append(rd.Entries)
            go n.sendMessage(rd.Messages)
            n.node.Advance()

        case m := <-n.recv:
            infof("%d -%s got message from %v to %v, type %v", id, n.prefix, m.From, m.To, m.Type)
            n.node.Step(ctx, m)
            infof("%d -%s status: %v", id, n.prefix, n.node.Status().RaftState)
        }
    }

    return
}

type node struct {
    id          uint64
    prefix      string
    node        raft.Node
    tick        *agent.RollingTicker
    recv        chan raftpb.Message
    raftStorage *raft.MemoryStorage
}

func (n *node) sendMessage(msg []raftpb.Message) {
    for _, m := range msg {
        to := m.To
        ch := bcChans[to-1]
        infof("%d -%s send to %v, type %v", n.id, n.prefix, m.To, m.Type)
        ch <- m
    }
    return
}

本文章采用 知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议 进行许可。

原作者: amc,欢迎转载,但请注明出处。

原文标题:用 etcd/raft 组建能够选举的最简集群 demo

发布日期:2020/06/12

原文链接:https://cloud.tencent.com/developer/article/1644111

原创声明,本文系作者授权云+社区发表,未经许可,不得转载。

如有侵权,请联系 yunjia_community@tencent.com 删除。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 腾讯 Tars Web 管理端用户体系对接

    这段时间一直在基于 Tars 作开发。最近的文章也多是针对 Tars 的一些学习笔记。前面我们搭建了 Tars 基础框架,打开了 Tars web 管理界面进行...

    amc
  • 腾讯 Tars-Go 服务获取自定义模版(配置)值

    腾讯 Tars 框架中,有两种可以称之为 “配置” 的地方:其中一个是可以自定义的,在 Tars 管理页面中称为 “服务配置”。在这里,可以按照开发者喜欢的格式...

    amc
  • 小面试官教你 MySQL——引擎、索引和算法

    弄懂了 MySQL 的基本 CURD 操作之后,下一个必须掌握的知识就是 MySQL 的索引。

    amc
  • 二叉树-最近的公共祖先

    已知二叉树,求二叉树中给定的两个节点的最近公共祖先。 最近公共祖先: 两节点v与w的最近公共祖先u,满足在树上最低(离根最 远),且v,w两个节点都是u的子孙。...

    小飞侠xp
  • 【一天一大 lee】 把二叉搜索树转换为累加树 (难度:简单)-Day20200921

    给定一个二叉搜索树(Binary Search Tree),把它转换成为累加树(Greater Tree),使得每个节点的值是原来的节点值加上所有大于它的节点值...

    前端小书童
  • 一天一大 lee(二叉树的所有路径)难度:简单-Day20200904

    前端小书童
  • 剑指offer_8_二叉树转双链表

    描述:输入一颗二叉搜索树,将该二叉搜索树转换成一个排序的双向链表,要求不能创建任何新的节点,只能调整树中节点指针的指向。

    用户6055494
  • 剑指offer - 反转链表 - JavaScript

    借助栈的后入先出的顺序,可以将顺序列表逆序。不过这不是原地反转,当然题目也没有要求。

    心谭博客
  • LRU算法的实现

    LRU算法全称为Least Recently Used,也就是最近最少使用,操作系统的页面置换算法中就有LRU算法,用来将内存中的页换出,下面我们用JA...

    用户6055494
  • HDU5033

    真蠢,和网络赛的时候我WA掉的思想已经很接近了,被他们又是说这说那的绕进去了,就是一个单调栈,栈中元素的纵坐标严格降低,并且栈中顶部两点之间斜率的绝对值要小于栈...

    triplebee

扫码关注云+社区

领取腾讯云代金券