前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >etcd watch:etcd 如何实现 watch 机制?

etcd watch:etcd 如何实现 watch 机制?

作者头像
aoho求索
发布2022-04-28 08:08:05
3.1K0
发布2022-04-28 08:08:05
举报
文章被收录于专栏:aoho求索aoho求索

你好,我是 aoho,今天我和你分享的主题是 etcd watch:etcd 如何实现 watch 机制?

etcd v2 和 v3 版本之间的重要变化之一就是 watch 机制的优化。etcd v2 watch 机制采用的是基于 HTTP/1.x 协议的客户端轮询机制,历史版本存储则是通过滑动窗口。在大量的客户端连接的场景或者集群规模较大的场景,导致 etcd 服务端的扩展性和稳定性都无法保证。etcd v3 在此基础上进行优化,满足了 Kubernetes pods 部署和状态管理等业务场景诉求。

watch 是监听一个或一组 key,key 的任何变化都会发出消息。某种意义上讲,etcd 就是发布订阅模式。

Watch 的用法

在具体将讲解 Watch 的实现方式之前,我们先来体验下如何使用 Watch。通过 etcdctl 命令行工具实现键值对的检测:

代码语言:javascript
复制
$ etcdctl put hello aoho
$ etcdctl put hello boho
$ etcdctl watch hello -w=json --rev=1

{
 "Header": {
  "cluster_id": 14841639068965178418,
  "member_id": 10276657743932975437,
  "revision": 4,
  "raft_term": 4
 },
 "Events": [{
  "kv": {
   "key": "aGVsbG8=",
   "create_revision": 3,
   "mod_revision": 3,
   "version": 1,
   "value": "YW9obw=="
  }
 }, {
  "kv": {
   "key": "aGVsbG8=",
   "create_revision": 3,
   "mod_revision": 4,
   "version": 2,
   "value": "Ym9obw=="
  }
 }],
 "CompactRevision": 0,
 "Canceled": false,
 "Created": false
}

依次在命令行中输入上面三条命令,前面两条依次更新 hello 对应的值,第三条命令监测键为 hello 的变化,并指定版本号从 1 开始。结果输出了两条 watch 事件。我们接着在另一个命令行继续输入如下的更新命令:

代码语言:javascript
复制
$ etcdctl put hello coho

可以看到前一个命令行输出了如下的内容:

代码语言:javascript
复制
{
 "Header": {
  "cluster_id": 14841639068965178418,
  "member_id": 10276657743932975437,
  "revision": 5,
  "raft_term": 4
 },
 "Events": [{
  "kv": {
   "key": "aGVsbG8=",
   "create_revision": 3,
   "mod_revision": 5,
   "version": 3,
   "value": "Y29obw=="
  }
 }],
 "CompactRevision": 0,
 "Canceled": false,
 "Created": false
}

命令行输出的事件表明,键 hello 对应的键值对发生了更新,并输出了事件的详细信息。如上就是通过 etcdctl 客户端工具实现 watch 指定的键值对功能。接着我们看下,clientv3 中是如何实现 watch 功能。

代码语言:javascript
复制
func testWatch() {
    s := newWatchableStore()

    w := s.NewWatchStream()

    w.Watch(start_key: foo, end_key: nil)

    w.Watch(start_key: bar, end_key: nil)

    for {
        consume := <- w.Chan()
    }
}

etcd 的 mvcc 模块对外提供了两种访问键值对的实现,一种是键值存储 kvstore,另一种是 watchableStore。它们都实现了 KV 接口,KV 接口的具体实现则是 store 结构体。在上面的实现中,我们先调用了 watchableStore。

当我们要使用 Watch 功能时,我们创建了一个 watchStream。创建出来的 w 可以监听的键为 hello,之后我们就可以消费 w.Chan() 返回的 channel。键为 hello 的任何变化,都会通过这个 channel 发送给客户端。

可以看到 watchStream 实现了在大量 kv 的变化中,过滤出当前所监听的 key,将 key 的变化输出。

