前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Go Redis 中间件

Go Redis 中间件

作者头像
王小明_HIT
发布2022-06-14 16:31:19
3710
发布2022-06-14 16:31:19
举报
文章被收录于专栏:程序员奇点程序员奇点

本文基于 redis v6 中 WrapProcess 方法,可以对 redis 前后做操作。

  • https://github.com/go-redis/redis/blob/v6.15.9/redis.go

Go Redis 中间件

redis.go 中WrapProcess&修改Process方法 支持自己在 redis 操作前后,对操作前后进行处理,类似 Java 中的切片。

代码语言:javascript
复制
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
 c.process = fn(c.defaultProcess)
}
func (c *baseClient) Process(cmd Cmder) error {
 if c.process != nil {
  return c.process(cmd)
 }
 return c.defaultProcess(cmd)
}

同时 增加文件 wrap_middleware.go

代码语言:javascript
复制
/*
 Wrappers with chains
*/

package goredis

import (
 "strings"
 "time"

 "kv/redis-v6"
)

type ProcessFunc func(cmd redis.Cmder) error

// ----- user custom WrapProcess Middleware
/*
 Example:
  type middleware1 struct{}

  func (m *middleware1) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error){
   fmt.Printf("TEST_1_PROCESS_REQUEST: redis[%v] cmdName[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name())
   return cmder, nil
  }

  func (m *middleware1) ProcessResponse(ctx *goredis.WrapProcessContext, cmder redis.Cmder, err error) {
   key := "hello"
   value, _ := ctx.Get(key)
   fmt.Printf("TEST_1_PROCESS_RESPONSE: key[%v] value[%v]\n", key, value)
   fmt.Printf("TEST_1_PROCESS_RESPONSE: redis[%v] cmdName[%v] err[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name(), err)
  }

  type middleware2 struct{}

  func (m *middleware2) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error){
   ctx.Set("hello", "kitty")
   fmt.Printf("TEST_2_PROCESS_REQUEST: redis[%v] cmdName[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name())
   return cmder, nil
  }

  func (m *middleware2) ProcessResponse(ctx *goredis.WrapProcessContext, cmder redis.Cmder, err error){
   key := "hello"
   value, _ := ctx.Get(key)
   fmt.Printf("TEST_2_PROCESS_RESPONSE: key[%v] value[%v]\n", key, value)
   fmt.Printf("TEST_2_PROCESS_RESPONSE: redis[%v] cmdName[%v] err[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name(), err)

  }

  type middleware3 struct{}

  func (m middleware3) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error){
   fmt.Printf("TEST_3_PROCESS_REQUEST: redis[%v] cmdName[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name())
   return cmder, nil
  }

  func (m middleware3) ProcessResponse(ctx *goredis.WrapProcessContext, cmder redis.Cmder, err error){
   key := "hello"
   value, _ := ctx.Get(key)
   fmt.Printf("TEST_3_PROCESS_RESPONSE: key[%v] value[%v]\n", key, value)
   fmt.Printf("TEST_3_PROCESS_RESPONSE: redis[%v] cmdName[%v] err[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name(), err)
  }

  goredis.UseMiddleWare(&middleware1{}, &middleware2{})
  goredis.UseMiddleWare(middleware3{})

}
*/

type processContext interface {
 Set(key string, value interface{})
 Get(key string) (interface{}, bool)
}

// -- WrapProcessContext
type WrapProcessContext struct {
 client *Client
 data   map[string]interface{}
}

func (ctx *WrapProcessContext) Client() *Client {
 return ctx.client
}

// Ctx Set: non-concurrent safety
func (ctx *WrapProcessContext) Set(key string, value interface{}) {
 if ctx.data == nil {
  // startTime,stressTag
  ctx.data = make(map[string]interface{}, 2)
 }
 ctx.data[key] = value
}

// Ctx Get: non-concurrent safety
func (ctx *WrapProcessContext) Get(key string) (interface{}, bool) {
 if ctx.data == nil {
  return nil, false
 }
 value, exists := ctx.data[key]
 return value, exists
}

// -- WrapProcessMiddleWare
type WrapProcessMiddleWare interface {
 ProcessRequest(ctx *WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error)
 ProcessResponse(ctx *WrapProcessContext, cmder redis.Cmder, err error)
}

// -- internal WrapProcessMiddleWare Start --
type metricsWrapProcessMiddleWare struct{}

var (
 ctxCmdStartTimeKey = "cmd_st"
 ctxStressTagKey    = "stress_tag"
)

func (m *metricsWrapProcessMiddleWare) ProcessRequest(ctx *WrapProcessContext, cmd redis.Cmder) (redis.Cmder, error) {
 // degradate
 /*
  if cmdDegredated(c.metricsServiceName, cmd.Name()) {
   cmd.SetErr(ErrDegradated)
   return ErrDegradated
  }
 */
 // if stress rpc, hack args
 if prefix, ok := isStressTest(ctx.client.ctx); ok {
  isAbase := isAbaseCluster(ctx.client.Cluster())
  cmd = convertStressCMD(isAbase, prefix, cmd)
 }

 return cmd, nil
}

func (m *metricsWrapProcessMiddleWare) ProcessResponse(ctx *WrapProcessContext, cmd redis.Cmder, err error) {
 v, exists := ctx.Get(ctxCmdStartTimeKey)
 if !exists {
  return
 }

 startTime := v.(time.Time)

 latency := time.Now().Sub(startTime).Microseconds()
 addCallMetrics(ctx, cmd.Name(), latency, cmd.Err(), ctx.client.cluster, ctx.client.psm, ctx.client.metricsServiceName, 1, false)
}

// -- internal WrapProcessMiddleWare End --

var wrapProcessMiddleWares []WrapProcessMiddleWare

