
Golang source code analysis: etcd (17) mvcc

golangLeetcode
Published 2023-09-20 08:30:13

In the mvcc directory, server/storage/mvcc/hash.go defines the hashing logic:

```go
type kvHasher struct {
	hash            hash.Hash32
	compactRevision int64
	revision        int64
	keep            map[revision]struct{}
}
```
```go
func newKVHasher(compactRev, rev int64, keep map[revision]struct{}) kvHasher
```

WriteKeyValue writes the key and the value into the hash, which accumulates the corresponding hash value incrementally:

```go
func (h *kvHasher) WriteKeyValue(k, v []byte) {
	// ...
	h.hash.Write(k)
	h.hash.Write(v)
}
```
```go
func (h *kvHasher) Hash() KeyValueHash

type KeyValueHash struct {
	Hash            uint32
	CompactRevision int64
	Revision        int64
}
```
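To make the hashing step concrete, here is a minimal, self-contained sketch of the idea behind kvHasher: every key/value pair is fed into a running 32-bit hash, and revisions newer than the target are skipped. The names simpleKVHasher and revKey are hypothetical, the CRC-32 (Castagnoli) choice is an assumption, and the real WriteKeyValue additionally filters compacted revisions through keep, which this sketch omits.

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash"
	"hash/crc32"
)

// simpleKVHasher is a hypothetical, simplified stand-in for kvHasher.
type simpleKVHasher struct {
	h      hash.Hash32
	maxRev int64 // pairs whose main revision is above this are ignored
}

func newSimpleKVHasher(maxRev int64) *simpleKVHasher {
	// Assumption: CRC-32 with the Castagnoli polynomial table.
	return &simpleKVHasher{h: crc32.New(crc32.MakeTable(crc32.Castagnoli)), maxRev: maxRev}
}

// revKey encodes a main revision as an 8-byte big-endian key (sub revision omitted).
func revKey(main int64) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint64(b, uint64(main))
	return b
}

// WriteKeyValue feeds k and v into the running hash unless k is newer than maxRev.
func (s *simpleKVHasher) WriteKeyValue(k, v []byte) {
	if int64(binary.BigEndian.Uint64(k[:8])) > s.maxRev {
		return
	}
	s.h.Write(k)
	s.h.Write(v)
}

// Sum returns the accumulated 32-bit hash.
func (s *simpleKVHasher) Sum() uint32 { return s.h.Sum32() }

func main() {
	a := newSimpleKVHasher(2)
	a.WriteKeyValue(revKey(1), []byte("foo"))
	a.WriteKeyValue(revKey(2), []byte("bar"))
	a.WriteKeyValue(revKey(3), []byte("baz")) // skipped: 3 > maxRev

	b := newSimpleKVHasher(2)
	b.WriteKeyValue(revKey(1), []byte("foo"))
	b.WriteKeyValue(revKey(2), []byte("bar"))

	fmt.Println(a.Sum() == b.Sum()) // true
}
```

Because the hash only depends on the pairs at or below the target revision, two members that agree on history up to that revision produce the same value, which is what makes such hashes useful for consistency checks.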

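HashStorage is the interface for computing hashes over the backend. Hash() hashes the whole backend and is meant mainly for verification in tests; as its comment explains, it should not be relied on in production with ongoing transactions because it does not hold the MVCC locks, so HashByRev is the method to use for consistency checks of the "key" bucket.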

```go
type HashStorage interface {
	// Hash computes the hash of the whole backend keyspace,
	// including key, lease, and other buckets in storage.
	// This is designed for testing ONLY!
	// Do not rely on this in production with ongoing transactions,
	// since Hash operation does not hold MVCC locks.
	// Use "HashByRev" method instead for "key" bucket consistency checks.
	Hash() (hash uint32, revision int64, err error)

	// HashByRev computes the hash of all MVCC revisions up to a given revision.
	HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error)

	// Store adds hash value in local cache, allowing it can be returned by HashByRev.
	Store(valueHash KeyValueHash)

	// Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes.
	Hashes() []KeyValueHash
}
```
```go
type hashStorage struct {
	store  *store
	hashMu sync.RWMutex
	hashes []KeyValueHash
	lg     *zap.Logger
}
```
```go
func (s *hashStorage) Hashes() []KeyValueHash
```
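The Store/Hashes pair describes a small local cache holding at most `hashStorageMaxSize` of the newest hashes, guarded by hashMu. Below is a minimal sketch of such a bounded cache; hashCache and maxSize are hypothetical names, and keeping the slice ordered by Revision is an assumption about how "newest" is determined.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// KeyValueHash mirrors the struct shown earlier.
type KeyValueHash struct {
	Hash            uint32
	CompactRevision int64
	Revision        int64
}

// hashCache is a hypothetical sketch of a bounded, revision-ordered hash cache.
type hashCache struct {
	mu      sync.RWMutex
	hashes  []KeyValueHash
	maxSize int // plays the role of hashStorageMaxSize
}

// Store appends h, keeps the slice ordered by Revision (assumption),
// and drops the oldest entries beyond maxSize.
func (c *hashCache) Store(h KeyValueHash) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.hashes = append(c.hashes, h)
	sort.Slice(c.hashes, func(i, j int) bool { return c.hashes[i].Revision < c.hashes[j].Revision })
	if len(c.hashes) > c.maxSize {
		c.hashes = c.hashes[len(c.hashes)-c.maxSize:]
	}
}

// Hashes returns a copy so callers cannot mutate the cache.
func (c *hashCache) Hashes() []KeyValueHash {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]KeyValueHash, len(c.hashes))
	copy(out, c.hashes)
	return out
}

func main() {
	c := &hashCache{maxSize: 3}
	for _, rev := range []int64{4, 1, 3, 2} {
		c.Store(KeyValueHash{Hash: uint32(rev * 10), Revision: rev})
	}
	for _, h := range c.Hashes() {
		fmt.Print(h.Revision, " ")
	}
	fmt.Println() // prints "2 3 4 "
}
```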

The index interface in server/storage/mvcc/index.go defines the contract for adding, removing, updating, and querying index entries:

```go
type index interface {
	Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
	Range(key, end []byte, atRev int64) ([][]byte, []revision)
	Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
	CountRevisions(key, end []byte, atRev int64) int
	Put(key []byte, rev revision)
	Tombstone(key []byte, rev revision) error
	Compact(rev int64) map[revision]struct{}
	Keep(rev int64) map[revision]struct{}
	Equal(b index) bool

	Insert(ki *keyIndex)
	KeyIndex(ki *keyIndex) *keyIndex
}
```

The concrete implementation, treeIndex, uses the generic version of Google's B-tree (github.com/google/btree); the elements stored in the tree are *keyIndex pointers:

```go
type treeIndex struct {
	sync.RWMutex
	tree *btree.BTreeG[*keyIndex]
	lg   *zap.Logger
}
```

It is initialized as follows:

```go
func newTreeIndex(lg *zap.Logger) index {
	return &treeIndex{
		// B-tree of degree 32, ordered by keyIndex.Less (i.e. by key).
		tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
			return aki.Less(bki)
		}),
		lg: lg,
	}
}
```

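Put first looks the key up in the tree. If a keyIndex for it already exists, the new revision is recorded on that entry in place and nothing is inserted; otherwise a new keyIndex is created, given the revision, and inserted with ReplaceOrInsert: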

```go
func (ti *treeIndex) Put(key []byte, rev revision) {
	keyi := &keyIndex{key: key}

	ti.Lock()
	defer ti.Unlock()
	okeyi, ok := ti.tree.Get(keyi)
	if !ok {
		keyi.put(ti.lg, rev.main, rev.sub)
		ti.tree.ReplaceOrInsert(keyi)
		return
	}
	okeyi.put(ti.lg, rev.main, rev.sub)
}
```
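The lookup-then-insert pattern of Put can be tried without any dependency. The sketch below is hypothetical: sortedIndex keeps a slice sorted by key as a stand-in for the B-tree, and keyIndex is reduced to a key plus a flat list of revisions (no generations).

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// revision mirrors the main/sub pair used by the index.
type revision struct{ main, sub int64 }

// keyIndex is simplified to a key plus a flat list of revisions (no generations).
type keyIndex struct {
	key  []byte
	revs []revision
}

// sortedIndex is a hypothetical stand-in for treeIndex: a slice kept sorted by key
// instead of a github.com/google/btree BTreeG.
type sortedIndex struct{ items []*keyIndex }

// Put mirrors the lookup-then-insert logic of treeIndex.Put.
func (t *sortedIndex) Put(key []byte, rev revision) {
	i := sort.Search(len(t.items), func(i int) bool {
		return bytes.Compare(t.items[i].key, key) >= 0
	})
	if i < len(t.items) && bytes.Equal(t.items[i].key, key) {
		// Existing key: record the revision on the existing entry; no re-insert.
		t.items[i].revs = append(t.items[i].revs, rev)
		return
	}
	// New key: create a keyIndex and insert it at position i to keep the slice sorted.
	t.items = append(t.items, nil)
	copy(t.items[i+1:], t.items[i:])
	t.items[i] = &keyIndex{key: key, revs: []revision{rev}}
}

func main() {
	idx := &sortedIndex{}
	idx.Put([]byte("foo"), revision{main: 1})
	idx.Put([]byte("bar"), revision{main: 2})
	idx.Put([]byte("foo"), revision{main: 3})
	for _, ki := range idx.items {
		fmt.Println(string(ki.key), len(ki.revs))
	}
	// Output:
	// bar 1
	// foo 2
}
```

A B-tree gives O(log n) inserts instead of the O(n) slice shift used here, which is why the real index uses one; the control flow of Put is the same.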
```go
func (ti *treeIndex) Get(key []byte, atRev int64) (modified, created revision, ver int64, err error)

func (ti *treeIndex) unsafeGet(key []byte, atRev int64) (modified, created revision, ver int64, err error)
```

The other methods follow a similar pattern and are not covered in detail here:

```go
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int)

func (ti *treeIndex) Range(key, end []byte, atRev int64) (keys [][]byte, revs []revision)

func (ti *treeIndex) Compact(rev int64) map[revision]struct{}
```

Insert is similar to Put, except that it skips the preliminary lookup and calls ReplaceOrInsert directly:

```go
func (ti *treeIndex) Insert(ki *keyIndex) {
	ti.Lock()
	defer ti.Unlock()
	ti.tree.ReplaceOrInsert(ki)
}
```

server/storage/mvcc/key_index.go implements keyIndex, the per-key entry that the treeIndex above stores:

```go
type keyIndex struct {
	key         []byte
	modified    revision // the main rev of the last modification
	generations []generation
}
```

Its core field is generations; put, tombstone, get, compact, and the other operations all work on it:

```go
func (ki *keyIndex) put(lg *zap.Logger, main int64, sub int64)
func (ki *keyIndex) restore(lg *zap.Logger, created, modified revision, ver int64)
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error)
func (ki *keyIndex) compact(lg *zap.Logger, atRev int64, available map[revision]struct{})
func (ki *keyIndex) keep(atRev int64, available map[revision]struct{})
```
```go
type generation struct {
	ver     int64
	created revision // when the generation is created (put in first revision).
	revs    []revision
}
```
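A generation records one "life" of a key: the revisions from its creation up to the tombstone that deletes it, after which a new, empty generation begins. The sketch below illustrates how a lookup at a past revision can walk the generations. It is a hypothetical simplification (main revisions only, no sub revisions, versions, or compaction), not etcd's keyIndex.get.

```go
package main

import (
	"errors"
	"fmt"
)

var errRevisionNotFound = errors.New("revision not found")

// generation holds the main revisions of one life of a key (creation .. tombstone).
type generation struct {
	revs []int64
}

// keyIndex is simplified: main revisions only.
type keyIndex struct {
	generations []generation
}

func newKeyIndex() *keyIndex { return &keyIndex{generations: []generation{{}}} }

// put appends a revision to the current (last) generation.
func (ki *keyIndex) put(rev int64) {
	g := &ki.generations[len(ki.generations)-1]
	g.revs = append(g.revs, rev)
}

// tombstone records the deletion revision and starts a new, empty generation.
func (ki *keyIndex) tombstone(rev int64) {
	ki.put(rev)
	ki.generations = append(ki.generations, generation{})
}

// get returns the latest revision <= atRev, or an error if the key did not exist at atRev.
func (ki *keyIndex) get(atRev int64) (int64, error) {
	for i := len(ki.generations) - 1; i >= 0; i-- {
		g := ki.generations[i]
		if len(g.revs) == 0 || g.revs[0] > atRev {
			continue // empty, or created after atRev
		}
		tombstoned := i < len(ki.generations)-1
		if tombstoned && g.revs[len(g.revs)-1] <= atRev {
			return 0, errRevisionNotFound // deleted at or before atRev
		}
		for j := len(g.revs) - 1; j >= 0; j-- {
			if g.revs[j] <= atRev {
				return g.revs[j], nil
			}
		}
	}
	return 0, errRevisionNotFound
}

func main() {
	ki := newKeyIndex()
	ki.put(2)
	ki.put(4)
	ki.tombstone(6)
	ki.put(8)
	for _, at := range []int64{1, 5, 6, 9} {
		rev, err := ki.get(at)
		fmt.Println(at, rev, err)
	}
	// Output:
	// 1 0 revision not found
	// 5 4 <nil>
	// 6 0 revision not found
	// 9 8 <nil>
}
```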

Here, revision identifies a version and consists of two numbers: a main revision and a sub revision.

server/storage/mvcc/kv_view.go defines the MVCC read view:

```go
type readView struct{ kv KV }
```

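All of its methods are thin wrappers around the corresponding KV methods: each one opens a read transaction and delegates to it.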

```go
func (rv *readView) FirstRev() int64 {
	tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
	// ...
}

func (rv *readView) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
	tr := rv.kv.Read(ConcurrentReadTxMode, traceutil.TODO())
	// ...
}
```
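The wrapper pattern (open a transaction, delegate, always end the transaction) can be shown with cut-down interfaces. Everything here is hypothetical: txnRead, kv, and fakeKV are stand-ins for TxnRead and KV, and ending the transaction with defer is an assumption about the elided part of the methods above.

```go
package main

import "fmt"

// txnRead is a hypothetical, cut-down version of TxnRead.
type txnRead interface {
	FirstRev() int64
	End()
}

// kv is a hypothetical, cut-down version of KV exposing only Read.
type kv interface {
	Read() txnRead
}

// readView wraps kv: open a transaction, delegate, end it.
type readView struct{ kv kv }

func (rv *readView) FirstRev() int64 {
	tr := rv.kv.Read()
	defer tr.End() // always release the read transaction
	return tr.FirstRev()
}

// fakeKV counts how many transactions were opened and ended.
type fakeKV struct {
	opened, ended int
	firstRev      int64
}

type fakeTxn struct{ kv *fakeKV }

func (t fakeTxn) FirstRev() int64 { return t.kv.firstRev }
func (t fakeTxn) End()            { t.kv.ended++ }

func (f *fakeKV) Read() txnRead {
	f.opened++
	return fakeTxn{kv: f}
}

func main() {
	f := &fakeKV{firstRev: 7}
	rv := &readView{kv: f}
	rev := rv.FirstRev()
	fmt.Println(rev, f.opened, f.ended) // 7 1 1
}
```

Each call gets its own short-lived transaction, so a view can be used freely without the caller managing transaction lifetimes.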

The write view works the same way:

```go
type writeView struct{ kv KV }

func (wv *writeView) DeleteRange(key, end []byte) (n, rev int64) {
	tw := wv.kv.Write(traceutil.TODO())
	// ...
}

func (wv *writeView) Put(key, value []byte, lease lease.LeaseID) (rev int64) {
	tw := wv.kv.Write(traceutil.TODO())
	// ...
}
```

KV is an interface defined in server/storage/mvcc/kv.go. It embeds two interfaces, ReadView and WriteView, which are defined in the same file:

```go
type KV interface {
	ReadView
	WriteView

	// Read creates a read transaction.
	Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead

	// Write creates a write transaction.
	Write(trace *traceutil.Trace) TxnWrite

	// HashStorage returns HashStorage interface for KV storage.
	HashStorage() HashStorage

	// Compact frees all superseded keys with revisions less than rev.
	Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error)

	// Commit commits outstanding txns into the underlying backend.
	Commit()

	// Restore restores the KV store from a backend.
	Restore(b backend.Backend) error
	Close() error
}
```

The read view:

```go
type ReadView interface {
	// FirstRev returns the first KV revision at the time of opening the txn.
	// After a compaction, the first revision increases to the compaction
	// revision.
	FirstRev() int64

	// Rev returns the revision of the KV at the time of opening the txn.
	Rev() int64

	// Range gets the keys in the range at rangeRev.
	// The returned rev is the current revision of the KV when the operation is executed.
	// If rangeRev <=0, range gets the keys at currentRev.
	// If `end` is nil, the request returns the key.
	// If `end` is not nil and not empty, it gets the keys in range [key, range_end).
	// If `end` is not nil and empty, it gets the keys greater than or equal to key.
	// Limit limits the number of keys returned.
	// If the required rev is compacted, ErrCompacted will be returned.
	Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error)
}
```
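The three cases for `end` in the Range comment are easy to mix up. The hypothetical helper rangeKeys below applies them to an in-memory sorted key list; it ignores revisions entirely and only illustrates the key-selection rules and the limit.

```go
package main

import (
	"bytes"
	"fmt"
	"sort"
)

// rangeKeys is a hypothetical helper applying the ReadView.Range key rules:
//   end == nil           -> only key itself
//   end != nil, len == 0 -> all keys >= key
//   otherwise            -> keys in [key, end)
// limit <= 0 means no limit. sorted must be in ascending byte order.
func rangeKeys(sorted [][]byte, key, end []byte, limit int) [][]byte {
	var out [][]byte
	start := sort.Search(len(sorted), func(i int) bool {
		return bytes.Compare(sorted[i], key) >= 0
	})
	for _, k := range sorted[start:] {
		if end == nil && !bytes.Equal(k, key) {
			break // single-key lookup: stop after the exact match
		}
		if len(end) > 0 && bytes.Compare(k, end) >= 0 {
			break // past the half-open upper bound
		}
		out = append(out, k)
		if limit > 0 && len(out) == limit {
			break
		}
	}
	return out
}

func printKeys(ks [][]byte) {
	for _, k := range ks {
		fmt.Print(string(k))
	}
	fmt.Println()
}

func main() {
	keys := [][]byte{[]byte("a"), []byte("b"), []byte("c"), []byte("d")}
	printKeys(rangeKeys(keys, []byte("b"), nil, 0))         // b
	printKeys(rangeKeys(keys, []byte("b"), []byte("d"), 0)) // bc
	printKeys(rangeKeys(keys, []byte("b"), []byte{}, 0))    // bcd
	printKeys(rangeKeys(keys, []byte("b"), []byte{}, 2))    // bc
}
```

Note that Go distinguishes a nil slice from an empty non-nil one, which is exactly what lets a single `end` parameter encode both "single key" and "from key".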

The write view, covering range deletes and puts:

```go
type WriteView interface {
	// DeleteRange deletes the given range from the store.
	// A deleteRange increases the rev of the store if any key in the range exists.
	// The number of key deleted will be returned.
	// The returned rev is the current revision of the KV when the operation is executed.
	// It also generates one event for each key delete in the event history.
	// if the `end` is nil, deleteRange deletes the key.
	// if the `end` is not nil, deleteRange deletes the keys in range [key, range_end).
	DeleteRange(key, end []byte) (n, rev int64)

	// Put puts the given key, value into the store. Put also takes additional argument lease to
	// attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease
	// id.
	// A put also increases the rev of the store, and generates one event in the event history.
	// The returned rev is the current revision of the KV when the operation is executed.
	Put(key, value []byte, lease lease.LeaseID) (rev int64)
}
```
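The revision rules in these comments (Put always bumps the revision; DeleteRange bumps it only if at least one key was deleted) can be illustrated with a toy store. memStore is hypothetical, it tracks no history or events, and because it uses strings an empty `end` stands in for a nil `end`.

```go
package main

import "fmt"

// memStore is a hypothetical toy store that only models the revision rules.
type memStore struct {
	rev  int64
	data map[string]string
}

func newMemStore() *memStore { return &memStore{data: map[string]string{}} }

// Put always bumps the revision and returns the new one.
func (s *memStore) Put(key, value string) int64 {
	s.rev++
	s.data[key] = value
	return s.rev
}

// DeleteRange deletes key (end == "") or the keys in [key, end),
// bumping the revision only if something was deleted.
func (s *memStore) DeleteRange(key, end string) (n, rev int64) {
	for k := range s.data {
		if (end == "" && k == key) || (end != "" && k >= key && k < end) {
			delete(s.data, k) // deleting during range is allowed in Go
			n++
		}
	}
	if n > 0 {
		s.rev++
	}
	return n, s.rev
}

func main() {
	s := newMemStore()
	fmt.Println(s.Put("a", "1"))         // 1
	fmt.Println(s.Put("b", "2"))         // 2
	fmt.Println(s.DeleteRange("x", ""))  // 0 2 (nothing deleted, revision unchanged)
	fmt.Println(s.DeleteRange("a", "c")) // 2 3
}
```

A multi-key delete consumes a single main revision, which matches the description that one write operation moves the store to one new revision.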
```go
type WatchableKV interface {
	KV
	Watchable
}

type Watchable interface {
	// NewWatchStream returns a WatchStream that can be used to
	// watch events happened or happening on the KV.
	NewWatchStream() WatchStream
}
```

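server/storage/mvcc/kvstore_compaction.go implements compaction. scheduleCompaction creates a kvHasher covering the revisions from the previous compaction to the new one and works on the backend through a batch transaction: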

```go
func (s *store) scheduleCompaction(compactMainRev, prevCompactRev int64) (KeyValueHash, error) {
	// ...
	h := newKVHasher(prevCompactRev, compactMainRev, keep)
	tx := s.b.BatchTx()
	// ...
}
```

server/storage/mvcc/kvstore_txn.go implements the transaction interfaces declared in kv.go; the store type referenced by the s field is defined in kvstore.go:

```go
type storeTxnRead struct {
	s  *store
	tx backend.ReadTx

	firstRev int64
	rev      int64

	trace *traceutil.Trace
}
```

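The read transaction is created by store.Read, which picks the backend read transaction according to the mode: ConcurrentReadTx() for ConcurrentReadTxMode, otherwise ReadTx():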

```go
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
	// ...
	var tx backend.ReadTx
	if mode == ConcurrentReadTxMode {
		tx = s.b.ConcurrentReadTx()
	} else {
		tx = s.b.ReadTx()
	}
	// ...
}
```

Range reads follow the same pattern:

```go
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error)
```

Writes are similar; storeTxnWrite embeds storeTxnRead and adds a batch transaction:

```go
type storeTxnWrite struct {
	storeTxnRead
	tx backend.BatchTx
	// beginRev is the revision where the txn begins; it will write to the next revision.
	beginRev int64
	changes  []mvccpb.KeyValue
}
```
```go
func (s *store) Write(trace *traceutil.Trace) TxnWrite

func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
	tw.put(key, value, lease)
	// ...
}
```

put then calls the store's kvindex.Put to record the key's new revision in the B-tree:

```go
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
	// ...
	tw.s.kvindex.Put(key, idxRev)
	// ...
}
```
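The beginRev comment says a write transaction "will write to the next revision". A plausible way to number the individual changes, shown in this hypothetical sketch, is that all changes share main = beginRev+1 and receive increasing sub revisions; deriving sub from the number of changes already made is an assumption, not something shown in the excerpt above, and the map index stands in for kvindex.Put.

```go
package main

import "fmt"

type revision struct{ main, sub int64 }

// txnWrite is a hypothetical sketch of how a write txn could number its changes.
type txnWrite struct {
	beginRev int64
	changes  []string // keys written so far in this txn
	index    map[string][]revision
}

// put assigns (beginRev+1, len(changes)) and records it, like a call to kvindex.Put.
func (tw *txnWrite) put(key string) revision {
	idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
	tw.changes = append(tw.changes, key)
	tw.index[key] = append(tw.index[key], idxRev)
	return idxRev
}

func main() {
	tw := &txnWrite{beginRev: 10, index: map[string][]revision{}}
	fmt.Println(tw.put("a"), tw.put("b"), tw.put("a")) // {11 0} {11 1} {11 2}
}
```

Sharing one main revision across the whole transaction is what makes the set of changes atomic from the reader's point of view, while the sub revision preserves their order within it.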

Finally, the store type in server/storage/mvcc/kvstore.go implements all of the methods above:

```go
type store struct {
	ReadView
	WriteView

	cfg StoreConfig

	// mu read locks for txns and write locks for non-txn store changes.
	mu sync.RWMutex

	b       backend.Backend
	kvindex index

	le lease.Lessor

	// revMuLock protects currentRev and compactMainRev.
	// Locked at end of write txn and released after write txn unlock lock.
	// Locked before locking read txn and released after locking.
	revMu sync.RWMutex
	// currentRev is the revision of the last completed transaction.
	currentRev int64
	// compactMainRev is the main revision of the last compaction.
	compactMainRev int64

	fifoSched schedule.Scheduler

	stopc chan struct{}

	lg     *zap.Logger
	hashes HashStorage
}
```

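NewStore initializes the core components, namely the persistent backend b (bolt), the index kvindex, and hashes, and finally sets up the read and write views: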

```go
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
	// ...
	s := &store{
		cfg:     cfg,
		b:       b,
		kvindex: newTreeIndex(lg),
		// ...
	}
	// ...
	s.hashes = newHashStorage(lg, s)
	s.ReadView = &readView{s}
	s.WriteView = &writeView{s}
	// ...
}
```
```go
func (s *store) compactBarrier(ctx context.Context, ch chan struct{}) {
	// ...
	f := schedule.NewJob("kvstore_compactBarrier", func(ctx context.Context) { s.compactBarrier(ctx, ch) })
	s.fifoSched.Schedule(f)
	// ...
}
```
```go
func (s *store) compact(trace *traceutil.Trace, rev, prevCompactRev int64) (<-chan struct{}, error)

func (s *store) restore() error {
	// ...
	rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
	// ...
}
```

revKeyValue stores the key together with the original KeyValue:

```go
type revKeyValue struct {
	key  []byte
	kv   mvccpb.KeyValue
	kstr string
}

func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int64)
```

server/storage/mvcc/metrics_txn.go defines transaction wrappers that record metrics:

```go
type metricsTxnWrite struct {
	TxnWrite
	ranges  uint
	puts    uint
	deletes uint
	putSize int64
}
```
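metricsTxnWrite embeds TxnWrite, so every method it does not override is forwarded to the wrapped transaction, and the ones it does override can update counters first. A minimal sketch of that decorator pattern with a hypothetical cut-down interface; computing putSize as len(key)+len(value) is an assumption.

```go
package main

import "fmt"

// txnWrite is a hypothetical, cut-down write-transaction interface.
type txnWrite interface {
	Put(key, value []byte) int64
}

// metricsTxnWrite embeds the wrapped transaction and counts puts.
type metricsTxnWrite struct {
	txnWrite
	puts    uint
	putSize int64
}

// Put updates the counters and then delegates to the embedded transaction.
func (tw *metricsTxnWrite) Put(key, value []byte) int64 {
	tw.puts++
	tw.putSize += int64(len(key) + len(value)) // assumption: size = key + value bytes
	return tw.txnWrite.Put(key, value)
}

// countingTxn is a toy transaction that bumps a revision on every Put.
type countingTxn struct{ rev int64 }

func (t *countingTxn) Put(key, value []byte) int64 {
	t.rev++
	return t.rev
}

func main() {
	m := &metricsTxnWrite{txnWrite: &countingTxn{}}
	m.Put([]byte("a"), []byte("xyz"))
	rev := m.Put([]byte("bb"), []byte("c"))
	fmt.Println(rev, m.puts, m.putSize) // 2 2 7
}
```

Keeping metrics in a wrapper leaves the core transaction code free of instrumentation; the counters can then be reported (e.g. to the Prometheus metrics registered in metrics.go) when the transaction ends.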

server/storage/mvcc/metrics.go defines the Prometheus metrics and registers them in init():

```go
func init() {
	prometheus.MustRegister(rangeCounter)
	prometheus.MustRegister(putCounter)
	prometheus.MustRegister(deleteCounter)
	// ...
}
```

server/storage/mvcc/revision.go defines the revision number:

```go
type revision struct {
	// main is the main revision of a set of changes that happen atomically.
	main int64

	// sub is the sub revision of a change in a set of changes that happen
	// atomically. Each change has different increasing sub revision in that
	// set.
	sub int64
}

func bytesToRev(bytes []byte) revision {
	return revision{
		main: int64(binary.BigEndian.Uint64(bytes[0:8])),
		sub:  int64(binary.BigEndian.Uint64(bytes[9:])),
	}
}
```

server/storage/mvcc/store.go

```go
func UnsafeReadFinishedCompact(tx backend.ReadTx) (finishedComact int64, found bool)
```

server/storage/mvcc/watchable_store_txn.go

```go
func (tw *watchableStoreTxnWrite) End() {
	// ...
	tw.s.notify(rev, evs)
	tw.TxnWrite.End()
	// ...
}

type watchableStoreTxnWrite struct {
	TxnWrite
	s *watchableStore
}
```

server/storage/mvcc/watchable_store.go

```go
type watchable interface {
	watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc)
	progress(w *watcher)
	progressAll(watchers map[WatchID]*watcher) bool
	rev() int64
}
```
```go
type watchableStore struct {
	*store

	// mu protects watcher groups and batches. It should never be locked
	// before locking store.mu to avoid deadlock.
	mu sync.RWMutex

	// victims are watcher batches that were blocked on the watch channel
	victims []watcherBatch
	victimc chan struct{}

	// contains all unsynced watchers that needs to sync with events that have happened
	unsynced watcherGroup

	// contains all synced watchers that are in sync with the progress of the store.
	// The key of the map is the key that the watcher watches on.
	synced watcherGroup

	stopc chan struct{}
	wg    sync.WaitGroup
}

func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV
```

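Watch support essentially comes down to two goroutines started in the background, syncWatchersLoop and syncVictimsLoop, which keep syncing changes to the watchers: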

```go
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
	s := &watchableStore{ /* ... */ }
	// ...
	go s.syncWatchersLoop()
	go s.syncVictimsLoop()
	// ...
}

func (s *watchableStore) watch(key, end []byte, startRev int64, id WatchID, ch chan<- WatchResponse, fcs ...FilterFunc) (*watcher, cancelFunc) {
	wa := &watcher{ /* ... */ }
	// ...
}
```
```go
type watcher struct {
	// the watcher key
	key []byte
	// end indicates the end of the range to watch.
	// If end is set, the watcher is on a range.
	end []byte

	// victim is set when ch is blocked and undergoing victim processing
	victim bool

	// compacted is set when the watcher is removed because of compaction
	compacted bool

	// restore is true when the watcher is being restored from leader snapshot
	// which means that this watcher has just been moved from "synced" to "unsynced"
	// watcher group, possibly with a future revision when it was first added
	// to the synced watcher
	// "unsynced" watcher revision must always be <= current revision,
	// except when the watcher were to be moved from "synced" watcher group
	restore bool

	// minRev is the minimum revision update the watcher will accept
	minRev int64
	id     WatchID

	fcs []FilterFunc
	// a chan to send out the watch response.
	// The chan might be shared with other watchers.
	ch chan<- WatchResponse
}

func (w *watcher) send(wr WatchResponse) bool
```
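The victim field implies that delivering a response must never block the store: if the channel is full, the watcher is set aside as a victim and retried later by syncVictimsLoop. A minimal sketch of that idea, assuming send is a non-blocking select that reports whether delivery succeeded; the trimmed watcher and WatchResponse types here are hypothetical, and filters (fcs) are omitted.

```go
package main

import "fmt"

// WatchResponse is trimmed to the fields this sketch needs.
type WatchResponse struct {
	WatchID  int64
	Revision int64
}

// watcher holds only the channel here; the real struct has many more fields.
type watcher struct {
	ch chan<- WatchResponse
}

// send tries to deliver wr without blocking; false means the channel is full,
// which is when the store would treat the watcher as a victim.
func (w *watcher) send(wr WatchResponse) bool {
	select {
	case w.ch <- wr:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan WatchResponse, 1) // buffer of one
	w := &watcher{ch: ch}
	fmt.Println(w.send(WatchResponse{Revision: 1})) // true: buffer had room
	fmt.Println(w.send(WatchResponse{Revision: 2})) // false: buffer full, nobody reading
	fmt.Println((<-ch).Revision)                    // 1
}
```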

server/storage/mvcc/watcher_group.go implements event batching and the grouping of watchers by key and by range:

```go
type eventBatch struct {
	// evs is a batch of revision-ordered events
	evs []mvccpb.Event
	// revs is the minimum unique revisions observed for this batch
	revs int
	// moreRev is first revision with more events following this batch
	moreRev int64
}

type watcherSetByKey map[string]watcherSet

type watcherGroup struct {
	// keyWatchers has the watchers that watch on a single key
	keyWatchers watcherSetByKey
	// ranges has the watchers that watch a range; it is sorted by interval
	ranges adt.IntervalTree
	// watchers is the set of all watchers
	watchers watcherSet
}
```
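watcherGroup splits watchers into two structures: single-key watchers are found with a map lookup, while range watchers need an interval query. The sketch below shows how the watchers interested in a changed key could be collected. It is hypothetical: a linear scan over a slice replaces adt.IntervalTree, and watcher is reduced to an ID and a [key, end) range.

```go
package main

import (
	"fmt"
	"sort"
)

// watcher watches a single key (end == "") or the range [key, end).
type watcher struct {
	id       int
	key, end string
}

// watcherGroup is a simplified sketch; ranges is a stand-in for adt.IntervalTree.
type watcherGroup struct {
	keyWatchers map[string][]*watcher
	ranges      []*watcher
}

func newWatcherGroup() *watcherGroup {
	return &watcherGroup{keyWatchers: map[string][]*watcher{}}
}

// add files w under its key or into the range list.
func (wg *watcherGroup) add(w *watcher) {
	if w.end == "" {
		wg.keyWatchers[w.key] = append(wg.keyWatchers[w.key], w)
		return
	}
	wg.ranges = append(wg.ranges, w)
}

// watcherIDsForKey returns the sorted IDs of all watchers interested in key.
func (wg *watcherGroup) watcherIDsForKey(key string) []int {
	var ids []int
	for _, w := range wg.keyWatchers[key] {
		ids = append(ids, w.id)
	}
	for _, w := range wg.ranges {
		if key >= w.key && key < w.end {
			ids = append(ids, w.id)
		}
	}
	sort.Ints(ids)
	return ids
}

func main() {
	wg := newWatcherGroup()
	wg.add(&watcher{id: 1, key: "foo"})
	wg.add(&watcher{id: 2, key: "a", end: "g"})
	wg.add(&watcher{id: 3, key: "x", end: "z"})
	fmt.Println(wg.watcherIDsForKey("foo")) // [1 2]
	fmt.Println(wg.watcherIDsForKey("y"))   // [3]
}
```

The linear scan is O(number of range watchers) per event; an interval tree reduces this to roughly O(log n + matches), which matters when many range watchers are registered.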

server/storage/mvcc/watcher.go

```go
type WatchStream interface {
	// Watch creates a watcher. The watcher watches the events happening or
	// happened on the given key or range [key, end) from the given startRev.
	//
	// The whole event history can be watched unless compacted.
	// If "startRev" <=0, watch observes events after currentRev.
	//
	// The returned "id" is the ID of this watcher. It appears as WatchID
	// in events that are sent to the created watcher through stream channel.
	// The watch ID is used when it's not equal to AutoWatchID. Otherwise,
	// an auto-generated watch ID is returned.
	Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)

	// Chan returns a chan. All watch response will be sent to the returned chan.
	Chan() <-chan WatchResponse

	// RequestProgress requests the progress of the watcher with given ID. The response
	// will only be sent if the watcher is currently synced.
	// The responses will be sent through the WatchRespone Chan attached
	// with this stream to ensure correct ordering.
	// The responses contains no events. The revision in the response is the progress
	// of the watchers since the watcher is currently synced.
	RequestProgress(id WatchID)

	// RequestProgressAll requests a progress notification for all
	// watchers sharing the stream.  If all watchers are synced, a
	// progress notification with watch ID -1 will be sent to an
	// arbitrary watcher of this stream, and the function returns
	// true.
	RequestProgressAll() bool

	// Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
	// returned.
	Cancel(id WatchID) error

	// Close closes Chan and release all related resources.
	Close()

	// Rev returns the current revision of the KV the stream watches on.
	Rev() int64
}
```
```go
type WatchResponse struct {
	// WatchID is the WatchID of the watcher this response sent to.
	WatchID WatchID

	// Events contains all the events that needs to send.
	Events []mvccpb.Event

	// Revision is the revision of the KV when the watchResponse is created.
	// For a normal response, the revision should be the same as the last
	// modified revision inside Events. For a delayed response to a unsynced
	// watcher, the revision is greater than the last modified revision
	// inside Events.
	Revision int64

	// CompactRevision is set when the watcher is cancelled due to compaction.
	CompactRevision int64
}
```
```go
type watchStream struct {
	watchable watchable
	ch        chan WatchResponse

	mu sync.Mutex // guards fields below it
	// nextID is the ID pre-allocated for next new watcher in this stream
	nextID   WatchID
	closed   bool
	cancels  map[WatchID]cancelFunc
	watchers map[WatchID]*watcher
}

func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
```
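This article is part of the Tencent Cloud self-media sharing program and was shared from a WeChat official account.
Originally published: 2023-09-15 00:00. In case of infringement, contact cloudcommunity@tencent.com for removal.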

This article was shared from the WeChat official account golang算法架构leetcode技术php.