watchableStore 存储

在前面的课时已经介绍过 kvstore,这里我们介绍 watchableStore 的实现。Watch 的实现是在 store 上封装了一层叫做 watchableStore,重写了 store 的 Write 方法。

代码语言:javascript
复制
// 位于 mvcc/watchable_store_txn.go:22
func (tw *watchableStoreTxnWrite) End() {
 changes := tw.Changes()
 if len(changes) == 0 {
  tw.TxnWrite.End()
  return
 }

 rev := tw.Rev() + 1
 evs := make([]mvccpb.Event, len(changes))
 for i, change := range changes {
  evs[i].Kv = &changes[i]
  if change.CreateRevision == 0 {
   evs[i].Type = mvccpb.DELETE
   evs[i].Kv.ModRevision = rev
  } else {
   evs[i].Type = mvccpb.PUT
  }
 }

 // end write txn under watchable store lock so the updates are visible
 // when asynchronous event posting checks the current store revision
 tw.s.mu.Lock()
 tw.s.notify(rev, evs)
 tw.TxnWrite.End()
 tw.s.mu.Unlock()
}

type watchableStoreTxnWrite struct {
 TxnWrite
 s *watchableStore
}

func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {
 return &watchableStoreTxnWrite{s.store.Write(trace), s}
}

通过 MVCC 中介绍,store 的任何写操作,都需要 Write 方法返回的 TxnWrite。所以这里重写 Write 方法意味着任何写操作都会经过 watchableStore。从上面的代码不难看出,watchableStoreTxnWrite 在事务提交时,先将本次变更 changes 打包成 Event,然后调用 notify 来将变更通知出去。最后真正提交事务 TxnWrite.End()。

Watch 负责了注册、管理以及触发 Watcher 的功能。我们先来看一下这个结构体的各个字段:

代码语言:javascript
复制
// 位于 mvcc/watchable_store.go:47
type watchableStore struct {
 *store

 // 同步读写锁
 mu sync.RWMutex

 // 被阻塞在 watch channel 中的 watcherBatch
 victims []watcherBatch
 victimc chan struct{}

 // 未同步的 watchers
 unsynced watcherGroup

 // 已同步的 watchers
 synced watcherGroup

 stopc chan struct{}
 wg    sync.WaitGroup
}

每一个 watchableStore 其实都组合了来自 store 结构体的字段和方法,除此之外,还有两个 watcherGroup 类型的字段,watcherGroup 管理多个 watcher,能够根据 key 快速找到监听该 key 的一个或多个 watcher。其中 unsynced 用于存储未同步完成的实例,synced 用于存储已经同步完成的实例。

根据 watchableStore 的定义,我们可以描述 Watch 监听的过程。

watchableStore 收到了所有 key 的变更后,将这些 key 交给 synced(watchGroup),synced 能够快速地从所有 key 中找到监听的 key。将这些 key 发送给对应的 watcher,这些 watcher 再通过 chan 将变更信息发送出去。

synced 是怎么快速找到符合条件的 key 呢?etcd 中使用了 map 和 adt(红黑树)来实现。

不单独使用 map 是因为 watch 可以监听一个范围的 key。如果只监听一个 key:

代码语言:javascript
复制
watch(start_key: foo, end_key: nil)

则对应的存储为 map[key]*watcher。这样可以根据 key 快速找到对应的 watcher,etcd 也是这样做的。但对于一组 key 呢?

代码语言:javascript
复制
watch(start_key: foo, end_key: fop)

这里我监听了从 foo->fop 之间的所有 key,理论上这些 key 的数目是无限的,所以无法再使用 map。比如:key=fooac 也属于监听范围。etcd 用 adt 来存储这种 key。

代码语言:javascript
复制
// 位于 mvcc/watcher_group.go:147
// watcherGroup 是由一系列范围 watcher 组织起来的 watchers
type watcherGroup struct {
 // keyWatchers has the watchers that watch on a single key
 keyWatchers watcherSetByKey
 // ranges has the watchers that watch a range; it is sorted by interval
 ranges adt.IntervalTree
 // watchers is the set of all watchers
 watchers watcherSet
}

