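An Analysis of the Open-Source memberlist Source Code

WeChat official account link for this article: https://mp.weixin.qq.com/s/abY24PhBgNDJgh5m9Taq4w

memberlist is a Go package that uses the Gossip protocol to propagate messages. It manages node discovery, node failure detection, and the node list within a distributed cluster.

I previously wrote an article about the Gossip protocol: "Gossip协议简介---病毒感染模型的p2p算法" (An Introduction to the Gossip Protocol: a P2P Algorithm Based on the Epidemic Model).

Source code: https://github.com/hashicorp/memberlist

To learn how memberlist is designed, I follow my usual habit of starting from an early version of the code. This analysis is based on commit fe04265.

To repeat: the point of studying an early version is to understand the design and low-level workings of the project, and to see how the source improved as the versions evolved.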

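Source layout:

The overall code style resembles object-oriented C, and the modules map directly onto the file names:

1. broadcast.go: the broadcast module

2. net.go: the transport and protocol handling module

3. state.go: the node state management module

4. memberlist.go: the main module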

github.com/hashicorp/memberlist/memberlist.go

Memberlist

In the Memberlist struct, the fields are likewise grouped by function.

type Memberlist struct {
    config   *Config // configuration
    shutdown bool    // set when the local service has been shut down
    leave    bool    // set when this node is leaving the cluster

    // UDP and TCP listeners; see net.go (transport and protocol handling)
    udpListener *net.UDPConn
    tcpListener *net.TCPListener

    sequenceNum uint32 // Local sequence number
    incarnation uint32 // Local incarnation number

    // Node and state management; see state.go
    nodeLock sync.RWMutex
    nodes    []*NodeState          // Known nodes
    nodeMap  map[string]*NodeState // Maps Addr.String() -> NodeState

    tickerLock sync.Mutex
    tickers    []*time.Ticker
    stopTick   chan struct{}
    probeIndex int

    ackLock     sync.Mutex
    ackHandlers map[uint32]*ackHandler

    // Broadcast management; see broadcast.go
    broadcastLock sync.Mutex
    bcQueue       broadcasts
}

Config

In Config, the leading fields are basic configuration options, each explained by its comment.

type Config struct {
Name string // Node name (FQDN)
BindAddr string // Binding address
UDPPort int // UDP port to listen on
TCPPort int // TCP port to listen on
TCPTimeout time.Duration // TCP timeout
IndirectChecks int // Number of indirect checks to use
RetransmitMult int // Retransmits = RetransmitMult * log(N+1)
SuspicionMult int // Suspicion time = SuspicionMult * log(N+1) * Interval
PushPullInterval time.Duration // How often we do a Push/Pull update
RTT time.Duration // 99th percentile of round-trip time
ProbeInterval time.Duration // Failure probing interval length

GossipNodes int // Number of nodes to gossip to per GossipInterval
GossipInterval time.Duration // Gossip interval for non-piggyback messages (only if GossipNodes > 0)

JoinCh chan<- *Node
LeaveCh chan<- *Node
}

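The last two fields are:

JoinCh: a hook exposed to the caller. When a node is added, a notification is sent on this channel so that external code can react.

LeaveCh: a hook exposed to the caller. When a node is removed, a notification is sent on this channel so that external code can react.

In even earlier versions these two channels lived in the Memberlist struct; they were later moved into Config.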
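A minimal sketch of how a notification could be delivered on JoinCh or LeaveCh without stalling the protocol goroutines. The `notify` name appears in the code quoted later in this article, but the body shown here and the trimmed-down `Node` type are assumptions for illustration, not necessarily what the library does.

```go
package main

import "fmt"

// Node is a hypothetical, trimmed-down stand-in for memberlist's Node type.
type Node struct {
	Name string
}

// notify delivers n to ch without ever blocking the caller.
// If ch is nil or its buffer is full, the event is silently dropped.
func notify(ch chan<- *Node, n *Node) {
	if ch == nil {
		return
	}
	select {
	case ch <- n:
	default:
		// The consumer is not keeping up; drop the event rather than block.
	}
}

func main() {
	joinCh := make(chan *Node, 1)
	notify(joinCh, &Node{Name: "a"})
	notify(joinCh, &Node{Name: "b"}) // buffer is full, so this one is dropped
	fmt.Println((<-joinCh).Name, len(joinCh))
}
```

With this design a slow consumer loses events instead of delaying failure detection, so a caller who needs every event would want a generously buffered channel.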

Now let's walk through the flow.

Create

// Create will start memberlist and create a new gossip pool, but
// will not connect to an existing node. This should only be used
// for the first node in the cluster.
func Create(conf *Config) (*Memberlist, error) {
    // newMemberlist starts the TCP and UDP listeners
    m, err := newMemberlist(conf)
    if err != nil {
        return nil, err
    }
    if err := m.setAlive(); err != nil {
        m.Shutdown()
        return nil, err
    }
    // schedule starts three periodic tasks: probe, pushPull, and gossip
    m.schedule()
    return m, nil
}
There are two important steps here:
1. newMemberlist
2. m.schedule
newMemberlist
// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
  
    tcpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.TCPPort)
    tcpLn, err := net.Listen("tcp", tcpAddr)
    if err != nil {
        return nil, fmt.Errorf("Failed to start TCP listener. Err: %s", err)
    }
    // The code above creates the TCP listener

    udpAddr := fmt.Sprintf("%s:%d", conf.BindAddr, conf.UDPPort)
    udpLn, err := net.ListenPacket("udp", udpAddr)
    if err != nil {
        tcpLn.Close()
        return nil, fmt.Errorf("Failed to start UDP listener. Err: %s", err)
    }
    // The code above creates the UDP listener

    // Build the Memberlist instance
    m := &Memberlist{config: conf,
        udpListener: udpLn.(*net.UDPConn),
        tcpListener: tcpLn.(*net.TCPListener),
        nodeMap: make(map[string]*NodeState),
        stopTick: make(chan struct{}, 32),
        ackHandlers: make(map[uint32]*ackHandler),
    }

    go m.tcpListen() // start the TCP service
    go m.udpListen() // start the UDP service
    return m, nil
}
The most important thing newMemberlist does is start the TCP and UDP services.
Let's look at these network services (TCP and UDP) next.
github.com/hashicorp/memberlist/net.go
tcp
tcplisten
// tcpListen listens for and handles incoming connections
func (m *Memberlist) tcpListen() {
    for {
        //tcp accept
        conn, err := m.tcpListener.AcceptTCP()
        if err != nil {
            if m.shutdown {
                break
            }
            log.Printf("[ERR] Error accepting TCP connection: %s", err)
            continue
        }
        // Each connection is handled by its own handleConn goroutine
        go m.handleConn(conn)
    }
}
Next, look at
handleConn
// handleConn handles a single incoming TCP connection
func (m *Memberlist) handleConn(conn *net.TCPConn) {
    defer conn.Close()

    // Read the remote node's state
    remoteNodes, err := readRemoteState(conn)
    if err != nil {
        log.Printf("[ERR] Failed to receive remote state: %s", err)
        return
    }
    // Send the local node state
    if err := m.sendLocalState(conn); err != nil {
        log.Printf("[ERR] Failed to push local state: %s", err)
    }
    // Merge the received remote state into the local state
    m.mergeState(remoteNodes)
}
The job of the TCP service is to synchronize node state.
readRemoteState
Reads the node state information and returns it.
// recvRemoteState is used to read the remote state from a connection
func readRemoteState(conn net.Conn) ([]pushNodeState, error) {
    // Read the message type (a single byte)
    buf := []byte{0}
    if _, err := conn.Read(buf); err != nil {
        return nil, err
    }
    msgType := uint8(buf[0])

    // Quit if not push/pull; only push/pull messages are supported here
    if msgType != pushPullMsg {
        err := fmt.Errorf("received invalid msgType (%d)", msgType)
        return nil, err
    }

    // Read and decode the push/pull header
    var header pushPullHeader
    hd := codec.MsgpackHandle{}
    dec := codec.NewDecoder(conn, &hd)
    if err := dec.Decode(&header); err != nil {
        return nil, err
    }

    // Allocate space for the transfer
    remoteNodes := make([]pushNodeState, header.Nodes)
    // Try to decode all the node states
    for i := 0; i < header.Nodes; i++ {
        if err := dec.Decode(&remoteNodes[i]); err != nil {
            return remoteNodes, err
        }
    }
    // Return the node state information
    return remoteNodes, nil
}
sendLocalState
Sends the locally stored node state information.
// sendLocalState is invoked to send our local state over a tcp connection
func (m *Memberlist) sendLocalState(conn net.Conn) error {
    // Prepare the local node state: collect the locally stored node states
    m.nodeLock.RLock()
    localNodes := make([]pushNodeState, len(m.nodes))
    for idx, n := range m.nodes {
        localNodes[idx].Name = n.Name
        localNodes[idx].Addr = n.Addr
        localNodes[idx].Incarnation = n.Incarnation
        localNodes[idx].State = n.State
    }
    m.nodeLock.RUnlock()

    // Build the header, then send our node state
    header := pushPullHeader{Nodes: len(localNodes)}
    hd := codec.MsgpackHandle{}
    enc := codec.NewEncoder(conn, &hd)

    // Begin state push
    conn.Write([]byte{pushPullMsg})
    // Encode and send the header, then each node state
    if err := enc.Encode(&header); err != nil {
        return err
    }
    for i := 0; i < header.Nodes; i++ {
        if err := enc.Encode(&localNodes[i]); err != nil {
            return err
        }
    }
    return nil
}
mergeState
Updates node state.
// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
   for _, r := range remote {
      // Look for a matching local node
      m.nodeLock.RLock()
      local, ok := m.nodeMap[r.Name]
      m.nodeLock.RUnlock()

      // Skip if the remote state matches the locally stored state
      if ok && local.State == r.State {
         continue
      }

      // Dispatch on the three possible states
      switch r.State {
      case StateAlive:
         a := alive{Incarnation: r.Incarnation, Node: r.Name, Addr: r.Addr}
         m.aliveNode(&a)

      case StateSuspect:
         s := suspect{Incarnation: r.Incarnation, Node: r.Name}
         m.suspectNode(&s)

      case StateDead:
         d := dead{Incarnation: r.Incarnation, Node: r.Name}
         m.deadNode(&d)
      }
   }
}
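There are three states:
StateAlive: handled by aliveNode
StateSuspect: handled by suspectNode
StateDead: handled by deadNode
These three handlers are covered later in this article.
TCP summary:
The TCP connection is mainly used to synchronize and update node state information.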
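The push/pull wire layout walked through above (one type byte, a header carrying the entry count, then the entries) can be sketched as a round trip over an in-memory buffer. This sketch swaps msgpack for the standard library's encoding/json to stay dependency-free. The value chosen for pushPullMsg, the trimmed struct fields, and the helper names writeState, readState, and roundTrip are assumptions for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

const pushPullMsg uint8 = 1 // hypothetical value, for illustration only

type pushPullHeader struct{ Nodes int }

type pushNodeState struct {
	Name        string
	Incarnation uint32
	State       int
}

// writeState writes the type byte, a header with the entry count,
// and then every entry, in that order.
func writeState(w io.Writer, nodes []pushNodeState) error {
	if _, err := w.Write([]byte{pushPullMsg}); err != nil {
		return err
	}
	enc := json.NewEncoder(w)
	if err := enc.Encode(pushPullHeader{Nodes: len(nodes)}); err != nil {
		return err
	}
	for i := range nodes {
		if err := enc.Encode(&nodes[i]); err != nil {
			return err
		}
	}
	return nil
}

// readState mirrors writeState and rejects any other message type.
func readState(r io.Reader) ([]pushNodeState, error) {
	buf := []byte{0}
	if _, err := io.ReadFull(r, buf); err != nil {
		return nil, err
	}
	if buf[0] != pushPullMsg {
		return nil, fmt.Errorf("received invalid msgType (%d)", buf[0])
	}
	dec := json.NewDecoder(r)
	var header pushPullHeader
	if err := dec.Decode(&header); err != nil {
		return nil, err
	}
	if header.Nodes < 0 {
		return nil, fmt.Errorf("invalid node count (%d)", header.Nodes)
	}
	nodes := make([]pushNodeState, header.Nodes)
	for i := range nodes {
		if err := dec.Decode(&nodes[i]); err != nil {
			return nil, err
		}
	}
	return nodes, nil
}

// roundTrip encodes nodes into an in-memory buffer and decodes them again.
func roundTrip(nodes []pushNodeState) ([]pushNodeState, error) {
	var buf bytes.Buffer
	if err := writeState(&buf, nodes); err != nil {
		return nil, err
	}
	return readState(&buf)
}

func main() {
	out, err := roundTrip([]pushNodeState{
		{Name: "a", Incarnation: 1},
		{Name: "b", Incarnation: 2, State: 1},
	})
	fmt.Println(out, err)
}
```

Sending the count up front lets the reader allocate the slice once and know exactly how many entries to decode before it replies with its own state.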
udp
udpListen
The code is quite simple: it keeps reading packets and hands each one to handleCommand.
// udpListen listens for and handles incoming UDP packets
func (m *Memberlist) udpListen() {
   mainBuf := make([]byte, udpBufSize)
   var n int
   var addr net.Addr
   var err error
   for {
      // Reset buffer
      buf := mainBuf[0:udpBufSize]

      // Read a packet
      // Keep reading packets from the UDP listener
      n, addr, err = m.udpListener.ReadFrom(buf)
      if err != nil {
         if m.shutdown {
            break
         }
         log.Printf("[ERR] Error reading UDP packet: %s", err)
         continue
      }

      // Check the length
      if n < 1 {
         log.Printf("[ERR] UDP packet too short (%d bytes). From: %s", len(buf), addr)
         continue
      }

      // Handle the command
      // This is where the packet is actually processed
      m.handleCommand(buf[:n], addr)
   }
}
handleCommand
func (m *Memberlist) handleCommand(buf []byte, from net.Addr) {
   // Decode the message type (the first byte)
   msgType := uint8(buf[0])
   buf = buf[1:]

   // Switch on the msgType: each message type has its own handler
   switch msgType {
   case compoundMsg:
      m.handleCompound(buf, from)
   case pingMsg:
      m.handlePing(buf, from)
   case indirectPingMsg:
      m.handleIndirectPing(buf, from)
   case ackRespMsg:
      m.handleAck(buf, from)
   case suspectMsg:
      m.handleSuspect(buf, from)
   case aliveMsg:
      m.handleAlive(buf, from)
   case deadMsg:
      m.handleDead(buf, from)
   default:
      log.Printf("[ERR] UDP msg type (%d) not supported. From: %s", msgType, from)
   }
}
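The message types are:
compoundMsg: handled by handleCompound
    Several messages are bundled into one packet; handleCompound splits them apart and feeds each one back into handleCommand.
func (m *Memberlist) handleCompound(buf []byte, from net.Addr) {
   // Decode the parts (split the compound message)
   trunc, parts, err := decodeCompoundMessage(buf)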
   if err != nil {
      log.Printf("[ERR] Failed to decode compound request: %s", err)
      return
   }

   // Log any truncation
   if trunc > 0 {
      log.Printf("[WARN] Compound request had %d truncated messages", trunc)
   }

   // Handle each message
   for _, part := range parts {
      // Feed each part back into handleCommand
      m.handleCommand(part, from)
   }
}
pingMsg: handled by handlePing
indirectPingMsg: handled by handleIndirectPing
ackRespMsg: handled by handleAck
    These three handlers are all fairly simple.
suspectMsg: handled by handleSuspect
    It calls suspectNode.
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
   var sus suspect
   if err := decode(buf, &sus); err != nil {
      log.Printf("[ERR] Failed to decode suspect message: %s", err)
      return
   }
   m.suspectNode(&sus)
}
aliveMsg: handled by handleAlive
    It calls aliveNode.
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
   var live alive
   if err := decode(buf, &live); err != nil {
      log.Printf("[ERR] Failed to decode alive message: %s", err)
      return
   }
   m.aliveNode(&live)
}
deadMsg: handled by handleDead
    It calls deadNode.
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
   var d dead
   if err := decode(buf, &d); err != nil {
      log.Printf("[ERR] Failed to decode dead message: %s", err)
      return
   }
   m.deadNode(&d)
}
UDP summary:
The UDP service provides the basic command handlers.
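To make the compound message idea concrete, here is a hedged sketch of one possible framing: a count byte, a table of big-endian uint16 lengths, then the message bodies. This layout and the names makeCompound and splitCompound are assumptions for illustration; the article does not show the real makeCompoundMessage and decodeCompoundMessage, so their actual format may differ.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// makeCompound packs messages as: count (1 byte), one big-endian uint16
// length per message, then the message bodies back to back.
func makeCompound(msgs [][]byte) ([]byte, error) {
	if len(msgs) > 255 {
		return nil, errors.New("too many messages")
	}
	out := []byte{uint8(len(msgs))}
	for _, m := range msgs {
		if len(m) > 0xFFFF {
			return nil, errors.New("message too long")
		}
		var l [2]byte
		binary.BigEndian.PutUint16(l[:], uint16(len(m)))
		out = append(out, l[:]...)
	}
	for _, m := range msgs {
		out = append(out, m...)
	}
	return out, nil
}

// splitCompound reverses makeCompound. It returns the number of announced
// messages that were cut off (trunc) together with the complete parts.
func splitCompound(buf []byte) (trunc int, parts [][]byte, err error) {
	if len(buf) < 1 {
		return 0, nil, errors.New("missing compound length byte")
	}
	n := int(buf[0])
	buf = buf[1:]
	if len(buf) < 2*n {
		return 0, nil, errors.New("truncated length table")
	}
	lengths := make([]int, n)
	for i := range lengths {
		lengths[i] = int(binary.BigEndian.Uint16(buf[2*i:]))
	}
	buf = buf[2*n:]
	for i, l := range lengths {
		if len(buf) < l {
			// This message and all following ones are incomplete.
			return n - i, parts, nil
		}
		parts = append(parts, buf[:l])
		buf = buf[l:]
	}
	return 0, parts, nil
}

func main() {
	packed, err := makeCompound([][]byte{[]byte("ping"), []byte("ack")})
	if err != nil {
		panic(err)
	}
	trunc, parts, err := splitCompound(packed)
	fmt.Println(trunc, len(parts), err)
}
```

Reporting the truncation count separately from the error matches how handleCompound above logs a warning for truncated messages while still processing the parts that arrived intact.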
github.com/hashicorp/memberlist/state.go
Node state management
In github.com/hashicorp/memberlist/memberlist.go,
the last function that Create calls is schedule:
// Schedule is used to ensure the Tick is performed periodically
func (m *Memberlist) schedule() {
   m.tickerLock.Lock()
   defer m.tickerLock.Unlock()

   // Create a new probeTicker
   // Start the probe goroutine
   if m.config.ProbeInterval > 0 {
      t := time.NewTicker(m.config.ProbeInterval)
      go m.triggerFunc(t.C, m.probe)
      m.tickers = append(m.tickers, t)
   }

   // Create a push pull ticker if needed
   // Start the pushPull goroutine
   if m.config.PushPullInterval > 0 {
      t := time.NewTicker(m.config.PushPullInterval)
      go m.triggerFunc(t.C, m.pushPull)
      m.tickers = append(m.tickers, t)
   }

   // Create a gossip ticker if needed
   // Start the gossip goroutine
   if m.config.GossipNodes > 0 {
      t := time.NewTicker(m.config.GossipInterval)
      go m.triggerFunc(t.C, m.gossip)
      m.tickers = append(m.tickers, t)
   }
}
Three periodic tasks are started here:
probe, pushPull, and gossip
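The article does not show triggerFunc itself. A plausible minimal sketch, assuming it simply runs a callback on every tick until a stop signal arrives, looks like this. The explicit stop parameter and the runTicks helper are hypothetical; in the real code the stop channel presumably comes from the stopTick field.

```go
package main

import (
	"fmt"
	"time"
)

// triggerFunc calls f once per tick received on C and returns as soon as
// stop is closed or delivers a value.
func triggerFunc(C <-chan time.Time, stop <-chan struct{}, f func()) {
	for {
		select {
		case <-C:
			f()
		case <-stop:
			return
		}
	}
}

// runTicks drives triggerFunc with n hand-made ticks instead of a real
// time.Ticker and reports how many times the callback ran.
func runTicks(n int) int {
	ticks := make(chan time.Time)
	stop := make(chan struct{})
	done := make(chan struct{})
	count := 0
	go func() {
		triggerFunc(ticks, stop, func() { count++ })
		close(done)
	}()
	for i := 0; i < n; i++ {
		ticks <- time.Now() // blocks until the loop has picked up the tick
	}
	close(stop)
	<-done // wait for the loop to exit before reading count
	return count
}

func main() {
	fmt.Println(runTicks(3))
}
```

Passing the tick channel in as a parameter lets the same loop serve probe, pushPull, and gossip, each with its own ticker and interval.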
probe

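After a node starts, at a fixed interval it picks one node and sends it a PING message. If the PING fails, it picks IndirectChecks random nodes and asks them to ping the target indirectly; it also sends a TCP PING directly to the target (this TCP fallback does not appear in the probeNode code of this early version). A node that receives an indirect PING request sends a PING to the address in the request and returns the result to the node that made the request. If the probing node receives no ACK for the target within the probe timeout, it marks the target as suspect. https://www.colabug.com/1010287.html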

// Tick is used to perform a single round of failure detection and gossip
func (m *Memberlist) probe() {
   // Track the number of indexes we've considered probing
   numCheck := 0
START:
   // Make sure we don't wrap around infinitely
   if numCheck >= len(m.nodes) {
      return
   }

   // Handle the wrap around case
   // probeIndex is the index into the node list; nodes are probed round-robin
   if m.probeIndex >= len(m.nodes) {
      m.resetNodes()
      m.probeIndex = 0
      numCheck++
      goto START
   }

   // Determine if we should probe this node
   skip := false
   var node *NodeState
   m.nodeLock.RLock()

   node = m.nodes[m.probeIndex]
   if node.Name == m.config.Name {
      skip = true // this is the local node itself
   } else if node.State == StateDead {
      skip = true // the node is already dead
   }

   // Potentially skip
   m.nodeLock.RUnlock()
   if skip { // skip the local node and dead nodes
      numCheck++
      m.probeIndex++
      goto START
   }

   // Probe the specific node
   m.probeNode(node)
}
// probeNode handles a single round of failure checking on a node
func (m *Memberlist) probeNode(node *NodeState) {
   // Send a ping to the node
   ping := ping{SeqNo: m.nextSeqNo()}
   destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}

   // Setup an ack handler
   ackCh := make(chan bool, m.config.IndirectChecks+1)
   m.setAckChannel(ping.SeqNo, ackCh, m.config.ProbeInterval)

   // Send the ping message (pingMsg)
   if err := m.encodeAndSendMsg(destAddr, pingMsg, &ping); err != nil {
      log.Printf("[ERR] Failed to send ping: %s", err)
      return
   }

   // Wait for response or round-trip-time
   select {
   case v := <-ackCh:
      if v == true {
         return
      }
   case <-time.After(m.config.RTT):
   }

   // Get some random live nodes
   m.nodeLock.RLock()
   excludes := []string{m.config.Name, node.Name}
   // Pick IndirectChecks random nodes, excluding ourselves and the target
   kNodes := kRandomNodes(m.config.IndirectChecks, excludes, m.nodes)
   m.nodeLock.RUnlock()

   // Attempt an indirect ping
   ind := indirectPingReq{SeqNo: ping.SeqNo, Target: node.Addr}
   for _, peer := range kNodes {
      destAddr := &net.UDPAddr{IP: peer.Addr, Port: m.config.UDPPort}
      // Send the indirectPingMsg
      if err := m.encodeAndSendMsg(destAddr, indirectPingMsg, &ind); err != nil {
         log.Printf("[ERR] Failed to send indirect ping: %s", err)
      }
   }

   // Wait for the acks or timeout
   select {
   case v := <-ackCh:
      if v == true {
         return
      }
   }

   // No acks received from target, suspect
   s := suspect{Incarnation: node.Incarnation, Node: node.Name}
   // The probe failed, so mark the node as suspect
   m.suspectNode(&s)
}
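probeNode, pushPull, and gossip all rely on kRandomNodes, which the article does not show. The sketch below is one way such a helper could work, assuming it returns up to k distinct live nodes and skips the excluded names. The `node` type and the use of rand.Perm are illustrative choices, not the library's implementation.

```go
package main

import (
	"fmt"
	"math/rand"
)

// node is a hypothetical, trimmed-down stand-in for NodeState.
type node struct {
	Name string
	Dead bool
}

// kRandomNodes returns up to k distinct nodes picked at random,
// skipping dead nodes and any node whose name is listed in excludes.
func kRandomNodes(k int, excludes []string, nodes []*node) []*node {
	skip := make(map[string]bool, len(excludes))
	for _, name := range excludes {
		skip[name] = true
	}
	var picked []*node
	// rand.Perm visits every index exactly once in random order,
	// so no node can be selected twice.
	for _, idx := range rand.Perm(len(nodes)) {
		if len(picked) >= k {
			break
		}
		n := nodes[idx]
		if n.Dead || skip[n.Name] {
			continue
		}
		picked = append(picked, n)
	}
	return picked
}

func main() {
	nodes := []*node{{Name: "self"}, {Name: "target"}, {Name: "a"}, {Name: "b"}}
	for _, n := range kRandomNodes(2, []string{"self", "target"}, nodes) {
		fmt.Println(n.Name)
	}
}
```

Excluding both the local node and the probe target matters for indirect pings: asking the target to ping itself would tell the prober nothing.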
pushpull

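At a fixed interval, the node picks one random node, opens a TCP connection to it, and sends all of its local node state and user data. The peer then sends back all the node state and user data it knows about, and the two data sets are merged. This speeds up the convergence of information across the cluster. https://www.jianshu.com/p/e2173b44db65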

// pushPull is invoked periodically to randomly perform a state
// exchange. Used to ensure a high level of convergence.
func (m *Memberlist) pushPull() {
   // Get a random live node
   m.nodeLock.RLock()
   excludes := []string{m.config.Name}
   // Pick one random node
   nodes := kRandomNodes(1, excludes, m.nodes)
   m.nodeLock.RUnlock()

   // If no nodes, bail
   if len(nodes) == 0 {
      return
   }
   node := nodes[0]

   // Attempt a push pull
   // Call pushPullNode
   if err := m.pushPullNode(node.Addr); err != nil {
      log.Printf("[ERR] Push/Pull with %s failed: %s", node.Name, err)
   }
}
The code above picks one random node; pushPullNode then performs the exchange with it.
// pushPullNode is invoked to do a state exchange with
// a given node
func (m *Memberlist) pushPullNode(addr []byte) error {
   // Attempt to send and receive with the node
   // Send our state and fetch the remote state
   remote, err := m.sendAndReceiveState(addr)
   if err != nil {
      return nil
   }

   // Merge the state
   // Merge the remote state into the local node state
   m.mergeState(remote)
   return nil
}
// sendState is used to initiate a push/pull over TCP with a remote node
func (m *Memberlist) sendAndReceiveState(addr []byte) ([]pushNodeState, error) {
   // Attempt to connect
   // Open a TCP client connection
   dialer := net.Dialer{Timeout: m.config.TCPTimeout}
   dest := net.TCPAddr{IP: addr, Port: m.config.TCPPort}
   conn, err := dialer.Dial("tcp", dest.String())
   if err != nil {
      return nil, err
   }

   // Send our state
   // Send the local node state
   if err := m.sendLocalState(conn); err != nil {
      return nil, err
   }

   // Read remote state
   // Read the remote node state and return it
   remote, err := readRemoteState(conn)
   if err != nil {
      return nil, err
   }

   // Return the remote state
   return remote, nil
}
gossip

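The node sends messages to K nodes over UDP. The messages are taken from the broadcast queue, and once a message in the queue has been transmitted a certain number of times it is dropped. For the transmit limit, see the comment on RetransmitMult in Config. https://www.jianshu.com/p/e2173b44db65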

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
   // Get some random live nodes
   m.nodeLock.RLock()
   excludes := []string{m.config.Name}
   // Pick GossipNodes random nodes
   kNodes := kRandomNodes(m.config.GossipNodes, excludes, m.nodes)
   m.nodeLock.RUnlock()

   // Compute the bytes available
   bytesAvail := udpSendBuf - compoundHeaderOverhead
   for _, node := range kNodes {
      // Get any pending broadcasts
      // Fetch the pending broadcasts that fit into the available bytes
      msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
      if len(msgs) == 0 {
         return
      }

      // Create a compound message that bundles them together
      compound := makeCompoundMessage(msgs)

      // Send the compound message
      destAddr := &net.UDPAddr{IP: node.Addr, Port: m.config.UDPPort}
      if err := m.rawSendMsg(destAddr, compound); err != nil {
         log.Printf("[ERR] Failed to send gossip to %s: %s", destAddr, err)
      }
   }
}
The three node states are covered next.
alive
Marks a node as live.
aliveNode
// aliveNode is invoked by the network layer when we get a message
// about a live node
func (m *Memberlist) aliveNode(a *alive) {
   m.nodeLock.Lock()
   defer m.nodeLock.Unlock()
   // Look the node up in the node map
   state, ok := m.nodeMap[a.Node]

   // Check if we've never seen this node before

  if !ok {
      // Not seen before: add it as a new node
      state = &NodeState{
         Node: Node{
            Name: a.Node,
            Addr: a.Addr,
         },
         State: StateDead,
      }

      // Add to map
      m.nodeMap[a.Node] = state
      // Get a random offset. This is important to ensure
      // the failure detection bound is low on average. If all
      // nodes did an append, failure detection bound would be
      // very high.
      n := len(m.nodes)
      offset := randomOffset(n)

      // Add at the end and swap with the node at the offset
      m.nodes = append(m.nodes, state)
      m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]
   }

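   // Bail if the incarnation number is old: the message must carry a
   // strictly newer incarnation than the one we already hold
   if a.Incarnation <= state.Incarnation {
      return
   }

   // The message is newer than our local record, so re-broadcast the alive message
   m.encodeAndBroadcast(a.Node, aliveMsg, a)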

   // Update the state and incarnation number
   oldState := state.State
   state.Incarnation = a.Incarnation
   if state.State != StateAlive {
      state.State = StateAlive
      state.StateChange = time.Now()
   }

   // If the node went Dead -> Alive, notify JoinCh of the join
   if oldState == StateDead {
      notify(m.config.JoinCh, &state.Node)
   }
}
suspect

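When probing a node fails, or when a suspect message about a node is received, the local record for that node is marked suspect, a timer is started, and a suspect broadcast is sent. If the same suspect information arrives from other nodes during this period, the local confirmation count for the suspicion is incremented. When the timer expires, if the node is still not alive and the confirmation count meets the requirement, the node is marked dead. When a node receives a suspect message about itself, it broadcasts an alive message to clear the suspect mark on the other nodes. Note that the suspectNode code of this early version has no confirmation counter and relies on the timer alone. https://www.colabug.com/1010287.html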

suspectNode

// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
   m.nodeLock.Lock()
   defer m.nodeLock.Unlock()
   state, ok := m.nodeMap[s.Node]

   // If we've never heard about this node before, ignore it
   if !ok {
      return
   }

   // Ignore old incarnation numbers
   if s.Incarnation < state.Incarnation {
      return
   }

   // Ignore non-alive nodes
   if state.State != StateAlive {
      return
   }

   // If this is us we need to refute, otherwise re-broadcast
   if state.Name == m.config.Name {
      inc := m.nextIncarnation()
      a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
      m.encodeAndBroadcast(s.Node, aliveMsg, a)

      state.Incarnation = inc
      return // Do not mark ourself suspect
   } else {
      m.encodeAndBroadcast(s.Node, suspectMsg, s)
   }

   // Update the state
   state.Incarnation = s.Incarnation
   state.State = StateSuspect
   changeTime := time.Now()
   state.StateChange = changeTime
   // Setup a timeout for this
   timeout := suspicionTimeout(m.config.SuspicionMult, len(m.nodes), m.config.ProbeInterval)
   time.AfterFunc(timeout, func() {
      if state.State == StateSuspect && state.StateChange == changeTime {
         m.suspectTimeout(state)
      }
   })
}
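The timeout passed to time.AfterFunc above follows the formula in the Config comment, Suspicion time = SuspicionMult * log(N+1) * Interval. A small sketch of that formula is given below, under the assumption that the logarithm is base 10 and that no rounding is applied; the article does not show the real suspicionTimeout, which may round differently.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// suspicionTimeout computes mult * log10(n+1) * interval.
// The base-10 logarithm and the lack of rounding are assumptions.
func suspicionTimeout(mult, n int, interval time.Duration) time.Duration {
	scale := math.Log10(float64(n + 1))
	return time.Duration(float64(mult) * scale * float64(interval))
}

func main() {
	// The timeout grows slowly (logarithmically) with the cluster size.
	for _, n := range []int{9, 99, 999} {
		fmt.Println(n, suspicionTimeout(3, n, time.Second))
	}
}
```

Scaling with log(N+1) gives a refutation more time to spread in a large cluster without making the timeout grow linearly with the number of nodes.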
dead

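When this node leaves the cluster, or when another node that it was probing times out and is marked dead, a dead broadcast for that node is sent to the cluster. A node that receives a dead broadcast compares it with its local record: if the local record is already dead, the message is ignored; otherwise the local record is marked dead and the dead message is broadcast again, propagating it further. If a node receives a dead broadcast about itself from another node, it has been partitioned from the other nodes, so it sends an alive broadcast to correct the data they hold about it. https://www.colabug.com/1010287.html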

deadNode
// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
   m.nodeLock.Lock()
   defer m.nodeLock.Unlock()
   state, ok := m.nodeMap[d.Node]

   // If we've never heard about this node before, ignore it
   if !ok {
      return
   }

   // Ignore old incarnation numbers
   if d.Incarnation < state.Incarnation {
      return
   }

   // Ignore if node is already dead
   if state.State == StateDead {
      return
   }

   // If this is us we need to refute, otherwise re-broadcast
   if state.Name == m.config.Name && !m.leave {
      inc := m.nextIncarnation()
      a := alive{Incarnation: inc, Node: state.Name, Addr: state.Addr}
      m.encodeAndBroadcast(d.Node, aliveMsg, a)

      state.Incarnation = inc
      return // Do not mark ourself dead
   } else {
      m.encodeAndBroadcast(d.Node, deadMsg, d)
   }

   // Update the state
   state.Incarnation = d.Incarnation
   state.State = StateDead
   state.StateChange = time.Now()

   // Notify LeaveCh of the death
   notify(m.config.LeaveCh, &state.Node)
}
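Taken together, aliveNode, suspectNode, and deadNode implement a small precedence scheme driven by the incarnation number. The sketch below condenses only the comparison rules visible in the three functions above. It leaves out broadcasting, self-refutation, timers, and channel notifications, and the `member` type and method names are hypothetical.

```go
package main

import "fmt"

type status int

const (
	stateAlive status = iota
	stateSuspect
	stateDead
)

// member is a hypothetical, minimal record of what we believe about a peer.
type member struct {
	Incarnation uint32
	State       status
}

// applyAlive accepts an alive message only if its incarnation is strictly newer.
func (m *member) applyAlive(inc uint32) bool {
	if inc <= m.Incarnation {
		return false
	}
	m.Incarnation, m.State = inc, stateAlive
	return true
}

// applySuspect accepts a suspect message if it is not older than our record
// and the peer is currently considered alive.
func (m *member) applySuspect(inc uint32) bool {
	if inc < m.Incarnation || m.State != stateAlive {
		return false
	}
	m.Incarnation, m.State = inc, stateSuspect
	return true
}

// applyDead accepts a dead message if it is not older than our record
// and the peer is not already dead.
func (m *member) applyDead(inc uint32) bool {
	if inc < m.Incarnation || m.State == stateDead {
		return false
	}
	m.Incarnation, m.State = inc, stateDead
	return true
}

func main() {
	m := &member{State: stateDead} // a newly learned node starts out dead
	fmt.Println(m.applyAlive(1), m.applySuspect(1), m.applyAlive(1), m.applyAlive(2))
}
```

The asymmetry is the important part: suspect and dead win ties on the incarnation number, while alive needs a strictly larger one, which is why a suspected node has to bump its incarnation to refute the suspicion.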
github.com/hashicorp/memberlist/broadcast.go
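The broadcast module handles broadcasting.
It provides three functions, the most important of which is
getBroadcasts
It returns a slice of queued broadcasts whose total size stays within a byte limit, and is mainly used to fill a UDP packet. The code is simple: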
// getBroadcasts is used to return a slice of broadcasts to send up to
// a maximum byte size, while imposing a per-broadcast overhead. This is used
// to fill a UDP packet with piggybacked data
func (m *Memberlist) getBroadcasts(overhead, limit int) []*bytes.Buffer {
   m.broadcastLock.Lock()
   defer m.broadcastLock.Unlock()

   transmitLimit := retransmitLimit(m.config.RetransmitMult, len(m.nodes))
   bytesUsed := 0
   var toSend []*bytes.Buffer
   for i := len(m.bcQueue) - 1; i >= 0; i-- {
      // Check if this is within our limits
      b := m.bcQueue[i]
      if bytesUsed+overhead+b.msg.Len() > limit {
         continue
      }

      // Add to slice to send
      bytesUsed += overhead + b.msg.Len()
      toSend = append(toSend, b.msg)

      // Check if we should stop transmission
      b.transmits++
      if b.transmits >= transmitLimit {
         n := len(m.bcQueue)
         m.bcQueue[i], m.bcQueue[n-1] = m.bcQueue[n-1], nil
         m.bcQueue = m.bcQueue[:n-1]
      }
   }

   // If we are sending anything, we need to re-sort to deal
   // with adjusted transmit counts
   if len(toSend) > 0 {
      m.bcQueue.Sort()
   }

   return toSend
}
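getBroadcasts drops a message once its transmit count reaches retransmitLimit, which the Config comment describes as Retransmits = RetransmitMult * log(N+1). A sketch of that formula follows, assuming a base-10 logarithm rounded up to a whole number; the article does not show the real helper, so the rounding is an assumption.

```go
package main

import (
	"fmt"
	"math"
)

// retransmitLimit returns mult * ceil(log10(n+1)).
// The base-10 logarithm and the rounding up are assumptions.
func retransmitLimit(mult, n int) int {
	scale := math.Ceil(math.Log10(float64(n + 1)))
	return mult * int(scale)
}

func main() {
	// Larger clusters get more retransmissions per message, but only
	// logarithmically more.
	for _, n := range []int{0, 1, 99} {
		fmt.Println(n, retransmitLimit(3, n))
	}
}
```

Because every gossip round reaches several random nodes, a retransmit count that grows with log(N) is generally enough for a message to spread through the whole cluster with high probability.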
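Summary:
The version analyzed above is a fairly minimal one, with clearly separated functional modules.
However, it does not yet expose an interface for users. In later versions the module code was
reorganized and an interface (Delegate) was abstracted out for users of the package.
A later article will show how to use the Delegate interface through examples.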

龚浩华

月牙寂道长

qq:29185807

June 13, 2019
