前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go-Redis Client 配置

Go-Redis Client 配置

原创
作者头像
王小明_HIT
发布2022-02-13 14:36:13
1.3K0
发布2022-02-13 14:36:13
举报
文章被收录于专栏:程序员奇点程序员奇点

前言

Go Redis 采用的是 new Client 方式初始化一个 client 实例, 一个 client 示例会给每个后端 proxy(codis-proxy) 建立一个连接池 connPool, 每次调用 client 的方法时, client 会轮训选择一个 connPool, 然后再选择一个 conn 来请求真正的 redis proxy

创建一个 Redis 客户端
代码语言:javascript
复制
rdb := redis.NewClient(&redis.Options{
  Addr:     "172.31.1.135:7000",
  Password: "",
  DB:       0,
 })
 
func NewClient(opt *Options) *Client {
 opt.init()

 c := Client{
  baseClient: newBaseClient(opt, newConnPool(opt)),
  ctx:        context.Background(),
 }
 c.cmdable = c.Process

 return &c
}

Option 说明

goredis 目录下 optiion.go

opt.init()options 初始化方法,PoolSize, ReadTimeOut, WriteTimeOut 有初始化设置值

代码语言:javascript
复制
func (opt *Options) init() {
 if opt.Addr == "" {
  opt.Addr = "localhost:6379"
 }
 if opt.Network == "" {
  if strings.HasPrefix(opt.Addr, "/") {
   opt.Network = "unix"
  } else {
   opt.Network = "tcp"
  }
 }
 if opt.DialTimeout == 0 {
  opt.DialTimeout = 5 * time.Second
 }
 if opt.Dialer == nil {
  opt.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
   netDialer := &net.Dialer{
    Timeout:   opt.DialTimeout,
    KeepAlive: 5 * time.Minute,
   }
   if opt.TLSConfig == nil {
    return netDialer.DialContext(ctx, network, addr)
   }
   return tls.DialWithDialer(netDialer, network, addr, opt.TLSConfig)
  }
 }
 if opt.PoolSize == 0 {
  opt.PoolSize = 10 * runtime.NumCPU()
 }
 switch opt.ReadTimeout {
 case -1:
  opt.ReadTimeout = 0
 case 0:
  opt.ReadTimeout = 3 * time.Second
 }
 switch opt.WriteTimeout {
 case -1:
  opt.WriteTimeout = 0
 case 0:
  opt.WriteTimeout = opt.ReadTimeout
 }
 if opt.PoolTimeout == 0 {
  opt.PoolTimeout = opt.ReadTimeout + time.Second
 }
 if opt.IdleTimeout == 0 {
  opt.IdleTimeout = 5 * time.Minute
 }
 if opt.IdleCheckFrequency == 0 {
  opt.IdleCheckFrequency = time.Minute
 }

 if opt.MaxRetries == -1 {
  opt.MaxRetries = 0
 } else if opt.MaxRetries == 0 {
  opt.MaxRetries = 3
 }
 switch opt.MinRetryBackoff {
 case -1:
  opt.MinRetryBackoff = 0
 case 0:
  opt.MinRetryBackoff = 8 * time.Millisecond
 }
 switch opt.MaxRetryBackoff {
 case -1:
  opt.MaxRetryBackoff = 0
 case 0:
  opt.MaxRetryBackoff = 512 * time.Millisecond
 }
}

option 结构体如下:

代码语言:javascript
复制
type Options struct {
 Dialer  func(context.Context) (net.Conn, error)
 OnClose func(*Conn) error

 PoolSize           int
 MinIdleConns       int
 MaxConnAge         time.Duration
 PoolTimeout        time.Duration
 IdleTimeout        time.Duration
 IdleCheckFrequency time.Duration
}
option 参数说明
  1. PoolSize: 单个connPool最大连接数。建议根据业务qps计算合理值配置。可以简单计算如:单业务实例需要抗瞬时最大请求数量 500、访问耗时常态10ms、超时50ms、后端10个proxy。则合理配置PoolSize为:500 / 10 proxy / (50ms / 10ms) = 10。
  2. SetPoolInitSize。每个connPool初始化时创建的连接数,脉冲型业务建议为(proxy数量 * PoolSize),即初始化时就创建全部连接,避免尖峰流量到来时突发建连(会导致一波超时) 3.LiveTimeout。单个连接建立后超过liveTimeout时client会主动断链。建议使用默认值0。
  3. IdleTimeout。单个连接空闲IdleTimeout时长后client会主动断链。建议值为25min。
  4. DialTimeoutReadTimeoutWriteTimeout。分别为建连、读、写超时时间。建议值均为100ms。
  5. PoolTimeout。连接数达到上线PoolSize时等待的最长时间,建议设置为(业务超时时间 - 访问abase耗时)
  6. MaxRetries。最大重试次数,会对连接错误以及timeout错误做重试。建议使用默认配置0。