adt 的实现这里不做介绍,只用知道 adt 能够根据 key=fooac 快速地找到所属范围 foo->fop。在找到 watcher 后,调用 watcher 的 send() 方法,将变更的 Event 发送出去。

syncWatchers 同步监听

在初始化一个新的 watchableStore 时,etcd 会创建一个用于同步 watcherGroup 的 Goroutine,在 syncWatchersLoop 这个循环中会每隔 100ms 调用一次 syncWatchers 方法,将所有未通知的事件通知给所有的监听者,这可以说是整个模块的核心:

代码语言:javascript
复制
// 位于 mvcc/watchable_store.go:334
func (s *watchableStore) syncWatchers() int {
 s.mu.Lock()
 defer s.mu.Unlock()

 if s.unsynced.size() == 0 {
  return 0
 }

 s.store.revMu.RLock()
 defer s.store.revMu.RUnlock()

 // in order to find key-value pairs from unsynced watchers, we need to
 // find min revision index, and these revisions can be used to
 // query the backend store of key-value pairs
 curRev := s.store.currentRev
 compactionRev := s.store.compactMainRev

 wg, minRev := s.unsynced.choose(maxWatchersPerSync, curRev, compactionRev)
 minBytes, maxBytes := newRevBytes(), newRevBytes()
 revToBytes(revision{main: minRev}, minBytes)
 revToBytes(revision{main: curRev + 1}, maxBytes)

 // UnsafeRange returns keys and values. And in boltdb, keys are revisions.
 // values are actual key-value pairs in backend.
 tx := s.store.b.ReadTx()
 tx.RLock()
 revs, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 var evs []mvccpb.Event
 evs = kvsToEvents(s.store.lg, wg, revs, vs)
 tx.RUnlock()

 var victims watcherBatch
 wb := newWatcherBatch(wg, evs)
 for w := range wg.watchers {
  w.minRev = curRev + 1

  eb, ok := wb[w]
  if !ok {
   // bring un-notified watcher to synced
   s.synced.add(w)
   s.unsynced.delete(w)
   continue
  }

  if eb.moreRev != 0 {
   w.minRev = eb.moreRev
  }

  if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: curRev}) {
   pendingEventsGauge.Add(float64(len(eb.evs)))
  } else {
   if victims == nil {
    victims = make(watcherBatch)
   }
   w.victim = true
  }

  if w.victim {
   victims[w] = eb
  } else {
   if eb.moreRev != 0 {
    // stay unsynced; more to read
    continue
   }
   s.synced.add(w)
  }
  s.unsynced.delete(w)
 }
 s.addVictim(victims)

 vsz := 0
 for _, v := range s.victims {
  vsz += len(v)
 }
 slowWatcherGauge.Set(float64(s.unsynced.size() + vsz))

 return s.unsynced.size()
}

简化后的 syncWatchers 方法中总共做了三件事情,首先是根据当前的版本从未同步的 watcherGroup 中选出一些待处理的任务,然后从 BoltDB 中取当前版本范围内的数据变更并将它们转换成事件,事件和 watcherGroup 在打包之后会通过 send 方法发送到每一个 watcher 对应的 Channel 中。

客户端监听事件

客户端监听键值对时,调用的正是 Watch 方法,Watch 在 stream 中创建一个新的 watcher,并返回对应的 WatchID。

代码语言:javascript
复制
// 位于 mvcc/watcher.go:108
func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
 // 防止出现 ket>= end 的错误范围情况
 if len(end) != 0 && bytes.Compare(key, end) != -1 {
  return -1, ErrEmptyWatcherRange
 }

 ws.mu.Lock()
 defer ws.mu.Unlock()
 if ws.closed {
  return -1, ErrEmptyWatcherRange
 }

 if id == AutoWatchID {
  for ws.watchers[ws.nextID] != nil {
   ws.nextID++
  }
  id = ws.nextID
  ws.nextID++
 } else if _, ok := ws.watchers[id]; ok {
  return -1, ErrWatcherDuplicateID
 }

 w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)

 ws.cancels[id] = c
 ws.watchers[id] = w
 return id, nil
}

