前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:etcd(13)

golang源码分析:etcd(13)

作者头像
golangLeetcode
发布2023-09-20 08:29:07
1480
发布2023-09-20 08:29:07
举报

我们来看下lease目录,了解下租约是如何实现的。首先我们还是从server的初始化地方开始:server/etcdserver/server.go,调用了NewLessor来初始化租约管理器

代码语言:javascript
复制
func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
      srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
    MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
    CheckpointInterval:         cfg.LeaseCheckpointInterval,
    CheckpointPersist:          cfg.LeaseCheckpointPersist,
    ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
  })

对应源码位于server/lease/lessor.go定义了租约管理相关接口和具体实现:

代码语言:javascript
复制
type TxnDelete interface {
  DeleteRange(key, end []byte) (n, rev int64)
  End()
}
代码语言:javascript
复制
type Lessor interface {
  // SetRangeDeleter lets the lessor create TxnDeletes to the store.
  // Lessor deletes the items in the revoked or expired lease by creating
  // new TxnDeletes.
  SetRangeDeleter(rd RangeDeleter)


  SetCheckpointer(cp Checkpointer)


  // Grant grants a lease that expires at least after TTL seconds.
  Grant(id LeaseID, ttl int64) (*Lease, error)
  // Revoke revokes a lease with given ID. The item attached to the
  // given lease will be removed. If the ID does not exist, an error
  // will be returned.
  Revoke(id LeaseID) error


  // Checkpoint applies the remainingTTL of a lease. The remainingTTL is used in Promote to set
  // the expiry of leases to less than the full TTL when possible.
  Checkpoint(id LeaseID, remainingTTL int64) error


  // Attach attaches given leaseItem to the lease with given LeaseID.
  // If the lease does not exist, an error will be returned.
  Attach(id LeaseID, items []LeaseItem) error


  // GetLease returns LeaseID for given item.
  // If no lease found, NoLease value will be returned.
  GetLease(item LeaseItem) LeaseID


  // Detach detaches given leaseItem from the lease with given LeaseID.
  // If the lease does not exist, an error will be returned.
  Detach(id LeaseID, items []LeaseItem) error


  // Promote promotes the lessor to be the primary lessor. Primary lessor manages
  // the expiration and renew of leases.
  // Newly promoted lessor renew the TTL of all lease to extend + previous TTL.
  Promote(extend time.Duration)


  // Demote demotes the lessor from being the primary lessor.
  Demote()


  // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
  // an error will be returned.
  Renew(id LeaseID) (int64, error)


  // Lookup gives the lease at a given lease id, if any
  Lookup(id LeaseID) *Lease


  // Leases lists all leases.
  Leases() []*Lease


  // ExpiredLeasesC returns a chan that is used to receive expired leases.
  ExpiredLeasesC() <-chan []*Lease


  // Recover recovers the lessor state from the given backend and RangeDeleter.
  Recover(b backend.Backend, rd RangeDeleter)


  // Stop stops the lessor for managing leases. The behavior of calling Stop multiple
  // times is undefined.
  Stop()
}
代码语言:javascript
复制
type lessor struct {
  mu sync.RWMutex


  // demotec is set when the lessor is the primary.
  // demotec will be closed if the lessor is demoted.
  demotec chan struct{}


  leaseMap             map[LeaseID]*Lease
  leaseExpiredNotifier *LeaseExpiredNotifier
  leaseCheckpointHeap  LeaseQueue
  itemMap              map[LeaseItem]LeaseID


  // When a lease expires, the lessor will delete the
  // leased range (or key) by the RangeDeleter.
  rd RangeDeleter


  // When a lease's deadline should be persisted to preserve the remaining TTL across leader
  // elections and restarts, the lessor will checkpoint the lease by the Checkpointer.
  cp Checkpointer


  // backend to persist leases. We only persist lease ID and expiry for now.
  // The leased items can be recovered by iterating all the keys in kv.
  b backend.Backend


  // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
  // requests for shorter TTLs are extended to the minimum TTL.
  minLeaseTTL int64


  expiredC chan []*Lease
  // stopC is a channel whose closure indicates that the lessor should be stopped.
  stopC chan struct{}
  // doneC is a channel whose closure indicates that the lessor is stopped.
  doneC chan struct{}


  lg *zap.Logger


  // Wait duration between lease checkpoints.
  checkpointInterval time.Duration
  // the interval to check if the expired lease is revoked
  expiredLeaseRetryInterval time.Duration
  // whether lessor should always persist remaining TTL (always enabled in v3.6).
  checkpointPersist bool
  // cluster is used to adapt lessor logic based on cluster version
  cluster cluster
}  
代码语言:javascript
复制
type LessorConfig struct {
  MinLeaseTTL                int64
  CheckpointInterval         time.Duration
  ExpiredLeasesRetryInterval time.Duration
  CheckpointPersist          bool
}

在初始化租约管理器的时候会传入backend.Backend作为持久化存储器,对应具体实现就是bolt