创建连接池代码
代码语言:javascript
复制
func newConnPool(opt *Options) *pool.ConnPool {
 return pool.NewConnPool(&pool.Options{
  Dialer: func(ctx context.Context) (net.Conn, error) {
   ctx, span := internal.StartSpan(ctx, "redis.dial")
   defer span.End()

   if span.IsRecording() {
    span.SetAttributes(
     attribute.String("db.connection_string", opt.Addr),
    )
   }

   cn, err := opt.Dialer(ctx, opt.Network, opt.Addr)
   if err != nil {
    return nil, internal.RecordError(ctx, span, err)
   }

   return cn, nil
  },
  PoolSize:           opt.PoolSize,
  MinIdleConns:       opt.MinIdleConns,
  MaxConnAge:         opt.MaxConnAge,
  PoolTimeout:        opt.PoolTimeout,
  IdleTimeout:        opt.IdleTimeout,
  IdleCheckFrequency: opt.IdleCheckFrequency,
 })
}
Redis 保活

发现问题没,goredis 中,并没有最小连接数的设置,也没有 KeepAlive 。 业务如果长时间没有流量,会导致 和 Redis 实例断开链接,因为已经到达了 IdelTimeout 。 当尖峰流量到来的时候,也会导致突发建联, 导致一波超时,因为建立链接是比较耗时的。典型场景: 红包雨。直播间喊麦倒计时开始 ... 等,此时流量会突刺上涨。

如何解决上面说的问题: 保活

保活方法:

后台增加一个定时周期通过 ping 命令来保持链接, ping 是从链接池队尾获取链接, ping 完之后放回队尾, 这样需要足够的并发来实现保活,建议设置并发数 PoolInitSize*实例个数 , 周期设置为 IdleTimeOut/2

原理:定时去 ping 即可

代码语言:javascript
复制
//  ping function
go func() {
    ping := func() {
        cli, err := libabase.GetABaseClient(psm)
        if err == nil {
            cli.Ping()
        }
    }

    var funcs = make([]func(), parallelism)
    for i := int64(0); i < parallelism; i++ {
        funcs[i] = ping
    }

    for {
        libs.Dispatch(context.Background(), funcs, parallelism, false)
        time.Sleep(time.Minute + time.Duration(libs.RandInt63()%100)*5*time.Second)
    }
}()

//  libs.dispatch
func Dispatch(ctx context.Context, funcs []func(), parallelism int64, isBlock bool) error {
 if parallelism <= 0 {
  return liberror.InvalidValueError("parallelism should > 0, but", parallelism)
 }

 var runningCnt = int64(0)
 var wg = sync.WaitGroup{}
 var lock = sync.Mutex{}
 var cond = sync.NewCond(&lock)

 for _, f := range funcs {
  wg.Add(1)
  go func(f func()) {
   defer func() {
    if r := recover(); r != nil {
     logs.CtxWarn(ctx, "Dispatch, recover, %+v", r)
    }
    cond.L.Lock()
    runningCnt--
    cond.L.Unlock()
    cond.Signal()
    wg.Done()
   }()

   cond.L.Lock()
   for runningCnt >= parallelism { // 按照设定的并发度执行
    cond.Wait()
   }
   runningCnt++
   cond.L.Unlock()

   f()
  }(f)
 }

 // 需要阻塞的话, 需要wait
 if isBlock {
  wg.Wait()
 }

 return nil
}

可以看下简单的写法

启动一个协程,定时去 ping 即可保活

代码语言:javascript
复制
//  ping function
go func() {
    ping := func() {
        cli, err := libabase.GetABaseClient(psm)
        if err == nil {
            cli.Ping()
        }
    }

    var funcs = make([]func(), parallelism)
    for i := int64(0); i < parallelism; i++ {
        funcs[i] = ping
    }

    for {
        libs.Dispatch(context.Background(), funcs, parallelism, false)
        time.Sleep(time.Minute + time.Duration(libs.RandInt63()%100)*5*time.Second)
    }
}()

//  libs.dispatch
func Dispatch(ctx context.Context, funcs []func(), parallelism int64, isBlock bool) error {
 if parallelism <= 0 {
  return liberror.InvalidValueError("parallelism should > 0, but", parallelism)
 }

 var runningCnt = int64(0)
 var wg = sync.WaitGroup{}
 var lock = sync.Mutex{}
 var cond = sync.NewCond(&lock)

 for _, f := range funcs {
  wg.Add(1)
  go func(f func()) {
   defer func() {
    if r := recover(); r != nil {
     logs.CtxWarn(ctx, "Dispatch, recover, %+v", r)
    }
    cond.L.Lock()
    runningCnt--
    cond.L.Unlock()
    cond.Signal()
    wg.Done()
   }()

   cond.L.Lock()
   for runningCnt >= parallelism { // 按照设定的并发度执行
    cond.Wait()
   }
   runningCnt++
   cond.L.Unlock()

   f()
  }(f)
 }

 // 需要阻塞的话, 需要wait
 if isBlock {
  wg.Wait()
 }

 return nil
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
    • 创建一个 Redis 客户端
    • Option 说明
      • option 参数说明
        • 创建连接池代码
          • Redis 保活
          • 保活方法:
          相关产品与服务
          云数据库 Redis
          腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档