前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >读猿码系列——6.Golang中用幂等思路解决缓存击穿的方案:singleflight

读猿码系列——6.Golang中用幂等思路解决缓存击穿的方案:singleflight

作者头像
才浅Coding攻略
发布2022-12-12 18:14:48
5870
发布2022-12-12 18:14:48
举报
文章被收录于专栏:才浅coding攻略才浅coding攻略

今天我们来看一个日常工作中会遇到的问题:实际开发中常见的做法是在查数据库前先去查缓存,如果缓存Miss(未命中)就去数据库中查到数据并放到缓存里。这是正常情况,然而缓存击穿则是指在高并发系统中,大量请求同时查询一个缓存的key,假如这个key刚好过期就会导致大量的请求都打到数据库上。在绝大多数情况下,可以考虑使用singleflight来抑制重复函数调用。

https://pkg.go.dev/golang.org/x/sync@v0.0.0-20220929204114-8fcdb60fdcc0/singleflight

上面是singleflight包的地址,其中对外提供了以下几个方法:

代码语言:javascript
复制
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool)

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result

func (g *Group) Forget(key string)

我们跟到源码中看下这几个方法singleflight.go。其中Group是由一个互斥锁和一个映射表组成;call结构体中保存了当前调用对应的信息。

代码语言:javascript
复制
type call struct {
    wg sync.WaitGroup
    val interface{}
    err error
    dups  int
    chans []chan<- Result
}

type Group struct {
    mu sync.Mutex       // protects m
    m  map[string]*call // lazily initialized
}

type Result struct {
    Val    interface{}
    Err    error
    Shared bool
}

Do()方法,传入字符串key和fn回调函数,如果key相同,在同一时间只有第一次调用Do方法时才会去执行fn回调函数, 其他请求等待释放锁及执行结果。返回值v表示fn的执行结果;err表示fn返回的err;shared表示key是否是共享的。

代码语言:javascript
复制
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        // 存在相同的key, 增加计数
        c.dups++
        g.mu.Unlock()
        c.wg.Wait() // 等待这个key对应的fn调用完成

        if e, ok := c.err.(*panicError); ok {
            panic(e)
        } else if c.err == errGoexit {
            runtime.Goexit()
        }
        return c.val, c.err, true
    }
    c := new(call) // 不存在key是第一个请求, 创建一个call结构体
    c.wg.Add(1)
    g.m[key] = c // 加入到映射表中
    g.mu.Unlock()

    g.doCall(c, key, fn) // 调用方法
    return c.val, c.err, c.dups > 0
}

DoChan()方法和Do()方法类似,不同的是它维护了一个chan Result通道,这使得调用者不用阻塞等待调用返回,而是通过消费通道就可以拿到fn函数的执行结果。这个chan Result通道,在返回给调用者前会先放到call结构体的维护的通知队列里,待fn函数返回结果后DoChan方法会把结果发送给通知队列中的每个通道。

代码语言:javascript
复制
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
    ch := make(chan Result, 1)
    g.mu.Lock()
    if g.m == nil {
        g.m = make(map[string]*call)
    }
    if c, ok := g.m[key]; ok {
        c.dups++
        c.chans = append(c.chans, ch)
        g.mu.Unlock()
        return ch
    }
    c := &call{chans: []chan<- Result{ch}}
    c.wg.Add(1)
    g.m[key] = c
    g.mu.Unlock()

    go g.doCall(c, key, fn)

    return ch
}

我们通过例子来演示下singleflight包的使用方法。下面这段代码对比使用多个goroutine直接调用100次同一函数和使用singleflight包的Do()方法处理后再调用100次同一函数两者的耗时。

代码语言:javascript
复制
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "golang.org/x/sync/singleflight"
)

var count int32

func main() {
    var wg sync.WaitGroup
    now := time.Now()
    sg := &singleflight.Group{}
    for i := 0; i < 100; i++ {
      wg.Add(1)
      go func() {
        // Getcontent(1)
        SingleGetcontent(sg, 1)
        wg.Done()
      }()
    }
    wg.Wait()

    fmt.Printf("耗时:%s", time.Since(now))
}

func Getcontent(id int) (string, error) {
    atomic.AddInt32(&count, 1)
    time.Sleep(time.Duration(count) * time.Millisecond)
    return fmt.Sprintf("获取第 %d 个内容", id), nil
}

func SingleGetcontent(sg *singleflight.Group, id int) (string, error) {
    v, err, ok := sg.Do(fmt.Sprintf("%d", id), func() (interface{}, error) {
      return Getcontent(id)
    })
    fmt.Println(ok)
    return v.(string), err
}

得到的结果是直接调用Getcontent 100次耗时100.486554ms;使用singleflight后耗时1.782066ms。我们还在SingleGetcontent中输出了sg.Do()方法的第三个返回值,就是之前提到的shared,表示返回数据是调用 fn 得到的还是其他相同 key 调用返回的,这里输出得到的结果都是true,如果我们将循环次数改为i<1表示没有其他协程共享,就会返回false。

我们再规范下写法,使用 Context 来控制超时,通常使用方式如下:

代码语言:javascript
复制
package main

import (
    "context"
    "fmt"
    "sync/atomic"
    "time"

    "golang.org/x/sync/singleflight"
)

type Result string

func find(ctx context.Context, query string) (Result, error) {
   return Result(fmt.Sprintf("result for %q", query)), nil
}

func main() {
    var sg singleflight.Group
    const n = 5
    waited := int32(n)
    done := make(chan struct{})
    key := "http://cqcoding.site/#/"
    for i := 0; i < n; i++ {
      go func(j int) {
        v, _, shared := sg.Do(key, func() (interface{}, error) {
          ret, err := find(context.Background(), key)
          return ret, err
        })
        if atomic.AddInt32(&waited, -1) == 0 {
          close(done)
        }
        fmt.Printf("index: %d, val: %v, shared: %v\n", j, v, shared)
      }(i)
    }
    select {
    case <-done:
    case <-time.After(time.Second):
      fmt.Println("Do hangs")
    }
}

// index: 4, val: result for "http://cqcoding.site/#/", shared: false
// index: 0, val: result for "http://cqcoding.site/#/", shared: true
// index: 2, val: result for "http://cqcoding.site/#/", shared: true
// index: 3, val: result for "http://cqcoding.site/#/", shared: false
// index: 1, val: result for "http://cqcoding.site/#/", shared: false

如果函数执行一切正常,则所有请求都能顺利获得正确的数据。相反,如果函数执行遇到问题呢?由于 singleflight 是以阻塞读的方式来控制向下游请求的并发量,在第一个下游请求没有返回之前,所有请求都将被阻塞。

假设服务正常情况下处理能力为 1w QPS,每次请求会发起 3 次 下游调用,其中一个下游调用使用 singleflight 获取控制并发获取数据,请求超时时间为 3s。那么在出现请求超时的情况下,会出现以下几个问题:

  • 协程暴增,最小协程数为3w(1w/s * 3s)
  • 内存暴增,内存总大小为:协程内存大小 + 1w/s 3s (3+1)次 * (请求包+响应包)大小
  • 大量超时报错:1w/s * 3s
  • 后续请求耗时增加(调度等待)

如果类似问题出现在重要程度高的接口上,例如:读取游戏配置、获取博主信息等关键接口,那么问题将是非常致命的。出现该情况的根本原因有以下两点:

  • 阻塞读:缺少超时控制,难以快速失败;
  • 单并发:控制了并发量,但牺牲了成功率。

对于上述问题,其中阻塞读可以使用DoChan()方法异步调用,通过channel返回结果;使用select语句实现超时控制。

至于单并发问题,在一些对可用性要求极高的场景下,往往需要一定的请求饱和度来保证业务的最终成功率。一次请求还是多次请求,对于下游服务而言并没有太大区别,此时使用 singleflight 只是为了降低请求的数量级,那么使用 Forget() 提高下游请求的并发:

代码语言:javascript
复制
v, _, shared := g.Do(key, func() (interface{}, error) {
    go func() {
        time.Sleep(10 * time.Millisecond)
        fmt.Printf("Deleting key: %v\n", key)
        g.Forget(key)
    }()
    ret, err := find(context.Background(), key)
    return ret, err
})
代码语言:javascript
复制
func (g *Group) Forget(key string) {
    g.mu.Lock()
    delete(g.m, key)
    g.mu.Unlock()
}

当有一个并发请求超过 10ms,那么将会有第二个请求发起,此时只有 10ms 内的请求最多发起一次请求,即最大并发:100 QPS。单次请求失败的影响大大降低。


除了缓存击穿这类缓存Miss缓解数据库压力的应用场景,singleflight还可以被用于查询DNS记录,Go语言的net标准库里使用的lookupGroup结构,就是Go扩展库提供的原语singleflight.Group。它的作用是将对相同域名的DNS记录查询合并成一个查询,使用的是异步查询的方法DoChan。如果有兴趣可以去GitHub上看一下完整的源码,访问链接可直接定位到这部分的源码:

https://github.com/golang/go/blob/master/src/net/lookup.go#L261

总结

SingleFlight的作用是在处理多个goroutine同时调用同一个函数的时候,只让一个goroutine去实际调用这个函数,等到这个goroutine返回结果的时,再把结果返回给其他几个同时调用了相同函数的goroutine,这样可以减少并发调用的数量。在实际应用中也是,它能够在一个服务中减少对下游的并发重复请求。对于单次的失败无法容忍的情况,在高并发的场景下更好的处理方案是:

  1. 放弃使用同步请求,牺牲数据更新的实时性。
  2. “缓存” 存储准实时的数据 + “异步更新” 数据到缓存。

本文章涉及代码我放到了gitlab上:

https://gitlab.com/893376179/daily-golang-package/-/tree/main/singleflight

参考:

https://pkg.go.dev/golang.org/x/sync@v0.0.0-20220929204114-8fcdb60fdcc0/singleflight

https://www.cyningsun.com/01-11-2021/golang-concurrency-singleflight.html

https://tern.cc/9v6KQ5

END

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-10-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 才浅coding攻略 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档