代码语言:javascript
复制
func NewLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) Lessor {
  return newLessor(lg, b, cluster, cfg)
}
代码语言:javascript
复制
func newLessor(lg *zap.Logger, b backend.Backend, cluster cluster, cfg LessorConfig) *lessor 

      l := &lessor{
        b:                         b,
      l.initAndRecover()
      go l.runLoop()

然后会起一个协程不断回收过期的租约,通知给channel

代码语言:javascript
复制
func (le *lessor) runLoop() {
      for {
    le.revokeExpiredLeases()
    le.checkpointScheduledLeases()
代码语言:javascript
复制
func (le *lessor) revokeExpiredLeases() {
        if le.isPrimary() {
    ls = le.findExpiredLeases(revokeLimit)
  }
        if len(ls) != 0 {
    select {
    case <-le.stopC:
      return
    case le.expiredC <- ls:
代码语言:javascript
复制
func (le *lessor) checkpointScheduledLeases() {
          if le.isPrimary() {
      cps = le.findDueScheduledCheckpoints(maxLeaseCheckpointBatchSize)
    }
      le.cp(context.Background(), &pb.LeaseCheckpointRequest{Checkpoints: cps})
代码语言:javascript
复制
func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
  return le.expiredC
}

当然这里也实现了核心授予租期的函数,通过persistTo 来持久化

代码语言:javascript
复制
func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
      l := &Lease{
      if _, ok := le.leaseMap[id]; ok {
        if le.isPrimary() {
    l.refresh(0)
  } else {
    l.forever()
  }
        le.leaseMap[id] = l
  l.persistTo(le.b)    
代码语言:javascript
复制
func (l *Lease) persistTo(b backend.Backend) {

回收租约

代码语言:javascript
复制
func (le *lessor) Revoke(id LeaseID) error {
      delete(le.leaseMap, id)
      txn := le.rd()
      keys := l.Keys()
  sort.StringSlice(keys).Sort()
  for _, key := range keys {
    txn.DeleteRange([]byte(key), nil)
  }

刷新租约(续约)

代码语言:javascript
复制
func (le *lessor) Renew(id LeaseID) (int64, error) {
      l := le.leaseMap[id]
        l.refresh(0)
  item := &LeaseWithTime{id: l.ID, time: l.expiry}
  le.leaseExpiredNotifier.RegisterOrUpdate(item)
代码语言:javascript
复制
func (le *lessor) Promote(extend time.Duration) {

文件里同时包含一个没有实现的FakeLessor

代码语言:javascript
复制
type FakeLessor struct{}
代码语言:javascript
复制
func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }
代码语言:javascript
复制
type FakeTxnDelete struct {
  backend.BatchTx
}

看完租约管理器,我们看下租约的具体实现server/lease/lease.go

代码语言:javascript
复制
type Lease struct {
  ID           LeaseID
  ttl          int64 // time to live of the lease in seconds
  remainingTTL int64 // remaining time to live in seconds, if zero valued it is considered unset and the full ttl should be used
  // expiryMu protects concurrent accesses to expiry
  expiryMu sync.RWMutex
  // expiry is time when lease should expire. no expiration when expiry.IsZero() is true
  expiry time.Time


  // mu protects concurrent accesses to itemSet
  mu      sync.RWMutex
  itemSet map[LeaseItem]struct{}
  revokec chan struct{}
}

它是通过persistTo,函数持久租约到backend存储的

代码语言:javascript
复制
func (l *Lease) persistTo(b backend.Backend) {
      tx := b.BatchTx()
      tx.LockInsideApply()
      schema.MustUnsafePutLease(tx, &lpb)

租约本身是一个带ttl的k/v

代码语言:javascript
复制
type LeaseItem struct {
  Key string
}

目录下实现了一个http协议的租约服务和对应的客户端操作函数:server/lease/leasehttp/http.go提供了俩路径

代码语言:javascript
复制
LeasePrefix         = "/leases"
LeaseInternalPrefix = "/leases/internal"
代码语言:javascript
复制
type leaseHandler struct {
  l      lease.Lessor
  waitch func() <-chan struct{}
}

分别对应了租约的刷新和查找

代码语言:javascript
复制
func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
      switch r.URL.Path {
        case LeasePrefix:
          ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
        case LeaseInternalPrefix:
          l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
代码语言:javascript
复制
func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
      req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(lreq))    
代码语言:javascript
复制
func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
      req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(lreq))

server/lease/leasepb/lease.pb.go定义了租约结构的protoserver/lease/leasepb/lease.proto

代码语言:javascript
复制
message Lease {
  int64 ID = 1;
  int64 TTL = 2;
  int64 RemainingTTL = 3;
}
代码语言:javascript
复制
message LeaseInternalRequest {
  etcdserverpb.LeaseTimeToLiveRequest LeaseTimeToLiveRequest = 1;
}
代码语言:javascript
复制
message LeaseInternalResponse {
  etcdserverpb.LeaseTimeToLiveResponse LeaseTimeToLiveResponse = 1;
}

server/lease/lease_queue.go里面实现了一个队列,用来维护租约的有序性

代码语言:javascript
复制
type LeaseWithTime struct {
  id    LeaseID
  time  time.Time
  index int
}
代码语言:javascript
复制
type LeaseQueue []*LeaseWithTime
代码语言:javascript
复制
func (pq *LeaseQueue) Push(x interface{}) {
代码语言:javascript
复制
type LeaseExpiredNotifier struct {
  m     map[LeaseID]*LeaseWithTime
  queue LeaseQueue
}

通过map加队列的形式实现了类似lru的能力

代码语言:javascript
复制
func (mq *LeaseExpiredNotifier) RegisterOrUpdate(item *LeaseWithTime) {
        if old, ok := mq.m[item.id]; ok {
    old.time = item.time
    heap.Fix(&mq.queue, old.index)
  } else {
    heap.Push(&mq.queue, item)
    mq.m[item.id] = item
  }

server/lease/metrics.go里面提供了监控的能力,把租约相关指标提供给promrtheus来进行监控

代码语言:javascript
复制
leaseGranted = prometheus.NewCounter(prometheus.CounterOpts{
    Namespace: "etcd_debugging",
    Subsystem: "lease",
    Name:      "granted_total",
    Help:      "The total number of granted leases.",
  })

以上就是etcd的lease目录下相关代码的核心实现。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-09-11 00:00,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档