首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >并发编程--用SingleFlight合并重复请求

并发编程--用SingleFlight合并重复请求

作者头像
KevinYan
发布2020-12-31 10:13:07
1.2K0
发布2020-12-31 10:13:07
举报
文章被收录于专栏:网管叨bi叨网管叨bi叨

大家好啊,今天网管想给大家介绍一下Gosingleflight包,当然它不是直译过来的单飞的意思~~!SingleFlightGo语言sync扩展库提供的另一种并发原语,那么SingleFlight是用于解决什么问题的呢?官方文档里的解释是:

Package singleflight provides a duplicate function call suppression mechanism. 翻译过来就是:singleflight包提供了一种抑制重复函数调用的机制。

具体到Go程序运行的层面来说,SingleFlight的作用是在处理多个goroutine同时调用同一个函数的时候,只让一个goroutine去实际调用这个函数,等到这个goroutine返回结果的时候,再把结果返回给其他几个同时调用了相同函数的goroutine,这样可以减少并发调用的数量。在实际应用中也是,它能够在一个服务中减少对下游的并发重复请求。还有一个比较常见的使用场景是用来防止缓存击穿。

Go提供的SingleFlight

Go扩展库里用singleflight.Group结构体类型提供了SingleFlight并发原语的功能。

singleflight.Group类型提供了三个方法:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

func (g *Group) Forget(key string)
  • Do方法,接受一个字符串Key和一个待调用的函数,会返回调用函数的结果和错误。使用Do方法的时候,它会根据提供的Key判断是否去真正调用fn函数。同一个 key,在同一时间只有第一次调用Do方法时才会去执行fn函数,其他并发的请求会等待调用的执行结果。
  • DoChan方法:类似Do方法,只不过是一个异步调用。它会返回一个通道,等fn函数执行完,产生了结果以后,就能从这个 chan 中接收这个结果。
  • Forget方法:在SingleFlight中删除一个Key。这样一来,之后这个Key的Do方法调用会执行fn函数,而不是等待前一个未完成的fn 函数的结果。

应用场景

了解了Go语言提供的 SingleFlight并发原语都有哪些方法可以调用后 ,下面介绍两个它的应用场景。

查询DNS记录

Go语言的net标准库里使用的lookupGroup结构,就是Go扩展库提供的原语singleflight.Group

type Resolver struct {
  ......
 // 源码地址 https://github.com/golang/go/blob/master/src/net/lookup.go#L151
 // lookupGroup merges LookupIPAddr calls together for lookups for the same
 // host. The lookupGroup key is the LookupIPAddr.host argument.
 // The return values are ([]IPAddr, error).
 lookupGroup singleflight.Group
}

它的作用是将对相同域名的DNS记录查询合并成一个查询,下面是net库提供的DNS记录查询方法LookupIp使用lookupGroup这个SingleFlight进行合并查询的相关源码,它使用的是异步查询的方法DoChan

func LookupIP(host string) ([]IP, error) {
 addrs, err := DefaultResolver.LookupIPAddr(context.Background(), host)
  ......
}

func (r *Resolver) lookupIPAddr(ctx context.Context, network, host string) ([]IPAddr, error) {
  ......
  // 使用SingleFlight的DoChan合并多个查询请求
 ch, called := r.getLookupGroup().DoChan(lookupKey, func() (interface{}, error) {
  defer dnsWaitGroup.Done()
  return testHookLookupIP(lookupGroupCtx, resolverFunc, network, host)
 })
 if !called {
  dnsWaitGroup.Done()
 }
 
 select {
 case <-ctx.Done():
  ......
 case r := <-ch:
  lookupGroupCancel()
  if trace != nil && trace.DNSDone != nil {
   addrs, _ := r.Val.([]IPAddr)
   trace.DNSDone(ipAddrsEface(addrs), r.Shared, r.Err)
  }
  return lookupIPReturn(r.Val, r.Err, r.Shared)
 }
}

上面的源码做了很多删减,只留了SingleFlight合并查询的部分,如果有兴趣可以去GitHub上看一下完整的源码,访问链接https://github.com/golang/go/blob/master/src/net/lookup.go#L261 ,可直接定位到这部分的源码。

网管是不是很贴心,记得三连啊~!

防止缓存击穿

在项目里使用缓存时,一个常见的用法是查询一个数据先去查询缓存,如果没有就去数据库里查到数据并缓存到Redis里。那么缓存击穿问题是指,高并发的系统中,大量的请求同时查询一个缓存Key 时,如果这个 Key 正好过期失效,就会导致大量的请求都打到数据库上,这就是缓存击穿。用 SingleFlight 来解决缓存击穿问题再合适不过,这个时候只要这些对同一个 Key 的并发请求的其中一个到数据库中查询就可以了,这些并发的请求可以共享同一个结果。

下面是一个模拟用SingleFlight并发原语合并查询Redis缓存的程序,你可以自己动手测试一下,开10个goroutine去查询一个固定的Key,观察一下返回结果就会发现最终只执行了一次Redis查询。

// 模拟一个Redis客户端
type client struct {
 // ... 其他的配置省略
 requestGroup singleflight.Group
}