func UseMiddleWare(middleWares ...WrapProcessMiddleWare) {
 wrapProcessMiddleWares = append(wrapProcessMiddleWares, middleWares...)
}

func middleWareWrapProcess(client *Client, next ProcessFunc) ProcessFunc {
 middleWares := make([]WrapProcessMiddleWare, 0)

 // 1. force metrics middleWare at first
 middleWares = append(middleWares, &metricsWrapProcessMiddleWare{})

 // 2. bytedtrace middleware
 middleWares = append(middleWares, newCompatibleTraceMiddleWare())

 // 3. id set readonly, add readonly
 if client.IsReadOnly() {
  middleWares = append(middleWares, &readonlyProcessMiddleWare{})
 }

 // 4. if have abase table, add tableWrapProcessMiddleWare
 if len(strings.TrimSpace(client.Table())) > 0 {
  middleWares = append(middleWares, &tableWrapProcessMiddleWare{})
 }

 // 5. add user defined middleWares
 if len(wrapProcessMiddleWares) > 0 {
  middleWares = append(middleWares, wrapProcessMiddleWares...)
 }

 return func(cmd redis.Cmder) (err error) {
  ctx := &WrapProcessContext{client: client}
  // set the start time of cmd
  ctx.Set(ctxCmdStartTimeKey, time.Now())
  // set stress tag
  if v, exist := getStressTag(client.ctx); exist {
   ctx.Set(ctxStressTagKey, v)
  }

  processIndex := 0

  // process request
  for ; err == nil && processIndex < len(middleWares); processIndex++ {
   cmd, err = middleWares[processIndex].ProcessRequest(ctx, cmd)
  }

  cmd.SetErr(err)
  if err == nil {
   err = next(cmd)
  }

  // process response
  for index := processIndex - 1; index >= 0; index-- {
   middleWares[index].ProcessResponse(ctx, cmd, err)
  }

  return err
 }
}

func chainWrapProcessMiddleWares(client *Client) func(func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
 return func(next func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
  next = middleWareWrapProcess(client, next)
  return next
 }
}

中间件使用方式

代码语言:javascript
复制
goredis.UseMiddleWare(&staging.RedisStagingMW{})

goredis 包中 wrap_middleware.go 中间件接口WrapProcessMiddleWare

代码语言:javascript
复制
// -- WrapProcessMiddleWare
type WrapProcessMiddleWare interface {
 ProcessRequest(ctx *WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error)
 ProcessResponse(ctx *WrapProcessContext, cmder redis.Cmder, err error)
}

goredis 中间件处理包wrap_middleware

代码语言:javascript
复制
var wrapProcessMiddleWares []WrapProcessMiddleWare

func UseMiddleWare(middleWares ...WrapProcessMiddleWare) {
 wrapProcessMiddleWares = append(wrapProcessMiddleWares, middleWares...)
}

func middleWareWrapProcess(client *Client, next ProcessFunc) ProcessFunc {
 middleWares := make([]WrapProcessMiddleWare, 0)

 // 1. force metrics middleWare at first
 middleWares = append(middleWares, &metricsWrapProcessMiddleWare{})

 // 2. bytedtrace middleware
 middleWares = append(middleWares, newCompatibleTraceMiddleWare())

 // 3. id set readonly, add readonly
 if client.IsReadOnly() {
  middleWares = append(middleWares, &readonlyProcessMiddleWare{})
 }

 // 4. if have abase table, add tableWrapProcessMiddleWare
 if len(strings.TrimSpace(client.Table())) > 0 {
  middleWares = append(middleWares, &tableWrapProcessMiddleWare{})
 }

 // 5. add user defined middleWares
 if len(wrapProcessMiddleWares) > 0 {
  middleWares = append(middleWares, wrapProcessMiddleWares...)
 }

 return func(cmd redis.Cmder) (err error) {
  ctx := &WrapProcessContext{client: client}
  // set the start time of cmd
  ctx.Set(ctxCmdStartTimeKey, time.Now())
  // set stress tag
  if v, exist := getStressTag(client.ctx); exist {
   ctx.Set(ctxStressTagKey, v)
  }

  processIndex := 0

  // process request
  for ; err == nil && processIndex < len(middleWares); processIndex++ {
   cmd, err = middleWares[processIndex].ProcessRequest(ctx, cmd)
  }

  cmd.SetErr(err)
  if err == nil {
   err = next(cmd)
  }

  // process response
  for index := processIndex - 1; index >= 0; index-- {
   middleWares[index].ProcessResponse(ctx, cmd, err)
  }

  return err
 }
}

因此可以通过实现 WrapProcessMiddleWare 接口实现,自己的中间件处理方法。

看个例子,比如往 rediskey 加入一个前缀。

代码语言:javascript
复制

type RedisStagingMW struct {
 UseStress bool
}

func (m RedisStagingMW) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error) {
 var stressPrefix string
 if m.UseStress {
  var isStress bool
  stressPrefix, isStress = getStressTag(ctx.Client().Context())
  if !isStress {
   return cmder, nil
  }

  if !strings.HasSuffix(stressPrefix, "_") {
   stressPrefix = stressPrefix + "_"
  }
 }

 stagingPrefix, isStaging := getStagingPrefix(ctx.Client().Context())
 if !isStaging {
  return cmder, nil
 }
 cmd := convertStagingCMD(cmder, stressPrefix, stagingPrefix)
 return cmd, nil
}

func (RedisStagingMW) ProcessResponse(*goredis.WrapProcessContext, redis.Cmder, error) {}

参考资料

  • https://pkg.go.dev/gopkg.in/go-redis/redis.v6#Client.WrapProcess
  • https://github.com/go-redis/redis
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-05-30,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员奇点 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Go Redis 中间件
  • 参考资料
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档