前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Codis源码分析之Sentinel

Codis源码分析之Sentinel

作者头像
心平气和
发布2020-12-30 16:08:24
5700
发布2020-12-30 16:08:24
举报
文章被收录于专栏:程序员升级之路

Sentinel是Redis官方的高可用方案,支持静态配置和运行时动态配置,Sentinel本身的机制不是本文重点,有兴趣可以直达官网:

https://redis.io/topics/sentinel

Codis也集成了Sentinel的方案,下面从如何添加Sentinel及Sentiel是如何工作的角度介绍整体流程。

一、添加Sentinel

在Fe中添加Sentinel需要分2步,先是添加:

然后是Sync

添加对应入口为Topom::AddSentinel

代码语言:javascript
复制
func (s *Topom) AddSentinel(addr string) error {
  //
  ctx, err := s.newContext()
  if err != nil {
    return err
  }

  p := ctx.sentinel

  for _, x := range p.Servers {
    if x == addr {
      return errors.Errorf("sentinel-[%s] already exists", addr)
    }
  }

  sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
  if err := sentinel.FlushConfig(addr, s.config.SentinelClientTimeout.Duration()); err != nil {
    return err
  }

  p.Servers = append(p.Servers, addr)
  p.OutOfSync = true
  //更新Zk上保存数据
  return s.storeUpdateSentinel(p)
}

可以看到在做完一些基本的检测后,调用storeUpdateSentinel直接更新Zk中的数据,没有其它逻辑。

再看Sync的逻辑,对应入口为Topom::ResyncSentinels

代码语言:javascript
复制
func (s *Topom) ResyncSentinels() error {
  p := ctx.sentinel
  p.OutOfSync = true
  if err := s.storeUpdateSentinel(p); err != nil {
    return err
  }

  config := &redis.MonitorConfig{
    Quorum:               s.config.SentinelQuorum,
    ParallelSyncs:        s.config.SentinelParallelSyncs,
    DownAfter:            s.config.SentinelDownAfter.Duration(),
    FailoverTimeout:      s.config.SentinelFailoverTimeout.Duration(),
    NotificationScript:   s.config.SentinelNotificationScript,
    ClientReconfigScript: s.config.SentinelClientReconfigScript,
  }

  sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
  //移除所有Master
  if err := sentinel.RemoveGroupsAll(p.Servers, s.config.SentinelClientTimeout.Duration()); err != nil {
    log.WarnErrorf(err, "remove sentinels failed")
  }
  //监听Group
  if err := sentinel.MonitorGroups(p.Servers, s.config.SentinelClientTimeout.Duration(), config, ctx.getGroupMasters()); err != nil {
    log.WarnErrorf(err, "resync sentinels failed")
    return err
  }
  //设置Group Master
  s.rewatchSentinels(p.Servers)

  var fut sync2.Future
  for _, p := range ctx.proxy {
    fut.Add()
    go func(p *models.Proxy) {
      //通知Proxy更新
      err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
      if err != nil {
        log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)
      }
      fut.Done(p.Token, err)
    }(p)
  }
  for t, v := range fut.Wait() {
    switch err := v.(type) {
    case error:
      if err != nil {
        return errors.Errorf("proxy-[%s] sentinel failed", t)
      }
    }
  }

  p.OutOfSync = false
  return s.storeUpdateSentinel(p)
}

重点看注释的几段代码:

1、移除所有Master

对应代码为:

代码语言:javascript
复制
sentinel.RemoveGroupsAll

再跟进去:

代码语言:javascript
复制
func (s *Sentinel) RemoveGroupsAll(sentinels []string, timeout time.Duration) error {
 //
  for i := range sentinels {
    go func(sentinel string) {
      err := s.removeGroupsAllDispatch(cntx, sentinel, timeout)
      if err != nil {
        s.errorf(err, "sentinel-[%s] remove failed", sentinel)
      }
      results <- err
    }(sentinels[i])
  }

 //
}

可以看到,重点是调用removeGroupsAllDispatch:

代码语言:javascript
复制
func (s *Sentinel) removeGroupsAllDispatch(ctx context.Context, sentinel string, timeout time.Duration) error {
  var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
    masters, err := s.mastersCommand(c)
    if err != nil {
      return err
    }
    var names []string
    for gid := range masters {
      names = append(names, s.NodeName(gid))
    }
    return s.removeCommand(c, names)
  })
  //
  return nil
}

先调用mastersCommand得到Master,然后调用removeCommand,mastersCommand调用的是Sentinel masters命令:

代码语言:javascript
复制
values, err := redigo.Values(client.Do("SENTINEL", "masters"))

removeCommand调用SENTINEL remove移除对集群的监控

代码语言:javascript
复制
client.Send("SENTINEL", "remove", name)

2、监听Group

代码语言:javascript
复制
sentinel.MonitorGroups(p.Servers, s.config.SentinelClientTimeout.Duration(), config, ctx.getGroupMasters());

跟进去

代码语言:javascript
复制
func (s *Sentinel) MonitorGroups(sentinels []string, timeout time.Duration, config *MonitorConfig, groups map[int]string) error {
  cntx, cancel := context.WithTimeout(s.Context, timeout)
  defer cancel()

  resolve := make(map[int]*net.TCPAddr)

  var exit = make(chan error, 1)

  go func() (err error) {
    for gid, addr := range groups {
      if err := cntx.Err(); err != nil {
        return errors.Trace(err)
      }
      tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
      resolve[gid] = tcpAddr
    }
    return nil
  }()


  timeout += time.Second * 5
  results := make(chan error, len(sentinels))

  for i := range sentinels {
    go func(sentinel string) {
      err := s.monitorGroupsDispatch(cntx, sentinel, timeout, config, resolve)
      if err != nil {
        s.errorf(err, "sentinel-[%s] monitor failed", sentinel)
      }
      results <- err
    }(sentinels[i])
  }
}