// 普通查询
func (c *client) Get(key string) (interface{}, error) {
 fmt.Println("Querying Database")
 time.Sleep(time.Second)
 v := "Content of key" + key
 return  v, nil
}

// SingleFlight查询
func (c *client) SingleFlightGet(key string) (interface{}, error) {
 v, err, _ := c.requestGroup.Do(key, func() (interface{}, error) {
  return c.Get(key)

 })
 if err != nil {
  return nil, err
 }
 return v, err
}

完整的测试源码可以点击阅读原文,去我的GitHub仓库下载

实现原理

最后我们来看一下singleflight.Group的实现原理,通过它的源码也是能学到不少用Go语言编程的技巧的。singleflight.Group由一个互斥锁sync.Mutex和一个映射表组成,每一个 singleflight.call结构体都保存了当前调用对应的信息:

type Group struct {
 mu sync.Mutex
 m  map[string]*call
}

type call struct {
 wg sync.WaitGroup

 val interface{}
 err error

 dups  int
 chans []chan<- Result
}

下面我们来看看 Do 和 DoChan 方法是怎么实现的。

Do方法

SingleFlight 定义一个call结构体,每个结构体都保存了fn调用对应的信息。

Do方法的执行逻辑是每次调用Do方法都会先去获取互斥锁,随后判断在映射表里是否已经有Key对应的fn函数调用信息的call结构体。

  • 当不存在时,证明是这个Key的第一次请求,那么会初始化一个call结构体指针,增加SingleFlight内部持有的sync.WaitGroup计数器到1。释放互斥锁,然后阻塞的等待doCall方法执行fn函数的返回结果
  • 当存在时,增加call结构体内代表fn重复调用次数的计数器dups,释放互斥锁,然后使用WaitGroup等待fn函数执行完成。

call结构体的valerr 两个字段只会在 doCall方法中执行fn有返回结果后才赋值,所以当 doCall方法 和 WaitGroup.Wait返回时,函数调用的结果和错误会返回给Do方法的所有调用者。


  func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
      g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
      // 存在相同的key, 增加计数
      c.dups++
      g.mu.Unlock()
      c.wg.Wait() //等待这个key对应的fn调用完成
      return c.val, c.err, true // 返回fn调用的结果
    }
    c := new(call) // 不存在key, 是第一个请求, 创建一个call结构体
    c.wg.Add(1)
    g.m[key] = c //加入到映射表中
    g.mu.Unlock()
  

    g.doCall(c, key, fn) // 调用方法
    return c.val, c.err, c.dups > 0
  }

doCall方法会去实际调用fn函数,因为call结构体初始化后forgotten字段的默认值是falsefn调用有返回后,会把对应的Key删掉。这样这轮请求都返回后,下一轮使用同一的Key的请求会重新调用执行一次fn函数。


  func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
    c.val, c.err = fn()
    c.wg.Done()
  

    g.mu.Lock()
    if !c.forgotten { // 已调用完,删除这个key
      delete(g.m, key)
    }
    for _, ch := range c.chans {
      ch <- Result{c.val, c.err, c.dups > 0}
    }
    g.mu.Unlock()
  }

DoChan方法

SingleFlight还提供了异步调用DoChan方法,它的执行逻辑和Do方法类似,唯一不同的是调用者不用阻塞等待调用的返回, DoChan方法会创建一个chan Result通道返回给调用者,调用者通过这个通道就能接受到fn函数的结果。这个chan Result通道,在返回给调用者前会先放到call结构体的维护的通知队列里,待fn函数返回结果后DoChan方法会把结果发送给通知队列中的每个通道。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
 ch := make(chan Result, 1)
 g.mu.Lock()
 if g.m == nil {
  g.m = make(map[string]*call)
 }
 if c, ok := g.m[key]; ok {
  c.dups++
  c.chans = append(c.chans, ch)
  g.mu.Unlock()
  return ch
 }
 c := &call{chans: []chan<- Result{ch}}
 c.wg.Add(1)
 g.m[key] = c
 g.mu.Unlock()

 go g.doCall(c, key, fn)

 return ch
}

type Result struct {
 Val    interface{}
 Err    error
 Shared bool
}

总结

学会SingleFlight这个并发原语后,下次在遇到类似在高并发情况下查询DNS记录、Redis缓存这样的场景的时候就可以应用上啦。最后我给你留个思考题吧,上面用SingleFlight查询Redis缓存的例子使用的是同步阻塞方法Do,你能不能改成使用异步非阻塞的DoChan方法呢?另外能不能给SingleFlightGet增加一个超时返回错误的功能呢?

提示一下,使用上下文对象,返回的错误是ctx.Err()

可以把在留言里写下你的解决方案,最好能附上源码链接,欢迎把文章分享给你更多的朋友,一起讨论。还没关注公众号「网管叨bi叨」的抓紧关注呀,每周都会有干货技术分享。

- END -

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-12-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 网管叨bi叨 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Go提供的SingleFlight
  • 应用场景
    • 查询DNS记录
      • 防止缓存击穿
      • 实现原理
        • Do方法
          • DoChan方法
          • 总结
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档