前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:etcd(18)

golang源码分析:etcd(18)

作者头像
golangLeetcode
发布2023-09-20 08:30:30
1380
发布2023-09-20 08:30:30
举报

介绍完一个个函数实现后,我们分析下完整的etcd的读写流程。有没有觉得很奇怪既然bolt是采用b+树存储的持久化存储来存储kv,为什还需要一个Btree结构来存储key的信息?

其实在etcd内部,从一个可以找到一个value分为两个步骤:1,通过key找到所有的版本号,从版本号里筛选需要查找的版本。2,根据版本号到bolt里面查找对应的k/v对,从而获得value值。Revision 中定义了一个全局递增的主版本号main,发生 put、txn、del 操作会递增,一个事务内的 main 版本号是唯一的;事务内的子版本号定义为sub,事务发生 put 和 del 操作时,从 0 开始递增。

这样存储的好处是在btree里面key是唯一的,通过key可以找到所有版本号;在bolt里面版本号是唯一的,查找过程和key完全解耦了,再加上写过程中版本号的递增特性,可以实现近乎顺序写,整个写的过程非常迅速。

我们先看下读的过程,在server/storage/mvcc/kvstore_txn.go中有个Read函数

代码语言:javascript
复制
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
  var tx backend.ReadTx
  if mode == ConcurrentReadTxMode {
  tx = s.b.ConcurrentReadTx()
  } else {
  tx = s.b.ReadTx()
  }
  return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})

返沪TxRead对象,对象有一个Range方法

代码语言:javascript
复制
func (tr *storeTxnRead) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
  return tr.rangeKeys(ctx, key, end, tr.Rev(), ro)
}

其核心代码如下:

代码语言:javascript
复制
func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
      if ro.Count {
    total := tr.s.kvindex.CountRevisions(key, end, rev)
      revpairs, total := tr.s.kvindex.Revisions(key, end, rev, int(ro.Limit))
      for i, revpair := range revpairs[:len(kvs)] {
          revToBytes(revpair, revBytes)
    _, vs := tr.tx.UnsafeRange(schema.Key, revBytes, nil, 0)

先通过kvindex.Revisions获取所有的版本号,然后再版本里筛选出需要的版本号到bolt里面查询,主要依赖readTx属性

代码语言:javascript
复制
type storeTxnRead struct {
  s  *store
  tx backend.ReadTx

首先看第一步,通过key找到revision,调用的Revisions方法位于server/storage/mvcc/index.go,它通过unsafeVisit获取所有版本号,采用访问者模式,传入的函数用于筛选需要的版本,其中使用了get函数来从索引btree里面获取最终需要的版本列表 rev, _, _, err := ki.get(ti.lg, atRev),核心逻辑如下:

代码语言:javascript
复制
func (ti *treeIndex) Revisions(key, end []byte, atRev int64, limit int) (revs []revision, total int) {
      if end == nil {
    rev, _, _, err := ti.unsafeGet(key, atRev)
        ti.unsafeVisit(key, end, func(ki *keyIndex) bool {
    if rev, _, _, err := ki.get(ti.lg, atRev); err == nil {
      if limit <= 0 || len(revs) < limit {
        revs = append(revs, rev)

btree的初始化方法如下

代码语言:javascript
复制
func newTreeIndex(lg *zap.Logger) index {
        return &treeIndex{
    tree: btree.NewG(32, func(aki *keyIndex, bki *keyIndex) bool {
      return aki.Less(bki)
    }),

查找用到的unsafeVisit方法如下,最终调用了BTree的AscendGreaterOrEqual方法

代码语言:javascript
复制
func (ti *treeIndex) unsafeVisit(key, end []byte, f func(ki *keyIndex) bool) {
        ti.tree.AscendGreaterOrEqual(keyi, func(item *keyIndex) bool {
    if len(endi.key) > 0 && !item.Less(endi) {
      return false
    }
    if !f(item) {
      return false
    }
    return true
  })

server/storage/mvcc/key_index.go里定义了keyIndex,它实现了Less接口因此可以用BTree。

代码语言:javascript
复制
func (ki *keyIndex) Less(bki *keyIndex) bool {
  return bytes.Compare(ki.key, bki.key) == -1
}

它的核心字段有三个,key,最近修改的版本号,历史上所有的版本号。btree里存储的元素,比较的时候比较key,一个key里存了多个reversion。

代码语言:javascript
复制
type keyIndex struct {
  key         []byte
  modified    revision // the main rev of the last modification
  generations []generation
}

完成在b树中找出keyIndex信息,然后就在generations里面找出对应版本的信息调用了get方法

代码语言:javascript
复制
func (ki *keyIndex) get(lg *zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
      g := ki.findGeneration(atRev)
      n := g.walk(func(rev revision) bool { return rev.main > atRev })
代码语言:javascript
复制
func (ki *keyIndex) findGeneration(rev int64) *generation {
      for cg >= 0 {
      g := ki.generations[cg]

以上就是完整的索引查找对应版本号的流程,索引的初始化代码位于server/storage/mvcc/kvstore.go

代码语言:javascript
复制
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
        s := &store{
    cfg:     cfg,
    b:       b,
    kvindex: newTreeIndex(lg),

得到版本号后,可以根据版本号到bolt对应的bucket里查找我们的value值,用到了baseReadTx,其定义位于server/storage/backend/read_tx.go

代码语言:javascript
复制
type baseReadTx struct {

通过rversion查找kv的过程是先缓存里找不到再到bolt里面找,最后缓存到buf

代码语言:javascript
复制
func (baseReadTx *baseReadTx) UnsafeRange(bucketType Bucket, key, endKey []byte, limit int64) ([][]byte, [][]byte) {
      keys, vals := baseReadTx.buf.Range(bucketType, key, endKey, limit)
                  bucket = baseReadTx.tx.Bucket(bucketType.Name())
    baseReadTx.buckets[bn] = bucket
        c := bucket.Cursor()
      k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))

server/storage/backend/batch_tx.go里会调用bolt对应的Seek方法

代码语言:javascript
复制
func unsafeRange(c *bolt.Cursor, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
        for ck, cv := c.Seek(key); ck != nil && isMatch(ck); ck, cv = c.Next() {

分析完查的流程后,我们分析下写的流程,可以从Put这个函数作为入开来进行追踪

代码语言:javascript
复制
func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
  tw.put(key, value, lease)
return tw.beginRev + 1
}

详细的put方法如下,先通过索引找到key最近修改的版本,然后创建用于存在在btree里面的key和用于存储在bolt里面的kv,然后使用UnsafeSeqPut存入bolt,使用kvindex.Put存入kv

代码语言:javascript
复制
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
      _, created, ver, err := tw.s.kvindex.Get(key, rev)
      idxRev := revision{main: rev, sub: int64(len(tw.changes))}
        kv := mvccpb.KeyValue{
    Key:            key,
    Value:          value,
    CreateRevision: c,
    ModRevision:    rev,
    Version:        ver,
    Lease:          int64(leaseID),
  }
      d, err := kv.Marshal()
      tw.tx.UnsafeSeqPut(schema.Key, ibytes, d)
      tw.s.kvindex.Put(key, idxRev)

将数据存入bolt的函数定义如下

代码语言:javascript
复制
func (t *batchTx) UnsafeSeqPut(bucket Bucket, key []byte, value []byte) {
  t.unsafePut(bucket, key, value, true)
}

最终也是调用bolt的Put方法来存储的:

代码语言:javascript
复制
func (t *batchTx) unsafePut(bucketType Bucket, key []byte, value []byte, seq bool) {

      bucket := t.tx.Bucket(bucketType.Name())
      if err := bucket.Put(key, value); err != nil {

readTx的初始化位于server/storage/backend/backend.go

代码语言:javascript
复制
func (b *backend) ReadTx() ReadTx { return b.readTx }
代码语言:javascript
复制
func newBackend(bcfg BackendConfig) *backend {
      b := &backend{
    bopts: bopts,
    db:    db,
          readTx: &readTx{
      baseReadTx: baseReadTx{
        buf: txReadBuffer{
          txBuffer:   txBuffer{make(map[BucketID]*bucketBuffer)},
          bufVersion: 0,
        },
        buckets: make(map[BucketID]*bolt.Bucket),
        txWg:    new(sync.WaitGroup),
        txMu:    new(sync.RWMutex),
      },
    },

它的详细初始化链路追溯如下:server/storage/mvcc/watchable_store.go

代码语言:javascript
复制
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
      s := &watchableStore{
    store:    NewStore(lg, b, le, cfg),
代码语言:javascript
复制
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
  return newWatchableStore(lg, b, le, cfg)
}

server/etcdserver/server.go

代码语言:javascript
复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
      srv.be = b.storage.backend.be
      srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)

最后看下etcd在bolt里面是如何分桶的,桶的bucket定义如下server/storage/schema/bucket.go

代码语言:javascript
复制
  var (
  Key     = backend.Bucket(bucket{id: 1, name: keyBucketName, safeRangeBucket: true})
  Meta    = backend.Bucket(bucket{id: 2, name: metaBucketName, safeRangeBucket: false})
  Lease   = backend.Bucket(bucket{id: 3, name: leaseBucketName, safeRangeBucket: false})
  Alarm   = backend.Bucket(bucket{id: 4, name: alarmBucketName, safeRangeBucket: false})
  Cluster = backend.Bucket(bucket{id: 5, name: clusterBucketName, safeRangeBucket: false})
代码语言:javascript
复制
Members        = backend.Bucket(bucket{id: 10, name: membersBucketName, safeRangeBucket: false})
  MembersRemoved = backend.Bucket(bucket{id: 11, name: membersRemovedBucketName, safeRangeBucket: false})  
代码语言:javascript
复制
Auth      = backend.Bucket(bucket{id: 20, name: authBucketName, safeRangeBucket: false})
  AuthUsers = backend.Bucket(bucket{id: 21, name: authUsersBucketName, safeRangeBucket: false})
  AuthRoles = backend.Bucket(bucket{id: 22, name: authRolesBucketName, safeRangeBucket: false})


  Test = backend.Bucket(bucket{id: 100, name: testBucketName, safeRangeBucket: false})
)

etcd里面每种类型一个有一个单独的bucket,但是所有的key都使用的是同一个bucket

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-16 00:00,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档