AutoWatchID 是 WatchStream 中传递的观察者 ID。当用户没有提供可用的 ID 时,如果有传递该值,etcd 将自动分配一个 ID。如果传递的 ID 已经存在,则会返回 ErrWatcherDuplicateID 错误。watchable_store.go 中的 watch 实现是监听的具体实现,实现代码如下:

代码语言:javascript
复制
// 位于 mvcc/watchable_store.go:120
func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
 // 构建 watcher
 wa := &watcher{
  key:    key,
  end:    end,
  minRev: startRev,
  id:     id,
  ch:     ch,
  fcs:    fcs,
 }

 s.mu.Lock()
 s.revMu.RLock()
 synced := startRev > s.store.currentRev || startRev == 0
 if synced {
  wa.minRev = s.store.currentRev + 1
  if startRev > wa.minRev {
   wa.minRev = startRev
  }
 }
 if synced {
  s.synced.add(wa)
 } else {
  slowWatcherGauge.Inc()
  s.unsynced.add(wa)
 }
 s.revMu.RUnlock()
 s.mu.Unlock()
 // prometheus 的指标增加
 watcherGauge.Inc()

 return wa, func() { s.cancelWatcher(wa) }
}

对 watchableStore 进行操作之前,需要加锁。当 etcd 收到客户端的 watch 请求,如果请求携带了 revision 参数,则比较请求的 revision 和 store 当前的 revision,如果大于当前 revision,则放入 synced 组中,否则放入 unsynced 组。

服务端处理监听

当 etcd 服务启动时,会在服务端运行一个用于处理监听事件的 watchServer gRPC 服务,客户端的 Watch 请求最终都会被转发到这个服务的 Watch 函数中:

代码语言:javascript
复制
// 位于 etcdserver/api/v3rpc/watch.go:140
func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
 sws := serverWatchStream{
  lg: ws.lg,

  clusterID: ws.clusterID,
  memberID:  ws.memberID,

  maxRequestBytes: ws.maxRequestBytes,

  sg:        ws.sg,
  watchable: ws.watchable,
  ag:        ws.ag,

  gRPCStream:  stream,
  watchStream: ws.watchable.NewWatchStream(),
  // chan for sending control response like watcher created and canceled.
  ctrlStream: make(chan *pb.WatchResponse, ctrlStreamBufLen),

  progress: make(map[mvcc.WatchID]bool),
  prevKV:   make(map[mvcc.WatchID]bool),
  fragment: make(map[mvcc.WatchID]bool),

  closec: make(chan struct{}),
 }

 sws.wg.Add(1)
 go func() {
  sws.sendLoop()
  sws.wg.Done()
 }()

 errc := make(chan error, 1)
 // Ideally recvLoop would also use sws.wg to signal its completion
 // but when stream.Context().Done() is closed, the stream's recv
 // may continue to block since it uses a different context, leading to
 // deadlock when calling sws.close().
 go func() {
  if rerr := sws.recvLoop(); rerr != nil {
   if isClientCtxErr(stream.Context().Err(), rerr) {
    sws.lg.Debug("failed to receive watch request from gRPC stream", zap.Error(rerr))
   } else {
    sws.lg.Warn("failed to receive watch request from gRPC stream", zap.Error(rerr))
    streamFailures.WithLabelValues("receive", "watch").Inc()
   }
   errc <- rerr
  }
 }()

 select {
 case err = <-errc:
  close(sws.ctrlStream)

 case <-stream.Context().Done():
  err = stream.Context().Err()
  // the only server-side cancellation is noleader for now.
  if err == context.Canceled {
   err = rpctypes.ErrGRPCNoLeader
  }
 }

 sws.close()
 return err
}

