本文基于 redis v6 中的 WrapProcess 方法,介绍如何在 redis 命令执行前后插入自定义处理逻辑。
redis.go 中的 WrapProcess 与修改后的 Process 方法,支持调用方在 redis 操作前后进行自定义处理,类似 Java 中的切面(AOP)。
// WrapProcess installs a custom command processor on the client.
//
// fn receives the client's defaultProcess and returns the function that
// Process will invoke from now on. Because fn is always handed defaultProcess
// (not the currently installed processor), a later call replaces the wrapper
// installed by an earlier call instead of stacking on top of it.
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
	wrapped := fn(c.defaultProcess)
	c.process = wrapped
}
// Process executes cmd through the processor installed by WrapProcess, or
// through defaultProcess when no wrapper has been installed.
func (c *baseClient) Process(cmd Cmder) error {
	handler := c.process
	if handler == nil {
		handler = c.defaultProcess
	}
	return handler(cmd)
}
同时增加文件 wrap_middleware.go:
/*
Wrappers with chains
*/
package goredis
import (
"strings"
"time"
"kv/redis-v6"
)
// ProcessFunc is the signature of a function that executes a single redis
// command and returns its error. It is the unit that middleWareWrapProcess
// wraps and returns.
type ProcessFunc func(cmd redis.Cmder) error
// ----- user custom WrapProcess Middleware
/*
Example:
type middleware1 struct{}
func (m *middleware1) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error){
fmt.Printf("TEST_1_PROCESS_REQUEST: redis[%v] cmdName[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name())
return cmder, nil
}
func (m *middleware1) ProcessResponse(ctx *goredis.WrapProcessContext, cmder redis.Cmder, err error) {
key := "hello"
value, _ := ctx.Get(key)
fmt.Printf("TEST_1_PROCESS_RESPONSE: key[%v] value[%v]\n", key, value)
fmt.Printf("TEST_1_PROCESS_RESPONSE: redis[%v] cmdName[%v] err[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name(), err)
}
type middleware2 struct{}
func (m *middleware2) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error){
ctx.Set("hello", "kitty")
fmt.Printf("TEST_2_PROCESS_REQUEST: redis[%v] cmdName[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name())
return cmder, nil
}
func (m *middleware2) ProcessResponse(ctx *goredis.WrapProcessContext, cmder redis.Cmder, err error){
key := "hello"
value, _ := ctx.Get(key)
fmt.Printf("TEST_2_PROCESS_RESPONSE: key[%v] value[%v]\n", key, value)
fmt.Printf("TEST_2_PROCESS_RESPONSE: redis[%v] cmdName[%v] err[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name(), err)
}
type middleware3 struct{}
func (m middleware3) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error){
fmt.Printf("TEST_3_PROCESS_REQUEST: redis[%v] cmdName[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name())
return cmder, nil
}
func (m middleware3) ProcessResponse(ctx *goredis.WrapProcessContext, cmder redis.Cmder, err error){
key := "hello"
value, _ := ctx.Get(key)
fmt.Printf("TEST_3_PROCESS_RESPONSE: key[%v] value[%v]\n", key, value)
fmt.Printf("TEST_3_PROCESS_RESPONSE: redis[%v] cmdName[%v] err[%v]\n", ctx.Client().MetricsServiceName(), cmder.Name(), err)
}
func main() {
goredis.UseMiddleWare(&middleware1{}, &middleware2{})
goredis.UseMiddleWare(middleware3{})
}
*/
// processContext describes a string-keyed value store. *WrapProcessContext
// provides both methods with matching signatures.
// NOTE(review): no use of this interface is visible in this file — confirm it
// is still needed.
type processContext interface {
// Set stores value under key.
Set(key string, value interface{})
// Get returns the value stored under key and whether it was present.
Get(key string) (interface{}, bool)
}
// -- WrapProcessContext

// WrapProcessContext is the state handed to every middleware while one
// command is processed. middleWareWrapProcess creates a fresh instance per
// command, so values stored in it are visible to all middlewares handling
// that command, in both the request and the response phase.
type WrapProcessContext struct {
// client is the Client the command is being executed on.
client *Client
// data holds the key/value pairs written through Set; it stays nil until
// the first Set call.
data map[string]interface{}
}
// Client returns the Client on which the current command is being executed.
func (ctx *WrapProcessContext) Client() *Client {
return ctx.client
}
// Set stores value under key, replacing any previous value for that key.
// The backing map is allocated on first use. Set is not safe for concurrent
// use.
func (ctx *WrapProcessContext) Set(key string, value interface{}) {
	if ctx.data != nil {
		ctx.data[key] = value
		return
	}
	// Sized for the two entries the wrapper itself writes: the command start
	// time and the stress tag.
	ctx.data = make(map[string]interface{}, 2)
	ctx.data[key] = value
}
// Get returns the value stored under key and reports whether it was present.
// It returns (nil, false) when nothing has been stored yet. Get is not safe
// for concurrent use with Set.
func (ctx *WrapProcessContext) Get(key string) (interface{}, bool) {
	// Reading from a nil map is legal in Go and yields the zero value with
	// ok == false, so no explicit nil check is required.
	stored, found := ctx.data[key]
	return stored, found
}
// -- WrapProcessMiddleWare

