这篇文章会重点讲述Codis Proxy的初始化,即启动的过程,其实也不完全是Codis Proxy的初始化,应该说是Codis面向用户请求链路的初始化,即经过这一系列的初始化,Codis Proxy才能对外提供服务。
在分析之前先回顾Codis Proxy处理一个请求的大概过程:
1、 Codis Proxy处理客户端连接,解析客户端发送过来的命令;
2、Codis Proxy根据Slot的配置将key进行路由,转发到相应的Redis Group;
3、Redis Group的服务器处理请求后返回,大部分情况是由Group的主节点处理请求,当然如果开启了读、写分离则从节点也会处理读的请求。
所以整个链路的初始化包含以下部分:
1、Proxy的初始化;
2、Redis Group的建立及节点的加入;
3、Slot的分配,即将Slot分配到相应的Redis Group;
今天重点讲下Proxy的初始化。
一、Proxy激活方式
Proxy启动后不会马上变成有效的状态,必须上报自己的信息给Storage(一般配置为Zookeeper),然后才能变成有效的状态。
switch {
case dashboard != "":
go AutoOnlineWithDashboard(s, dashboard)
case coordinator.name != "":
go AutoOnlineWithCoordinator(s, coordinator.name, coordinator.addr, coordinator.auth)
case slots != nil:
go AutoOnlineWithFillSlots(s, slots)
}
上面就是proxy启动的代码,其中有几个条件,如dashboard、coordinator.name(Zookeeper)或slots只有一个不为空就会自动激活(Online),这些参数都是解析命令行得到的:
var dashboard string
if s, ok := utils.Argument(d, "--dashboard"); ok {
dashboard = s
log.Warnf("option --dashboard = %s", s)
}
像下面这样是不会自动激活的:
./codis-proxy --ncpu=4 --config=../config/proxy_19001.toml --log=/log/codis/proxy_19001.log --log-level=INFO
下面是会自动激活的:
./codis-proxy --ncpu=4 --config=../config/proxy_19001.toml --log=/log/codis/proxy_19001.log --log-level=INFO --dashboard=127.0.0.1:18081
两种方式没有好坏,建议不要自动激活;
二、手动激活流程
如果是在Fe中激活的话,大概流程如下:
1、在后台手动添加Proxy
注意:后端的端口号不是Proxy处理客户端请求的地址,而是admin_addr;
添加后前端会调用Fe的/api/topom/proxy/create接口,其中后面两串是用于安全的token和参数
http://127.0.0.1:9090/api/topom/proxy/create/95b62887719520f17e312eaa76d28f2b/172.21.107.77:11080?forward=codis-demo
2、Fe处理逻辑
Fe会将这个请求转发给Dashboard,最终会调用Topom::CreateProxy:
func (s *Topom) CreateProxy(addr string) error {
//调用proxy的/api/proxy/model接口
p, err := proxy.NewApiClient(addr).Model()
if err != nil {
return errors.Errorf("proxy@%s fetch model failed, %s", addr, err)
}
c := s.newProxyClient(p)
if err := s.storeCreateProxy(p); err != nil {
return err
} else {
return s.reinitProxy(ctx, p, c)
}
}
挑重点,Dashboard会先调用/api/proxy/model接口得到admin_addr地址,接下来会根据上一步返回的的admin_addr生成客户端:
func (s *Topom) newProxyClient(p *models.Proxy) *proxy.ApiClient {
c := proxy.NewApiClient(p.AdminAddr)
c.SetXAuth(s.config.ProductName, s.config.ProductAuth, p.Token)
return c
}
然后是更新Zookeeper里的信息:
func (s *Topom) storeCreateProxy(p *models.Proxy) error {
log.Warnf("create proxy-[%s]:\n%s", p.Token, p.Encode())
if err := s.store.UpdateProxy(p); err != nil {
log.ErrorErrorf(err, "store: create proxy-[%s] failed", p.Token)
return errors.Errorf("store: create proxy-[%s] failed", p.Token)
}
return nil
}
func (s *Store) UpdateProxy(p *Proxy) error {
return s.client.Update(s.ProxyPath(p.Token), p.Encode())
}
其中ProxyPath格式为 /Codis3/{product_name}/proxy/proxy-{token}
func ProxyPath(product string, token string) string {
return filepath.Join(CodisDir, product, "proxy", fmt.Sprintf("proxy-%s", token))
}
在我开发机上是这样的:
3、reinitProxy分析
reinitProxy向Proxy发送请求让Proxy建立和后端的连接及设置相关状态:
func (s *Topom) reinitProxy(ctx *context, p *models.Proxy, c *proxy.ApiClient) error {
log.Warnf("proxy-[%s] reinit:\n%s", p.Token, p.Encode())
if err := c.FillSlots(ctx.toSlotSlice(ctx.slots, p)...); err != nil {
//
}
if err := c.Start(); err != nil {
//
}
if err := c.SetSentinels(ctx.sentinel); err != nil {
//
}
return nil
}
这个函数做了3件事:
1)、调用FillSlots方法;
2)、调用Start方法;
3)、设置Sentinel,这个我们先不分析;
3.1 FillSlots方法
先看FillSlots方法,看方法名像是设置Slots,但设置Slots不应该由Proxy处理才怪;通过代码分析,这个最终会调用Proxy的/api/proxy/fillslots接口,对应文件为proxy_api.go中FillSlots方法,后者调用Proxy来处理:
func (s *Proxy) FillSlots(slots []*models.Slot) error {
//省略无关代码
for _, m := range slots {
if err := s.router.FillSlot(m); err != nil {
return err
}
}
return nil
}
s.router.FillSlot最终会调用fillSlot:
func (s *Router) fillSlot(m *models.Slot, switched bool, method forwardMethod) {
//省略一些代码
if addr := m.BackendAddr; len(addr) != 0 {
slot.backend.bc = s.pool.primary.Retain(addr)
slot.backend.id = m.BackendAddrGroupId
}
//省略一些代码
}
可以看到,这里Proxy只是建立每个Slot对应的后端Server的连接:
func (p *sharedBackendConnPool) Retain(addr string) *sharedBackendConn {
if bc := p.pool[addr]; bc != nil {
return bc.Retain()
} else {
bc = newSharedBackendConn(addr, p)
p.pool[addr] = bc
return bc
}
}
3.2 Start方法
最后是Start方法,它会调用Proxy的/api/proxy/start接口,后者会调用Proxy的Start方法:
func (s *Proxy) Start() error {
if s.online {
return nil
}
s.online = true
s.router.Start()
if s.jodis != nil {
s.jodis.Start()
}
return nil
}
这里只是将Proxy的online设为true,当然如果有jodis则会初始化jodis,这 个后面再分析;
而s.router.Start也只是设置一个标志位:
func (s *Router) Start() {
s.mu.Lock()
defer s.mu.Unlock()
if s.closed {
return
}
s.online = true
}
三、总结
1、Proxy激活有自动和手动激活两种方式,自动激活通过在命令行加上dashboard或zookeeper参数;
2、手动激活流程:
Fe转发给Dashboard;
Dashboard先更新Zookeeper里的信息,然后调用Proxy的接口通知其初始化,Proxy做了几件事:
建立每个Slot对应后端Server的连接;
将Proxy和Router的状态设置为激活;
设置Sentinel信息,这个今天我们没有分析。