当客户端想要通过 Watch 结果监听某一个 Key 或者一个范围的变动,在每一次客户端调用服务端上述方式都会创建两个 Goroutine,其中一个协程会负责向监听者发送数据变动的事件,另一个协程会负责处理客户端发来的事件。

服务端 recvLoop

recvLoop 协程主要用来负责处理客户端发来的事件。

代码语言:javascript
复制
// 位于 etcdserver/api/v3rpc/watch.go:216
func (sws *serverWatchStream) recvLoop() error {
 for {
  req, err := sws.gRPCStream.Recv()
  if err == io.EOF {
   return nil
  }
  if err != nil {
   return err
  }

  switch uv := req.RequestUnion.(type) {
  case *pb.WatchRequest_CreateRequest:
   if uv.CreateRequest == nil {
    break
   }

   creq := uv.CreateRequest
   if len(creq.Key) == 0 {
    // \x00 is the smallest key
    creq.Key = []byte{0}
   }
   if len(creq.RangeEnd) == 0 {
    // force nil since watchstream.Watch distinguishes
    // between nil and []byte{} for single key / >=
    creq.RangeEnd = nil
   }
   if len(creq.RangeEnd) == 1 && creq.RangeEnd[0] == 0 {
    // support  >= key queries
    creq.RangeEnd = []byte{}
   }

   if !sws.isWatchPermitted(creq) {
    wr := &pb.WatchResponse{
     Header:       sws.newResponseHeader(sws.watchStream.Rev()),
     WatchId:      creq.WatchId,
     Canceled:     true,
     Created:      true,
     CancelReason: rpctypes.ErrGRPCPermissionDenied.Error(),
    }

    select {
    case sws.ctrlStream <- wr:
    case <-sws.closec:
    }
    return nil
   }

   filters := FiltersFromRequest(creq)

   wsrev := sws.watchStream.Rev()
   rev := creq.StartRevision
   if rev == 0 {
    rev = wsrev + 1
   }
   id, err := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
   if err == nil {
    sws.mu.Lock()
    if creq.ProgressNotify {
     sws.progress[id] = true
    }
    if creq.PrevKv {
     sws.prevKV[id] = true
    }
    if creq.Fragment {
     sws.fragment[id] = true
    }
    sws.mu.Unlock()
   }
   wr := &pb.WatchResponse{
    Header:   sws.newResponseHeader(wsrev),
    WatchId:  int64(id),
    Created:  true,
    Canceled: err != nil,
   }
   if err != nil {
    wr.CancelReason = err.Error()
   }
   select {
   case sws.ctrlStream <- wr:
   case <-sws.closec:
    return nil
   }

  case *pb.WatchRequest_CancelRequest:
   if uv.CancelRequest != nil {
    id := uv.CancelRequest.WatchId
    err := sws.watchStream.Cancel(mvcc.WatchID(id))
    if err == nil {
     sws.ctrlStream <- &pb.WatchResponse{
      Header:   sws.newResponseHeader(sws.watchStream.Rev()),
      WatchId:  id,
      Canceled: true,
     }
     sws.mu.Lock()
     delete(sws.progress, mvcc.WatchID(id))
     delete(sws.prevKV, mvcc.WatchID(id))
     delete(sws.fragment, mvcc.WatchID(id))
     sws.mu.Unlock()
    }
   }
  case *pb.WatchRequest_ProgressRequest:
   if uv.ProgressRequest != nil {
    sws.ctrlStream <- &pb.WatchResponse{
     Header:  sws.newResponseHeader(sws.watchStream.Rev()),
     WatchId: -1, // response is not associated with any WatchId and will be broadcast to all watch channels
    }
   }
  default:
   // we probably should not shutdown the entire stream when
   // receive an valid command.
   // so just do nothing instead.
   continue
  }
 }
}

在用于处理客户端的 recvLoop 方法中调用了 mvcc 模块暴露出的 watchStream.Watch 方法,该方法会返回一个可以用于取消监听事件的 watchID;当 gRPC 流已经结束后者出现错误时,当前的循环就会返回,两个 Goroutine 也都会结束。