// WrapProcessMiddleWare is a hook that runs around every redis command.
type WrapProcessMiddleWare interface {
// ProcessRequest runs before the command is executed. The returned Cmder
// is what the following middlewares and the real executor receive.
// Returning a non-nil error stops the chain: later middlewares and the
// command itself are skipped.
ProcessRequest(ctx *WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error)
// ProcessResponse runs after the command (or after the chain was stopped),
// in reverse registration order, for every middleware whose ProcessRequest
// was called. err is the error from the request phase or from execution.
ProcessResponse(ctx *WrapProcessContext, cmder redis.Cmder, err error)
}
// -- internal WrapProcessMiddleWare Start --
// metricsWrapProcessMiddleWare is the built-in middleware that is always
// placed first in the chain. On request it rewrites commands for stress-test
// traffic; on response it reports the call through addCallMetrics.
type metricsWrapProcessMiddleWare struct{}
// Keys under which the wrapper itself stores values in WrapProcessContext.
var (
// ctxCmdStartTimeKey holds the time.Time at which processing of the
// command started.
ctxCmdStartTimeKey = "cmd_st"
// ctxStressTagKey holds the value returned by getStressTag, when present.
ctxStressTagKey = "stress_tag"
)
// ProcessRequest passes cmd through unchanged for normal traffic and hands it
// to convertStressCMD for stress-test traffic. It never returns an error.
func (m *metricsWrapProcessMiddleWare) ProcessRequest(ctx *WrapProcessContext, cmd redis.Cmder) (redis.Cmder, error) {
	// Degradation check, currently disabled:
	/*
		if cmdDegredated(c.metricsServiceName, cmd.Name()) {
			cmd.SetErr(ErrDegradated)
			return ErrDegradated
		}
	*/

	prefix, stress := isStressTest(ctx.client.ctx)
	if !stress {
		return cmd, nil
	}

	// Stress-test request: convertStressCMD adjusts the command using the
	// stress prefix; the conversion differs for abase clusters.
	abase := isAbaseCluster(ctx.client.Cluster())
	return convertStressCMD(abase, prefix, cmd), nil
}
// ProcessResponse reports the finished command through addCallMetrics, with
// the latency in microseconds measured from the start time stored in ctx
// under ctxCmdStartTimeKey.
//
// Nothing is reported when the start time is missing or is not a time.Time.
// ctx is shared with every middleware in the chain, so the entry may have
// been overwritten; a checked type assertion avoids panicking in that case.
// The err argument is unused: the error that gets reported is cmd.Err().
func (m *metricsWrapProcessMiddleWare) ProcessResponse(ctx *WrapProcessContext, cmd redis.Cmder, err error) {
	v, exists := ctx.Get(ctxCmdStartTimeKey)
	if !exists {
		return
	}
	startTime, ok := v.(time.Time)
	if !ok {
		return
	}
	latency := time.Since(startTime).Microseconds()
	addCallMetrics(ctx, cmd.Name(), latency, cmd.Err(), ctx.client.cluster, ctx.client.psm, ctx.client.metricsServiceName, 1, false)
}
// -- internal WrapProcessMiddleWare End --
// wrapProcessMiddleWares is the package-level list of user-defined
// middlewares registered through UseMiddleWare.
var wrapProcessMiddleWares []WrapProcessMiddleWare
// UseMiddleWare appends middleWares to the package-level list of user-defined
// middlewares, preserving the order of registration.
//
// middleWareWrapProcess copies this list when it builds a chain, so only
// middlewares registered before that point are part of the chain. The list is
// modified without synchronization.
func UseMiddleWare(middleWares ...WrapProcessMiddleWare) {
wrapProcessMiddleWares = append(wrapProcessMiddleWares, middleWares...)
}
// middleWareWrapProcess builds the middleware chain for client and returns a
// ProcessFunc that runs that chain around next.
//
// The chain is assembled once, when this function is called, in this order:
// metrics, trace, read-only (only if client.IsReadOnly()), table (only if
// client.Table() is non-blank), then the middlewares registered through
// UseMiddleWare so far.
//
// For each command the returned function:
//   - creates a fresh WrapProcessContext and records the start time and, if
//     present, the stress tag;
//   - calls ProcessRequest in chain order until one returns an error;
//   - stores the request-phase error (possibly nil) on the command and calls
//     next only when no middleware failed;
//   - calls ProcessResponse in reverse order on every middleware whose
//     ProcessRequest was invoked, including the one that failed.
//
// It returns the request-phase error, or otherwise the error from next.
func middleWareWrapProcess(client *Client, next ProcessFunc) ProcessFunc {
	// Up to four built-in middlewares plus the user-defined ones.
	middleWares := make([]WrapProcessMiddleWare, 0, 4+len(wrapProcessMiddleWares))
	// 1. force metrics middleWare at first
	middleWares = append(middleWares, &metricsWrapProcessMiddleWare{})
	// 2. bytedtrace middleware
	middleWares = append(middleWares, newCompatibleTraceMiddleWare())
	// 3. if the client is read-only, add the read-only middleware
	if client.IsReadOnly() {
		middleWares = append(middleWares, &readonlyProcessMiddleWare{})
	}
	// 4. if the client has an abase table, add tableWrapProcessMiddleWare
	if len(strings.TrimSpace(client.Table())) > 0 {
		middleWares = append(middleWares, &tableWrapProcessMiddleWare{})
	}
	// 5. add user defined middleWares
	middleWares = append(middleWares, wrapProcessMiddleWares...)

	return func(cmd redis.Cmder) (err error) {
		ctx := &WrapProcessContext{client: client}
		// set the start time of cmd
		ctx.Set(ctxCmdStartTimeKey, time.Now())
		// set stress tag
		if v, exist := getStressTag(client.ctx); exist {
			ctx.Set(ctxStressTagKey, v)
		}

		// process request
		processIndex := 0
		for ; err == nil && processIndex < len(middleWares); processIndex++ {
			var replaced redis.Cmder
			replaced, err = middleWares[processIndex].ProcessRequest(ctx, cmd)
			// A middleware may return a nil command (typically together with
			// an error). Keep the previous command in that case so SetErr and
			// the response phase below never operate on a nil Cmder.
			if replaced != nil {
				cmd = replaced
			}
		}
		cmd.SetErr(err)
		if err == nil {
			err = next(cmd)
		}

		// process response, in reverse order
		for index := processIndex - 1; index >= 0; index-- {
			middleWares[index].ProcessResponse(ctx, cmd, err)
		}
		return err
	}
}
// chainWrapProcessMiddleWares returns a wrapper with the signature accepted by
// WrapProcess: given the next process function, it yields that function
// surrounded by client's middleware chain.
func chainWrapProcessMiddleWares(client *Client) func(func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
	return func(next func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
		return middleWareWrapProcess(client, next)
	}
}
中间件使用方式
goredis.UseMiddleWare(&staging.RedisStagingMW{})
goredis 包的 wrap_middleware.go 中定义了中间件接口 WrapProcessMiddleWare:
// -- WrapProcessMiddleWare