这里先解析地址,然后对sentinel调用monitorGroupsDispatch

代码语言:javascript
复制
  //开启监控
  go func() {
    for gid, tcpAddr := range groups {
      var ip, port = tcpAddr.IP.String(), tcpAddr.Port
      client.Send("SENTINEL", "monitor", s.NodeName(gid), ip, port, config.Quorum)
    }
    if len(groups) != 0 {
      client.Flush()
    }
  }()
  
  //设置参数
  go func() {
    for gid := range groups {
      var args = []interface{}{"set", s.NodeName(gid)}
      if config.ParallelSyncs != 0 {
        args = append(args, "parallel-syncs", config.ParallelSyncs)
      }
      if config.DownAfter != 0 {
        args = append(args, "down-after-milliseconds", int(config.DownAfter/time.Millisecond))
      }
      if config.FailoverTimeout != 0 {
        args = append(args, "failover-timeout", int(config.FailoverTimeout/time.Millisecond))
      }
      if s.Auth != "" {
        args = append(args, "auth-pass", s.Auth)
      }
      if config.NotificationScript != "" {
        args = append(args, "notification-script", config.NotificationScript)
      }
      if config.ClientReconfigScript != "" {
        args = append(args, "client-reconfig-script", config.ClientReconfigScript)
      }
      client.Send("SENTINEL", args...)
    }
    if len(groups) != 0 {
      client.Flush()
    }
  }()

主要做了2件事情况,一是调用Sentinel monitor开启对Group监控,二是设置各项参数,这些都在Codis配置文件中有对应配置项,这里不详述。

3、设置Group Master

代码语言:javascript
复制
 s.rewatchSentinels(p.Servers)

进入函数看具体逻辑:

代码语言:javascript
复制
func (s *Topom) rewatchSentinels(servers []string) {
 
    s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
 
    go func(p *redis.Sentinel) {
      var trigger = make(chan struct{}, 1)
      delayUntil := func(deadline time.Time) {
        for !p.IsCanceled() {
          var d = deadline.Sub(time.Now())
          if d <= 0 {
            return
          }
          time.Sleep(math2.MinDuration(d, time.Second))
        }
      }
      
      go func() {
        defer close(trigger)
        callback := func() {
          select {
          case trigger <- struct{}{}:
          default:
          }
        }
        
        for !p.IsCanceled() {
          timeout := time.Minute * 15
          retryAt := time.Now().Add(time.Second * 10)
          //订阅切主信息
          if !p.Subscribe(servers, timeout, callback) {
            delayUntil(retryAt)
          } else {
            callback()
          }
        }
      }()
      
      go func() {
        for range trigger {
          var success int
          for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ {
            timeout := time.Second * 5
            //得到最新的Master
            masters, err := p.Masters(servers, timeout)
            if err != nil {
              log.WarnErrorf(err, "fetch group masters failed")
            } else {
              if !p.IsCanceled() {
                //切主
                s.SwitchMasters(masters)
              }
              success += 1
            }
            delayUntil(time.Now().Add(time.Second * 5))
          }
        }
      }()
    }(s.ha.monitor)
  

先调用Subscribe订阅选主消息:

代码语言:javascript
复制
p.Subscribe(servers, timeout, callback)

跟进去可以看实现:

代码语言:javascript
复制
  var channels = []interface{}{"+switch-master"}
  go func() {
    client.Send("SUBSCRIBE", channels...)
    client.Flush()
  }()

即调用SUBSCRIBE命令来订阅切主信息,订阅的channels是switch-master,官方的解释是主的地址发生变更会往这个通道发送消息:

在订阅之后调用p.Masters,即SENTINEL masters得到最新的Master:

最后更新Master,这里的逻辑主要是更新Zk里的信息和内存中的对象。

4、通知Proxy更新

代码语言:javascript
复制
err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
代码语言:javascript
复制
s.ha.servers = servers
s.rewatchSentinels(s.ha.servers)

这里主要更新ha里的信息,然后调用rewatchSentinels来更新主Redis信息,逻辑和上面rewatchSentinels的逻辑一样,没看明白这里为什么还要来1次,可能只是为了让Proxy有机会更新内存中对象最新的状态。

总结下,Sync的逻辑如下:

1、移除现有Master

先调用SENTINEL masters得到集群信息,再针对集群调用SENTINEL remove移除监控。

2、重新监听Group

通过调用Sentinel monitor完成。

3、重新设置Group Master

订阅切主通道消息,然后SENTINEL masters得到最新Master,再更新ZK中的数据。

4、通知Proxy更新

主要更新Proxy内存中Master一些信息

之所以需要分2步,而不是像Slots迁移那样做到自动根据状态切换,可能是想降低系统复杂度,因为这块不是系统核心流程,并且是后台操作,多点下鼠标而已。

二、自动切主

Sentinel也会在某个Group的Master挂掉的时候自动切换Master,在Dashboard启动时会调用Topom::Start,后者调用rewatchSentinels

代码语言:javascript
复制
s.rewatchSentinels(ctx.sentinel.Servers)

这个方法前面已经分析过,主要订阅Sentinel发送的切主消息,收到相关消息后更新Zk上相关信息,就可以做到主、从自动切换了。

Codis源码分析之Slots迁移异步篇

Codis Proxy是如何处理一个请求的

Raft算法之快照篇

Skywalking Php注册不上问题排查

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员升级之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档