Golang source code analysis: etcd (11)

golangLeetcode
Published 2023-09-09 08:27:50

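We continue analyzing the initialization of EtcdServer in server/etcdserver/server.go. NewServer first calls bootstrap, which (among other things) initializes the backend storage (bbolt, also known as bolt-db). It then creates the raftNode, and finally builds the rafthttp transport and calls its Start method to begin Raft network communication. The implementation is as follows (excerpted):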

```go
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
	b, err := bootstrap(cfg)
	// ...
	srv = &EtcdServer{
		readych:               make(chan struct{}),
		Cfg:                   cfg,
		lgMu:                  new(sync.RWMutex),
		lg:                    cfg.Logger,
		errorc:                make(chan error, 1),
		v2store:               b.storage.st,
		snapshotter:           b.ss,
		r:                     *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
		memberId:              b.cluster.nodeID,
		attributes:            membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
		cluster:               b.cluster.cl,
		stats:                 sstats,
		lstats:                lstats,
		SyncTicker:            time.NewTicker(500 * time.Millisecond),
		peerRt:                b.prt,
		reqIDGen:              idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
		AccessController:      &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
		consistIndex:          b.storage.backend.ci,
		firstCommitInTerm:     notify.NewNotifier(),
		clusterVersionChanged: notify.NewNotifier(),
	}
	// ...
	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
		// ...
	})
	// ...
	tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
		// ...
	)
	// ...
	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
	// ...
	tr := &rafthttp.Transport{
		Logger:      cfg.Logger,
		TLSInfo:     cfg.PeerTLSInfo,
		DialTimeout: cfg.PeerDialTimeout(),
		ID:          b.cluster.nodeID,
		URLs:        cfg.PeerURLs,
		ClusterID:   b.cluster.cl.ID(),
		Raft:        srv,
		Snapshotter: b.ss,
		ServerStats: sstats,
		LeaderStats: lstats,
		ErrorC:      srv.errorc,
	}
	if err = tr.Start(); err != nil {
		// ...
	}
	// ...
}
```

The EtcdServer struct is defined as follows:

```go
type EtcdServer struct {
	// inflightSnapshots holds count the number of snapshots currently inflight.
	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
	term              uint64 // must use atomic operations to access; keep 64-bit aligned.
	lead              uint64 // must use atomic operations to access; keep 64-bit aligned.

	consistIndex cindex.ConsistentIndexer // consistIndex is used to get/set/save consistentIndex
	r            raftNode                 // uses 64-bit atomics; keep 64-bit aligned.

	readych chan struct{}
	Cfg     config.ServerConfig

	lgMu *sync.RWMutex
	lg   *zap.Logger

	w wait.Wait

	readMu sync.RWMutex
	// read routine notifies etcd server that it waits for reading by sending an empty struct to
	// readwaitC
	readwaitc chan struct{}
	// readNotifier is used to notify the read routine that it can process the request
	// when there is no error
	readNotifier *notifier

	// stop signals the run goroutine should shutdown.
	stop chan struct{}
	// stopping is closed by run goroutine on shutdown.
	stopping chan struct{}
	// done is closed when all goroutines from start() complete.
	done chan struct{}
	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
	leaderChanged *notify.Notifier

	errorc     chan error
	memberId   types.ID
	attributes membership.Attributes

	cluster *membership.RaftCluster

	v2store     v2store.Store
	snapshotter *snap.Snapshotter

	applyV2 ApplierV2

	uberApply apply.UberApplier

	applyWait wait.WaitTime

	kv         mvcc.WatchableKV
	lessor     lease.Lessor
	bemu       sync.RWMutex
	be         backend.Backend
	beHooks    *serverstorage.BackendHooks
	authStore  auth.AuthStore
	alarmStore *v3alarm.AlarmStore

	stats  *stats.ServerStats
	lstats *stats.LeaderStats

	SyncTicker *time.Ticker
	// compactor is used to auto-compact the KV.
	compactor v3compactor.Compactor

	// peerRt used to send requests (version, lease) to peers.
	peerRt   http.RoundTripper
	reqIDGen *idutil.Generator

	// wgMu blocks concurrent waitgroup mutation while server stopping
	wgMu sync.RWMutex
	// wg is used to wait for the goroutines that depends on the server state
	// to exit when stopping the server.
	wg sync.WaitGroup

	// ctx is used for etcd-initiated requests that may need to be canceled
	// on etcd server shutdown.
	ctx    context.Context
	cancel context.CancelFunc

	leadTimeMu      sync.RWMutex
	leadElectedTime time.Time

	firstCommitInTerm     *notify.Notifier
	clusterVersionChanged *notify.Notifier

	*AccessController
	// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
	// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
	forceSnapshot     bool
	corruptionChecker CorruptionChecker
}
```

Here we focus on its field r, which holds a raftNode. It is created by newRaftNode, which calls into the raft package to initialize the underlying node:

```go
r:                     *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
```

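The server's Start method is defined as follows. It calls start and then launches the background goroutines through GoAttach: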

```go
func (s *EtcdServer) Start() {
	s.start()
	s.GoAttach(func() { s.adjustTicks() })
	s.GoAttach(func() { s.publishV3(s.Cfg.ReqTimeout()) })
	s.GoAttach(s.purgeFile)
	s.GoAttach(func() { monitorFileDescriptor(s.Logger(), s.stopping) })
	s.GoAttach(s.monitorClusterVersions)
	s.GoAttach(s.monitorStorageVersion)
	s.GoAttach(s.linearizableReadLoop)
	s.GoAttach(s.monitorKVHash)
	s.GoAttach(s.monitorCompactHash)
	s.GoAttach(s.monitorDowngrade)
}
```
```go
func (s *EtcdServer) start() {
	// ... (initialization elided)
	go s.run()
}
```
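Start launches its background goroutines through GoAttach. The EtcdServer fields wgMu, wg and stopping exist to make that safe during shutdown. The following is a simplified sketch of the pattern, not etcd's exact GoAttach. The `server` type and the `goAttach`/`shutdown` names are hypothetical. The read lock lets many callers attach goroutines concurrently. Shutdown takes the write lock to close `stopping`, so no `wg.Add` can race with `wg.Wait`.

```go
package main

import (
	"fmt"
	"sync"
)

// server is a hypothetical stand-in for EtcdServer that keeps only the
// fields used by the GoAttach pattern.
type server struct {
	wgMu     sync.RWMutex   // orders wg.Add against the start of shutdown
	wg       sync.WaitGroup // tracks attached goroutines
	stopping chan struct{}  // closed when shutdown begins
}

func newServer() *server { return &server{stopping: make(chan struct{})} }

// goAttach runs f in a goroutine tracked by wg unless shutdown has begun,
// and reports whether f was started.
func (s *server) goAttach(f func()) bool {
	s.wgMu.RLock() // blocks while shutdown holds the write lock
	defer s.wgMu.RUnlock()
	select {
	case <-s.stopping:
		return false // too late: the server is stopping
	default:
	}
	s.wg.Add(1) // safe: wg.Wait cannot have started yet
	go func() {
		defer s.wg.Done()
		f()
	}()
	return true
}

// shutdown closes stopping under the write lock, then waits for every
// goroutine attached before that point.
func (s *server) shutdown() {
	s.wgMu.Lock()
	close(s.stopping)
	s.wgMu.Unlock()
	s.wg.Wait()
}

func main() {
	s := newServer()
	var mu sync.Mutex
	finished := 0
	for i := 0; i < 3; i++ {
		s.goAttach(func() {
			<-s.stopping // like a monitor loop that runs until shutdown
			mu.Lock()
			finished++
			mu.Unlock()
		})
	}
	s.shutdown()
	fmt.Println(finished, s.goAttach(func() {})) // 3 false
}
```

The sync.WaitGroup documentation requires that a positive Add made while the counter is zero happen before Wait. Here the RWMutex is what enforces that ordering.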

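The run method implements part of the Raft processing flow. It starts the raftNode with a raftReadyHandler and then loops, wrapping each batch received from s.r.apply() in a job for the FIFO scheduler: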

```go
func (s *EtcdServer) run() {
	sn, err := s.r.raftStorage.Snapshot()
	// ...
	sched := schedule.NewFIFOScheduler(lg)
	// ...
	rh := &raftReadyHandler{
		getLead:    func() (lead uint64) { return s.getLead() },
		updateLead: func(lead uint64) { s.setLead(lead) },
		updateLeadership: func(newLeader bool) {
			// ...
		},
		// ...
	}
	s.r.start(rh)
	// ...
	defer func() {
		// ...
		s.r.stop()
		// ...
	}()
	// ...
	for {
		select {
		case ap := <-s.r.apply():
			f := schedule.NewJob("server_applyAll", func(context.Context) { s.applyAll(&ep, &ap) })
			sched.Schedule(f)
		case leases := <-expiredLeaseC:
			s.revokeExpiredLeases(leases)
		// ...
		}
	}
}
```

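Raft's Step method is wrapped by EtcdServer.Process. NewServer passes the server itself as the transport's Raft field (Raft: srv), so rafthttp calls Process for every message it receives from a peer: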

```go
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
	// ...
	return s.r.Step(ctx, m)
}
```
```go
func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
	s.r.ReportSnapshot(id, status)
}
```
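```go
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
	s.applySnapshot(ep, apply)
	s.applyEntries(ep, apply)
	// ...

	// wait for the raft goroutine to finish the disk writes for this batch
	// before triggering a snapshot
	<-apply.notifyc

	s.triggerSnapshot(ep)
	select {
	// snapshot requested via send()
	case m := <-s.r.msgSnapC:
		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
		s.sendMergedSnap(merged)
	default:
	}
}
```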

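applySnapshot calls the transport's AddPeer method to add peers. After restoring from a snapshot, it removes all peers from the transport and then re-adds every cluster member except the local one: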

```go
func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) {
	if raft.IsEmptySnap(toApply.snapshot) {
		return
	}
	// ...
	// recover raft transport
	s.r.transport.RemoveAllPeers()
	for _, m := range s.cluster.Members() {
		if m.ID == s.MemberId() {
			continue
		}
		s.r.transport.AddPeer(m.ID, m.PeerURLs)
	}
	// ...
}
```
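The peer-reset loop in applySnapshot can be exercised in isolation. In this sketch `peerSet`, `member` and `resetPeers` are hypothetical stand-ins for the transport's peer table and the cluster membership, and the member IDs and URLs are made up. Clearing the table first matters because the membership restored from a snapshot may no longer contain some peers (ID 9 below).

```go
package main

import (
	"fmt"
	"sort"
)

// peerSet is a hypothetical stand-in for the transport's peer table.
type peerSet struct{ peers map[uint64][]string }

func (p *peerSet) RemoveAllPeers()                  { p.peers = map[uint64][]string{} }
func (p *peerSet) AddPeer(id uint64, urls []string) { p.peers[id] = urls }

type member struct {
	ID       uint64
	PeerURLs []string
}

// resetPeers mirrors the loop in applySnapshot: drop every peer, then
// re-add all members of the restored membership except ourselves.
func resetPeers(t *peerSet, self uint64, members []member) {
	t.RemoveAllPeers()
	for _, m := range members {
		if m.ID == self {
			continue
		}
		t.AddPeer(m.ID, m.PeerURLs)
	}
}

func main() {
	t := &peerSet{peers: map[uint64][]string{9: {"http://stale:2380"}}}
	members := []member{
		{1, []string{"http://10.0.0.1:2380"}},
		{2, []string{"http://10.0.0.2:2380"}},
		{3, []string{"http://10.0.0.3:2380"}},
	}
	resetPeers(t, 1, members)

	ids := make([]uint64, 0, len(t.peers))
	for id := range t.peers {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	fmt.Println(ids) // [2 3]
}
```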
```go
func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	resp, err := s.promoteMember(ctx, id)
	// ...
	for _, url := range leader.PeerURLs {
		resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
		// ...
	}
	// ...
}
```
```go
func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
	// ...
	cc := raftpb.ConfChange{
		Type:    raftpb.ConfChangeAddNode,
		NodeID:  id,
		Context: b,
	}

	return s.configure(ctx, cc)
}
```
```go
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
	// ...
	cc.NodeID = raft.None
	// ...
}
```
```go
func (s *EtcdServer) raftStatus() raft.Status {
	return s.r.Node.Status()
}
```

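pkg/schedule/schedule.go defines a FIFO (first-in, first-out) scheduler. Schedule does not run the job itself. It appends the job to a pending queue and wakes the worker, and NewFIFOScheduler starts a single background goroutine (f.run) that executes the queued jobs one at a time, in submission order. When the scheduler is stopped, any jobs still pending are drained and executed.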

```go
func NewFIFOScheduler(lg *zap.Logger) Scheduler {
	f := &fifo{
		resume: make(chan struct{}, 1),
		donec:  make(chan struct{}, 1),
		lg:     lg,
	}
	// ...
	go f.run()
	return f
}
```
```go
func (f *fifo) Schedule(j Job) {
	// ... (locking and the empty-queue check elided)
	select {
	case f.resume <- struct{}{}:
	default:
	}
	f.pendings = append(f.pendings, j)
}
```
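```go
func (f *fifo) run() {
	// ... (main loop elided; the snippet below runs once the
	// scheduler's context is canceled, draining leftover jobs)
	for _, todo := range pendings {
		f.executeJob(todo, true)
	}
	// ...
}
```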

The method that actually runs a job is executeJob:

```go
func (f *fifo) executeJob(todo Job, updatedFinishedStats bool) {
	// ...
	todo.Do(f.ctx)
}
```

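server/etcdserver/bootstrap.go defines newRaftNode, which initializes the raftNode. It ultimately calls the raft library's StartNode, or RestartNode when there are no initial peers (for example, when restarting from an existing WAL):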

```go
func (b *bootstrappedRaft) newRaftNode(ss *snap.Snapshotter, wal *wal.WAL, cl *membership.RaftCluster) *raftNode {
	var n raft.Node
	if len(b.peers) == 0 {
		n = raft.RestartNode(b.config)
	} else {
		n = raft.StartNode(b.config, b.peers)
	}
	// ...
}
```

The transport's Start method is in server/etcdserver/api/rafthttp/transport.go. It creates the round trippers used for stream and pipeline connections:

```go
func (t *Transport) Start() error {
	var err error
	t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
	if err != nil {
		return err
	}
	t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
	// ...
}
```

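raftNode's start method is defined in server/etcdserver/raft.go. The file also declares two core structs, raftNode and raftNodeConfig. raftNode embeds raftNodeConfig, which in turn embeds raft.Node, so calls such as s.r.Step and s.r.Node.Status() reach the raft library directly: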

```go
type raftNode struct {
	// ...
	raftNodeConfig
	// ...
}

type raftNodeConfig struct {
	// ...
	raft.Node
	raftStorage *raft.MemoryStorage
	// ...
}
```
```go
func (r *raftNode) start(rh *raftReadyHandler) {
	// ...
	go func() {
		// ...
		for {
			select {
			case <-r.ticker.C:
				r.tick()
			case rd := <-r.Ready():
				// ... (process the Ready batch)
			// ...
			}
		}
	}()
}
```
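This article is part of the Tencent Cloud Self-Media Sharing Program and is shared from the WeChat official account golang算法架构leetcode技术php.
Originally published: 2023-09-07. In case of infringement, please contact cloudcommunity@tencent.com for removal.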