// WrapProcessMiddleWare is a hook that runs around every redis command.
type WrapProcessMiddleWare interface {
// ProcessRequest runs before the command is executed. The returned Cmder
// is what the following middlewares and the real executor receive.
// Returning a non-nil error stops the chain: later middlewares and the
// command itself are skipped.
ProcessRequest(ctx *WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error)
// ProcessResponse runs after the command (or after the chain was stopped),
// in reverse registration order, for every middleware whose ProcessRequest
// was called. err is the error from the request phase or from execution.
ProcessResponse(ctx *WrapProcessContext, cmder redis.Cmder, err error)
}
goredis 包的 wrap_middleware.go 中,中间件的注册与链式处理逻辑如下:
// wrapProcessMiddleWares is the package-level list of user-defined
// middlewares registered through UseMiddleWare.
var wrapProcessMiddleWares []WrapProcessMiddleWare
// UseMiddleWare appends middleWares to the package-level list of user-defined
// middlewares, preserving the order of registration.
//
// middleWareWrapProcess copies this list when it builds a chain, so only
// middlewares registered before that point are part of the chain. The list is
// modified without synchronization.
func UseMiddleWare(middleWares ...WrapProcessMiddleWare) {
wrapProcessMiddleWares = append(wrapProcessMiddleWares, middleWares...)
}
// middleWareWrapProcess builds the middleware chain for client and returns a
// ProcessFunc that runs that chain around next.
//
// The chain is assembled once, when this function is called, in this order:
// metrics, trace, read-only (only if client.IsReadOnly()), table (only if
// client.Table() is non-blank), then the middlewares registered through
// UseMiddleWare so far.
//
// For each command the returned function calls ProcessRequest in chain order
// until one returns an error, stores the request-phase error (possibly nil)
// on the command, calls next only when no middleware failed, and finally
// calls ProcessResponse in reverse order on every middleware whose
// ProcessRequest was invoked, including the one that failed. It returns the
// request-phase error, or otherwise the error from next.
func middleWareWrapProcess(client *Client, next ProcessFunc) ProcessFunc {
	// Up to four built-in middlewares plus the user-defined ones.
	middleWares := make([]WrapProcessMiddleWare, 0, 4+len(wrapProcessMiddleWares))
	// 1. force metrics middleWare at first
	middleWares = append(middleWares, &metricsWrapProcessMiddleWare{})
	// 2. bytedtrace middleware
	middleWares = append(middleWares, newCompatibleTraceMiddleWare())
	// 3. if the client is read-only, add the read-only middleware
	if client.IsReadOnly() {
		middleWares = append(middleWares, &readonlyProcessMiddleWare{})
	}
	// 4. if the client has an abase table, add tableWrapProcessMiddleWare
	if len(strings.TrimSpace(client.Table())) > 0 {
		middleWares = append(middleWares, &tableWrapProcessMiddleWare{})
	}
	// 5. add user defined middleWares
	middleWares = append(middleWares, wrapProcessMiddleWares...)

	return func(cmd redis.Cmder) (err error) {
		ctx := &WrapProcessContext{client: client}
		// set the start time of cmd
		ctx.Set(ctxCmdStartTimeKey, time.Now())
		// set stress tag
		if v, exist := getStressTag(client.ctx); exist {
			ctx.Set(ctxStressTagKey, v)
		}

		// process request
		processIndex := 0
		for ; err == nil && processIndex < len(middleWares); processIndex++ {
			var replaced redis.Cmder
			replaced, err = middleWares[processIndex].ProcessRequest(ctx, cmd)
			// A middleware may return a nil command (typically together with
			// an error). Keep the previous command in that case so SetErr and
			// the response phase below never operate on a nil Cmder.
			if replaced != nil {
				cmd = replaced
			}
		}
		cmd.SetErr(err)
		if err == nil {
			err = next(cmd)
		}

		// process response, in reverse order
		for index := processIndex - 1; index >= 0; index-- {
			middleWares[index].ProcessResponse(ctx, cmd, err)
		}
		return err
	}
}
因此,可以通过实现 WrapProcessMiddleWare 接口来编写自己的中间件处理方法。
看个例子:给 redis 中的 key 加上一个前缀。
// RedisStagingMW is a middleware that passes commands through
// convertStagingCMD when the client's context carries a staging prefix.
type RedisStagingMW struct {
// UseStress makes ProcessRequest also look up the stress tag: commands
// without a stress tag are then left untouched, and commands with one get
// the tag (normalized to end in "_") passed along as the stress prefix.
UseStress bool
}
// ProcessRequest returns cmder converted by convertStagingCMD when the
// client's context carries a staging prefix, and cmder unchanged otherwise.
//
// When m.UseStress is set, the stress tag is looked up first: without a tag
// cmder is returned unchanged; with one, the tag (with a trailing "_" appended
// if missing) is passed to convertStagingCMD as the stress prefix. When
// m.UseStress is not set the stress prefix is empty. The error is always nil.
func (m RedisStagingMW) ProcessRequest(ctx *goredis.WrapProcessContext, cmder redis.Cmder) (redis.Cmder, error) {
	stressPrefix := ""
	if m.UseStress {
		tag, tagged := getStressTag(ctx.Client().Context())
		if !tagged {
			return cmder, nil
		}
		stressPrefix = tag
		if !strings.HasSuffix(stressPrefix, "_") {
			stressPrefix += "_"
		}
	}

	stagingPrefix, staging := getStagingPrefix(ctx.Client().Context())
	if !staging {
		return cmder, nil
	}
	return convertStagingCMD(cmder, stressPrefix, stagingPrefix), nil
}
// ProcessResponse does nothing; it exists so that RedisStagingMW has both
// methods of goredis.WrapProcessMiddleWare.
func (RedisStagingMW) ProcessResponse(*goredis.WrapProcessContext, redis.Cmder, error) {}