服务端 sendLoop

如果出现了更新或者删除事件,就会被发送到 watchStream 持有的 Channel 中,而 sendLoop 会通过 select 来监听多个 Channel 中的数据并将接收到的数据封装成 pb.WatchResponse 结构并通过 gRPC 流发送给客户端:

代码语言:javascript
复制
// 位于 etcdserver/api/v3rpc/watch.go:332
func (sws *serverWatchStream) sendLoop() {
 // watch ids that are currently active
for {
  select {
  case wresp, ok := <-sws.watchStream.Chan():
   evs := wresp.Events
   events := make([]*mvccpb.Event, len(evs))
   for i := range evs {
    events[i] = &evs[i]   }

   canceled := wresp.CompactRevision != 0
   wr := &pb.WatchResponse{
    Header:          sws.newResponseHeader(wresp.Revision),
    WatchId:         int64(wresp.WatchID),
    Events:          events,
    CompactRevision: wresp.CompactRevision,
    Canceled:        canceled,
   }

   sws.gRPCStream.Send(wr)

  case c, ok := <-sws.ctrlStream: // ...
  case <-progressTicker.C: // ...
  case <-sws.closec:
   return
  }
 }
}

对于每一个 Watch 请求来说,watchServer 会根据请求创建两个用于处理当前请求的 Goroutine,这两个协程会与更底层的 mvcc 模块协作提供监听和回调功能:

到这里,我们对于 Watch 功能的介绍就差不多结束了,从对外提供的接口到底层的使用的数据结构以及具体实现,其他与 Watch 功能相关的话题可以直接阅读 etcd 的源代码了解更加细节的实现。

Watch 异常场景

上述是正常流程,但是会有很多不正常的情况发生。可以知道,消息都是通过一个 Chan 发送出去,但如果消费者消费速度慢,Chan 就容易堆积。Chan 的空间不可能无限大,那就必然会有满的时候,满了后该怎么办呢?

接下来就要讨论前面小结所提及的 unsynced、victims 数组的作用。首先思考下 Chan 什么时候会满呢?

代码语言:javascript
复制
var (
 // chanBufLen is the length of the buffered chan
 // for sending out watched events.
 // TODO: find a good buf value. 1024 is just a random one that
 // seems to be reasonable.
 chanBufLen = 1024

 // maxWatchersPerSync is the number of watchers to sync in a single batch
 maxWatchersPerSync = 512
)

代码中 Chan 的长度是 1024。不过这也是一个随机值,只是没有现在更好的选择。

chan 一旦满了,会发生以下操作:

代码语言:javascript
复制
// 位于 mvcc/watchable_store.go:438
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
 var victim watcherBatch
 for w, eb := range newWatcherBatch(&s.synced, evs) {
  if eb.revs != 1 {
   s.store.lg.Panic(
    "unexpected multiple revisions in watch notification",
    zap.Int("number-of-revisions", eb.revs),
   )
  }
  if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
   pendingEventsGauge.Add(float64(len(eb.evs)))
  } else {
   // move slow watcher to victims
   w.minRev = rev + 1
   if victim == nil {
    victim = make(watcherBatch)
   }
   w.victim = true
   victim[w] = eb
   s.synced.delete(w)
   slowWatcherGauge.Inc()
  }
 }
 s.addVictim(victim)
}

notify 通知一个事实,即在给定修订版中的给定事件只是发生在监视事件键的观察者身上。watcher 会记录当前的 Revision,并将自身标记为受损的。此次的变更操作会被保存到 watchableStore 的 victims 中。同时该 watcher 会被从 synced 踢出。

假设此时有一个写操作:foo=f1。而正好 Chan 此时刚满,则监听 foo 的 watcher 将从 synced 中踢出,同时 foo=f1 被保存到 victims 中。

接下来对 foo 的任何变更,该 watcher 都不会记录。那这些消息就都丢掉了吗?当然不是,watcher 变成受损状态时记录下了当时的 Revision,这个很重要。

