前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Codis Proxy是如何处理一个请求的

Codis Proxy是如何处理一个请求的

作者头像
心平气和
发布2020-11-25 15:24:09
9320
发布2020-11-25 15:24:09
举报

前面我们分析了Codis各组成部件,其中Proxy是用来处理客户端请求的,今天我们具体分析下一次请求在Codis内部是如何处理的。

一、Proxy启动函数

前面我们讲了Proxy启动是通过以下这行代码来启动的:

代码语言:javascript
复制
go s.serveProxy()

这个里面会有接受连接,并处理连接的代码:

代码语言:javascript
复制
go func(l net.Listener) (err error) {
    defer func() {
      eh <- err
    }()
    for {
      c, err := s.acceptConn(l)
      if err != nil {
        return err
      }
      NewSession(c, s.config).Start(s.router)
    }
  }(s.lproxy)

NewSession会返回一个Session的数据结构,重点看下Start方法:

代码语言:javascript
复制
//此处省略无关紧要代码
go func() {
      s.loopWriter(tasks)
      decrSessions()
    }()

    go func() {
      s.loopReader(tasks, d)
      tasks.Close()
    }()

一个Session重点就是上面两个协程,其中一个处理写事件,另一个处理读事件,读、写是相对于数据流的方向的,针对Codis来说,从客户端读取请求数据就是读,把响应返回给客户端就是写。

其中两个协程的函数都有个tasks的参数,这个tasks初始化代码如下:

代码语言:javascript
复制
tasks := NewRequestChanBuffer(1024)

func NewRequestChanBuffer(n int) *RequestChan {
  if n <= 0 {
    n = DefaultRequestChanBuffer
  }
  var ch = &RequestChan{
    buff: make([]*Request, n),
  }
  ch.cond = sync.NewCond(&ch.lock)
  return ch
}

即tasks是一个RequestChan的结构,其核心就是一个buff的数组,读和写的协程就是通过这个来交换数据,作为任务队列来使用的,即从客户端读取响应后发送给后端Redis Server,并且读取后端Redis Server返回的响应后再将请求写回到这个队列,然后由写的协程将响应写回给客户端。

二、细节分析

下面我们来具体分析实现,先看loopReader,前面讲过这个里面要读取客户端请求过来的命令,并且转发到后端Redis Server:

代码语言:javascript
复制
for !s.quit {
    //处理客户端发送过来的数据
    multi, err := s.Conn.DecodeMultiBulk()
    if err != nil {
      return err
    }
    if len(multi) == 0 {
      continue
    }
   //省略一些代码

    r := &Request{}
    r.Multi = multi
    r.Batch = &sync.WaitGroup{}
    r.Database = s.database
    r.UnixNano = start.UnixNano()
    //转发请求
    if err := s.handleRequest(r, d); err != nil {
      r.Resp = redis.NewErrorf("ERR handle request, %s", err)
      tasks.PushBack(r)
      if breakOnFailure {
        return err
      }
    } else {
      tasks.PushBack(r)
    }
  }

其中s.Conn.DecodeMultiBulk即将客户端请求的数据解码后以数组格式返回,举个例子,客户端发送请求:

代码语言:javascript
复制
get ok

则multi是这样的:

可以看到multi第0项成员为get,第1项为ok。

读取到客户端原始请求数据后,Codis然后调用s.handleRequest将数据发送给后端Redis Server,handleRequest里面就是具体的命令转发了:

代码语言:javascript
复制
switch opstr {
  case "SELECT":
    return s.handleSelect(r)
  case "PING":
    return s.handleRequestPing(r, d)
  case "INFO":
    return s.handleRequestInfo(r, d)
  case "MGET":
    return s.handleRequestMGet(r, d)
  case "MSET":
    return s.handleRequestMSet(r, d)
  case "DEL":
    return s.handleRequestDel(r, d)
  case "EXISTS":
    return s.handleRequestExists(r, d)
  case "SLOTSINFO":
    return s.handleRequestSlotsInfo(r, d)
  case "SLOTSSCAN":
    return s.handleRequestSlotsScan(r, d)
  case "SLOTSMAPPING":
    return s.handleRequestSlotsMapping(r, d)
  default:
    return d.dispatch(r)

以一个默认的GET命令来说,会走到dispatch这里,

代码语言:javascript
复制
func (s *Router) dispatch(r *Request) error {
  hkey := getHashKey(r.Multi, r.OpStr)
  var id = Hash(hkey) % MaxSlotNum
  slot := &s.slots[id]
  return slot.forward(r, hkey)
}

dispatch调用slot.forward(r, hkey),然后调用到forwardSync::Forward

代码语言:javascript
复制
代码语言:javascript
复制
func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {
  s.lock.RLock()
  bc, err := d.process(s, r, hkey)
  s.lock.RUnlock()
  if err != nil {
    return err
  }
  bc.PushBack(r)
  return nil
}

d.process只是返回后端的连接,当然还有一些判断是否迁移的逻辑,我们先跳过,返回类型是BackendConn指针,然后又将请求r通过PushBack发送给b.input这个通道:

代码语言:javascript
复制
func (bc *BackendConn) PushBack(r *Request) {
  if r.Batch != nil {
    r.Batch.Add(1)
  }
  bc.input <- r
}

请求发给input通道后,又是哪里在处理input通道上的数据呢?BackendConn也有个loopWriter会一直处理input通道中的数据:

代码语言:javascript
复制
c, tasks, err := bc.newBackendReader(round, bc.config)
//省略一些代码
for r := range bc.input {
    if r.IsReadOnly() && r.IsBroken() {
      bc.setResponse(r, nil, ErrRequestIsBroken)
      continue
    }
    if err := p.EncodeMultiBulk(r.Multi); err != nil {
      return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
    }
    if err := p.Flush(len(bc.input) == 0); err != nil {
      return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
    } else {
      tasks <- r
    }
  }

这个协程会处理input中通道的请求发送给后端的Redis Server,处理完后,然后丢给tasks通道,tasks通道又有一个协程在处理,就是BackendConn的loopReader:

代码语言:javascript
复制
for r := range tasks {
    resp, err := c.Decode()
    if err != nil {
      return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
    }
    //省略一些代码
    bc.setResponse(r, resp, nil)

它会处理后端的响应,然后设置到请求的相应字段。

所有这些处理完成后,就是Session的loopWriter将数据发送给客户端了:

代码语言:javascript
复制
return tasks.PopFrontAll(func(r *Request) error {
    resp, err := s.handleResponse(r)
    if err := p.Encode(resp); err != nil {
      return s.incrOpFails(r, err)
    }
    fflush := tasks.IsEmpty()
    //将响应发送给客户端
    if err := p.Flush(fflush); err != nil {
      return s.incrOpFails(r, err)
    } else {
      s.incrOpStats(r, resp.Type)
    }
    return nil
  })

三、总结

我们来总结下一个请求处理过程:

1、Session的loopReader协程读取客户端发送过来的数据;

2、在上面读取完数据后,通过slot.forward转发到相应的Server,Codis用BackendConn来表示一个后端连接;

3、BackendConn也有专门处理读、写请求的协程,先由BackendConn::loopWriter将请求发往后端Redis Server;

4、再由BackendConn::loopReader处理后端Redis Server的处理结果;

5、上面处理完后由Session的loopWriter将处理结果发送给客户端。

Proxy请求处理分了2层,一层是前端客户端的连接,由Session模块处理;

第2层是处理与后端Codis Server的连接,由BackendConn处理;

两者都实现了基于读、写事件驱动的异步编程来提高系统的吞吐率,Session是通过队列+锁的方式来传递任务,典型的生产者、消费者模型;而BackendConn则是由通道的方式实现。

Codis源码分析之环境篇

Raft算法之客户端交互篇

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-18,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员升级之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档