前面我们分析了Codis各组成部件,其中Proxy是用来处理客户端请求的,今天我们具体分析下一次请求在Codis内部是如何处理的。
一、Proxy启动函数
前面我们讲了Proxy启动是通过以下这行代码来启动的:
go s.serveProxy()
这个里面会有接受连接,并处理连接的代码:
go func(l net.Listener) (err error) {
defer func() {
eh <- err
}()
for {
c, err := s.acceptConn(l)
if err != nil {
return err
}
NewSession(c, s.config).Start(s.router)
}
}(s.lproxy)
NewSession会返回一个Session的数据结构,重点看下Start方法:
//此处省略无关紧要代码
go func() {
s.loopWriter(tasks)
decrSessions()
}()
go func() {
s.loopReader(tasks, d)
tasks.Close()
}()
一个Session重点就是上面两个协程,其中一个处理写事件,另一个处理读事件,读、写是相对于数据流的方向的,针对Codis来说,从客户端读取请求数据就是读,把响应返回给客户端就是写。
其中两个协程的函数都有个tasks的参数,这个tasks初始化代码如下:
tasks := NewRequestChanBuffer(1024)
func NewRequestChanBuffer(n int) *RequestChan {
if n <= 0 {
n = DefaultRequestChanBuffer
}
var ch = &RequestChan{
buff: make([]*Request, n),
}
ch.cond = sync.NewCond(&ch.lock)
return ch
}
即tasks是一个RequestChan的结构,其核心就是一个buff的数组,读和写的协程就是通过这个来交换数据,作为任务队列来使用的,即从客户端读取响应后发送给后端Redis Server,并且读取后端Redis Server返回的响应后再将请求写回到这个队列,然后由写的协程将响应写回给客户端。
二、细节分析
下面我们来具体分析实现,先看loopReader,前面讲过这个里面要读取客户端请求过来的命令,并且转发到后端Redis Server:
for !s.quit {
//处理客户端发送过来的数据
multi, err := s.Conn.DecodeMultiBulk()
if err != nil {
return err
}
if len(multi) == 0 {
continue
}
//省略一些代码
r := &Request{}
r.Multi = multi
r.Batch = &sync.WaitGroup{}
r.Database = s.database
r.UnixNano = start.UnixNano()
//转发请求
if err := s.handleRequest(r, d); err != nil {
r.Resp = redis.NewErrorf("ERR handle request, %s", err)
tasks.PushBack(r)
if breakOnFailure {
return err
}
} else {
tasks.PushBack(r)
}
}
其中s.Conn.DecodeMultiBulk即将客户端请求的数据解码后以数组格式返回,举个例子,客户端发送请求:
get ok
则multi是这样的:
可以看到multi第0项成员为get,第1项为ok。
读取到客户端原始请求数据后,Codis然后调用s.handleRequest将数据发送给后端Redis Server,handleRequest里面就是具体的命令转发了:
switch opstr {
case "SELECT":
return s.handleSelect(r)
case "PING":
return s.handleRequestPing(r, d)
case "INFO":
return s.handleRequestInfo(r, d)
case "MGET":
return s.handleRequestMGet(r, d)
case "MSET":
return s.handleRequestMSet(r, d)
case "DEL":
return s.handleRequestDel(r, d)
case "EXISTS":
return s.handleRequestExists(r, d)
case "SLOTSINFO":
return s.handleRequestSlotsInfo(r, d)
case "SLOTSSCAN":
return s.handleRequestSlotsScan(r, d)
case "SLOTSMAPPING":
return s.handleRequestSlotsMapping(r, d)
default:
return d.dispatch(r)
以一个默认的GET命令来说,会走到dispatch这里,
func (s *Router) dispatch(r *Request) error {
hkey := getHashKey(r.Multi, r.OpStr)
var id = Hash(hkey) % MaxSlotNum
slot := &s.slots[id]
return slot.forward(r, hkey)
}
dispatch调用slot.forward(r, hkey),然后调用到forwardSync::Forward
func (d *forwardSync) Forward(s *Slot, r *Request, hkey []byte) error {
s.lock.RLock()
bc, err := d.process(s, r, hkey)
s.lock.RUnlock()
if err != nil {
return err
}
bc.PushBack(r)
return nil
}
d.process只是返回后端的连接,当然还有一些判断是否迁移的逻辑,我们先跳过,返回类型是BackendConn指针,然后又将请求r通过PushBack发送给b.input这个通道:
func (bc *BackendConn) PushBack(r *Request) {
if r.Batch != nil {
r.Batch.Add(1)
}
bc.input <- r
}
请求发给input通道后,又是哪里在处理input通道上的数据呢?BackendConn也有个loopWriter会一直处理input通道中的数据:
c, tasks, err := bc.newBackendReader(round, bc.config)
//省略一些代码
for r := range bc.input {
if r.IsReadOnly() && r.IsBroken() {
bc.setResponse(r, nil, ErrRequestIsBroken)
continue
}
if err := p.EncodeMultiBulk(r.Multi); err != nil {
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
}
if err := p.Flush(len(bc.input) == 0); err != nil {
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
} else {
tasks <- r
}
}
这个协程会处理input中通道的请求发送给后端的Redis Server,处理完后,然后丢给tasks通道,tasks通道又有一个协程在处理,就是BackendConn的loopReader:
for r := range tasks {
resp, err := c.Decode()
if err != nil {
return bc.setResponse(r, nil, fmt.Errorf("backend conn failure, %s", err))
}
//省略一些代码
bc.setResponse(r, resp, nil)
它会处理后端的响应,然后设置到请求的相应字段。
所有这些处理完成后,就是Session的loopWriter将数据发送给客户端了:
return tasks.PopFrontAll(func(r *Request) error {
resp, err := s.handleResponse(r)
if err := p.Encode(resp); err != nil {
return s.incrOpFails(r, err)
}
fflush := tasks.IsEmpty()
//将响应发送给客户端
if err := p.Flush(fflush); err != nil {
return s.incrOpFails(r, err)
} else {
s.incrOpStats(r, resp.Type)
}
return nil
})
三、总结
我们来总结下一个请求处理过程:
1、Session的loopReader协程读取客户端发送过来的数据;
2、在上面读取完数据后,通过slot.forward转发到相应的Server,Codis用BackendConn来表示一个后端连接;
3、BackendConn也有专门处理读、写请求的协程,先由BackendConn::loopWriter将请求发往后端Redis Server;
4、再由BackendConn::loopReader处理后端Redis Server的处理结果;
5、上面处理完后由Session的loopWriter将处理结果发送给客户端。
Proxy请求处理分了2层,一层是前端客户端的连接,由Session模块处理;
第2层是处理与后端Codis Server的连接,由BackendConn处理;
两者都实现了基于读、写事件驱动的异步编程来提高系统的吞吐率,Session是通过队列+锁的方式来传递任务,典型的生产者、消费者模型;而BackendConn则是由通道的方式实现。