syncVictimsLoop 清除 victims

在上面的场景中,我们知道,队列满时,当时变更的 Event 被放入了 victims 中。这个协程就会试图清除这个 Event。怎么清除呢?协程会不断尝试让 watcher 发送这个 Event,一旦队列不满,watcher 将这个 Event 发出后。该 watcher 就被划入了 unsycned 中,同时不再是受损状态。

代码语言:javascript
复制
// 位于 mvcc/watchable_store.go:246
// syncVictimsLoop tries to write precomputed watcher responses to
// watchers that had a blocked watcher channel
func (s *watchableStore) syncVictimsLoop() {
 defer s.wg.Done()

 for {
  for s.moveVictims() != 0 {
   // try to update all victim watchers
  }
  s.mu.RLock()
  isEmpty := len(s.victims) == 0
  s.mu.RUnlock()

  var tickc <-chan time.Time
  if !isEmpty {
   tickc = time.After(10 * time.Millisecond)
  }

  select {
  case <-tickc:
  case <-s.victimc:
  case <-s.stopc:
   return
  }
 }
}

// moveVictims tries to update watches with already pending event data
func (s *watchableStore) moveVictims() (moved int) {
 s.mu.Lock()
 victims := s.victims
 s.victims = nil
 s.mu.Unlock()

 var newVictim watcherBatch
 for _, wb := range victims {
  // try to send responses again
  for w, eb := range wb {
   // watcher has observed the store up to, but not including, w.minRev
   rev := w.minRev - 1
   if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
    pendingEventsGauge.Add(float64(len(eb.evs)))
   } else {
    if newVictim == nil {
     newVictim = make(watcherBatch)
    }
    newVictim[w] = eb
    continue
   }
   moved++
  }

  // assign completed victim watchers to unsync/sync
  s.mu.Lock()
  s.store.revMu.RLock()
  curRev := s.store.currentRev
  for w, eb := range wb {
   if newVictim != nil && newVictim[w] != nil {
    // couldn't send watch response; stays victim
    continue
   }
   w.victim = false
   if eb.moreRev != 0 {
    w.minRev = eb.moreRev
   }
   if w.minRev <= curRev {
    s.unsynced.add(w)
   } else {
    slowWatcherGauge.Dec()
    s.synced.add(w)
   }
  }
  s.store.revMu.RUnlock()
  s.mu.Unlock()
 }

 if len(newVictim) > 0 {
  s.mu.Lock()
  s.victims = append(s.victims, newVictim)
  s.mu.Unlock()
 }

 return moved
}

此时 syncWatchersLoop 协程就开始起作用。由于在受损状态下,这个 watcher 已经错过了很多消息。为了追回进度,协程会根据 watcher 保存的 Revision,找出受损之后所有的消息,将关于 foo 的消息全部给 watcher,当 watcher 将这些消息都发送出去后。watcher 就脱离了 unsynced,成为了 synced。

至此就解决了 Chan 满导致的问题。同时也阐明了 Watch 的设计实现。

小结

watch 可以用来监听一个或一组 key,key 的任何变化都会发出事件消息。某种意义上讲,etcd 也是一种发布订阅模式。

我们通过介绍 watch 的用法,引入对 etcd watch 机制实现的分析和讲解。watchableStore 负责了注册、管理以及触发 Watcher 的功能。watchableStore 将 watcher 划分为 synced 、unsynced 以及异常状态下的 victim 三类。在 etcd 启动时,WatchableKV 模块启动了 syncWatchersLoop 和 syncVictimsLoop 异步 goroutine,用以负责不同场景下的事件推送,并提供了事件重试机制,保证事件都能发送出去给到客户端。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-04-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 aoho求索 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Watch 的用法
  • watchableStore 存储
  • syncWatchers 同步监听
  • 客户端监听事件
  • 服务端处理监听
  • 服务端 recvLoop
  • 服务端 sendLoop
  • Watch 异常场景
  • syncVictimsLoop 清除 victims
相关产品与服务
命令行工具
腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档