前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >golang源码分析:time/rate

golang源码分析:time/rate

作者头像
golangLeetcode
发布2022-08-02 19:27:19
4570
发布2022-08-02 19:27:19
举报
文章被收录于专栏:golang算法架构leetcode技术php

这是golang 源码中实现的限流器,是基于令牌桶算法的:

官方地址: golang.org/x/time/rate

github地址:github.com/golang/time/rate

代码语言:javascript
复制
   r := rate.Every(100 * time.Millisecond)
   limit := rate.NewLimiter(r, 20)
   for {
       if limit.AllowN(time.Now(), 8) {
           log.Info("log:event happen")
       } else {
           log.Info("log:event not allow")
       }

   }

一秒内产生10 个令牌,桶的容量是20,当前时刻取8个token

源码很简单只有两个文件:

代码语言:javascript
复制
rate.go
rate_test.go

1,NewLimiter

代码语言:javascript
复制
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
  return &Limiter{
    limit: r,
    burst: b,
  }
}

简单构造了一个limiter对象

代码语言:javascript
复制
type Limiter struct {
  mu     sync.Mutex
  limit  Limit
  burst  int
  tokens float64
  // last is the last time the limiter's tokens field was updated
  last time.Time
  // lastEvent is the latest time of a rate-limited event (past or future)
  lastEvent time.Time
}

记录了上一次分发token的时间,和上一次请求token的时间

代码语言:javascript
复制
func Every(interval time.Duration) Limit {
  if interval <= 0 {
    return Inf
  }
  return 1 / Limit(interval.Seconds())
}

仅仅做了从时间间隔向频率的转换。

2,AllowN/Allow

代码语言:javascript
复制
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
  return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
  return lim.reserveN(now, n, 0).ok
}

底层都是调用了reserveN函数,maxFutureReserve参数传的是0

代码语言:javascript
复制
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
  lim.mu.Lock()

  if lim.limit == Inf {
    lim.mu.Unlock()
    return Reservation{
      ok:        true,
      lim:       lim,
      tokens:    n,
      timeToAct: now,
    }
  }

  now, last, tokens := lim.advance(now)

  // Calculate the remaining number of tokens resulting from the request.
  tokens -= float64(n)

  // Calculate the wait duration
  var waitDuration time.Duration
  if tokens < 0 {
    waitDuration = lim.limit.durationFromTokens(-tokens)
  }

  // Decide result
  ok := n <= lim.burst && waitDuration <= maxFutureReserve

  // Prepare reservation
  r := Reservation{
    ok:    ok,
    lim:   lim,
    limit: lim.limit,
  }
  if ok {
    r.tokens = n
    r.timeToAct = now.Add(waitDuration)
  }

  // Update state
  if ok {
    lim.last = now
    lim.tokens = tokens
    lim.lastEvent = r.timeToAct
  } else {
    lim.last = last
  }

  lim.mu.Unlock()
  return r
}

1,如果lim.limit == Inf,返回Reservation对象

代码语言:javascript
复制
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
  ok        bool
  lim       *Limiter
  tokens    int
  timeToAct time.Time
  // This is the Limit at reservation time, it can change later.
  limit Limit
}

2,获取当前时间,上一次产生token的时间和,产生的token

代码语言:javascript
复制
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
  last := lim.last
  if now.Before(last) {
    last = now
  }

  // Calculate the new number of tokens, due to time that passed.
  elapsed := now.Sub(last)
  delta := lim.limit.tokensFromDuration(elapsed)
  tokens := lim.tokens + delta
  if burst := float64(lim.burst); tokens > burst {
    tokens = burst
  }
  return now, last, tokens
}

A,如果当前时间比上一次获取token时间早(说明有请求在等待获取token),那么更新当前时间为上一次获取token时间(和上一个请求一起等)

B,计算从上一次获取token到现在的时间间隔 C,计算产生的token增量

代码语言:javascript
复制
 delta := lim.limit.tokensFromDuration(elapsed)
代码语言:javascript
复制
type Limit float64
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
  return d.Seconds() * float64(limit)
}

也就是时间间隔的秒数乘以每秒产生的token数量。

D,计算总的token数量

E,如果桶已经满了,丢弃多余的token

3,扣减本次请求需要的token

4,如果token数不够,计算需要等待的时间间隔

5,如果请求的token数量比桶的容量小,并且可以等待的时间大于需要等待的时间说明这个请求是合法的。

代码语言:javascript
复制
ok := n <= lim.burst && waitDuration <= maxFutureReserve

6,构造Reservation对象,存储当前limiter对象到lim

7,如果请求合法,存储当前请求需要的token数量和需要等待的时间(当前时间+等待时间间隔)

8,如果合法,更新当前limiter的上一次获取token时间为当前时间,获取的token数量为扣减后剩余的token数量,获取token时间为将来能够真正获取token的时间点。

9,否则更新limiter的上一次获取token时间为本次计算的上一次获取token时间。

上面就是获取token的所有代码实现。

Limiter提供了三类方法供用户消费Token,用户可以每次消费一个Token,也可以一次性消费多个Token。

1,AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之返回不消费 token,false。也就是前面介绍的方法。

代码语言:javascript
复制
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

2,当使用 Wait 方法消费 token 时,如果此时桶内 token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 token 满足条件。如果充足则直接返回。

代码语言:javascript
复制
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
代码语言:javascript
复制
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
  lim.mu.Lock()
  burst := lim.burst
  limit := lim.limit
  lim.mu.Unlock()

  if n > burst && limit != Inf {
    return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
  }
  // Check if ctx is already cancelled
  select {
  case <-ctx.Done():
    return ctx.Err()
  default:
  }
  // Determine wait limit
  now := time.Now()
  waitLimit := InfDuration
  if deadline, ok := ctx.Deadline(); ok {
    waitLimit = deadline.Sub(now)
  }
  // Reserve
  r := lim.reserveN(now, n, waitLimit)
  if !r.ok {
    return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
  }
  // Wait if necessary
  delay := r.DelayFrom(now)
  if delay == 0 {
    return nil
  }
  t := time.NewTimer(delay)
  defer t.Stop()
  select {
  case <-t.C:
    // We can proceed.
    return nil
  case <-ctx.Done():
    // Context was canceled before we could proceed.  Cancel the
    // reservation, which may permit other events to proceed sooner.
    r.Cancel()
    return ctx.Err()
  }
}

A,如果请求数量超出了桶的容量,直接报错

B,通过ctx.Deadline()计算允许等待的时间间隔

C,调用r := lim.reserveN(now, n, waitLimit) 获取Reserve对象

D,如果reserve对象表示不能成功(超出桶的容量,超出时间限制),返回错误

E,计算需要等待的时间,timeToAct表示能够获取token的时间。

代码语言:javascript
复制
// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action.  Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
  if !r.ok {
    return InfDuration
  }
  delay := r.timeToAct.Sub(now)
  if delay < 0 {
    return 0
  }
  return delay
}

F,启动定时器等待。

3,ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 token 是否充足,都会返回一个 Reservation * 对象。

你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间。如果等待时间为 0,则说明不用等待。

必须等到等待时间之后,才能进行接下来的工作。

或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 token 归还。

代码语言:javascript
复制
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

这个方法比较原始直接返回Reserve对象,交给用户处理

代码语言:javascript
复制
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
  r := lim.reserveN(now, n, InfDuration)
  return &r
}
本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-09-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 golang算法架构leetcode技术php 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档