前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Codis源码分析之Slots迁移篇

Codis源码分析之Slots迁移篇

作者头像
心平气和
发布2020-12-14 10:55:24
1.7K0
发布2020-12-14 10:55:24
举报
文章被收录于专栏:程序员升级之路

一、Slots迁移的场景&主要面临的问题

为什么需要Slots迁移,或者说在什么场景下需要迁移?主要是为了扩容,Codis以Slot为单位将整个集群分成了1024个Slots,因此如果在运行过程中想增加服务器,就需要将原有的一些Slots迁移到新的服务器上。

迁移主要的问题:

1、Slot中Key的处理

一个Slot下可能有很多key,因此整个Slot迁移是需要时间的,因此整个Slot在迁移过程中key就有不同的情况,有的正在迁,有的还没迁,有的则已经迁走,针对这些不同状态的key, Codis是如何保证数据的一致性的。

2、大key的处理

像一个list或hash,可能成员成千上万,如何保证迁移的原子性和一致性。

3、如何保障系统的可用性

因为迁移是耗时的,是用同步还是异步,如何在系统可用性和数据一致性做权衡?

二、迁移代码分析

1、入口

迁移一般在Fe界面上由管理员发起,一般来说是迁移一个范围:

后端入口为apiServer::SlotCreateActionRange,这个函数只做一些基本的参数验证,实际调用Topom的SlotCreateActionRange:

代码语言:javascript
复制
func (s *Topom) SlotCreateActionRange(beg, end int, gid int, must bool) error {
  //省略一些代码

  var pending []int
  for sid := beg; sid <= end; sid++ {
    m, err := ctx.getSlotMapping(sid)
    if err != nil {
      return err
    }
    if m.Action.State != models.ActionNothing {
      if !must {
        continue
      }
      return errors.Errorf("slot-[%d] action already exists", sid)
    }
    if m.GroupId == g.Id {
      if !must {
        continue
      }
      return errors.Errorf("slot-[%d] already in group-[%d]", sid, g.Id)
    }
    pending = append(pending, m.Id)
  }

  for _, sid := range pending {
    m, err := ctx.getSlotMapping(sid)
    if err != nil {
      return err
    }
    defer s.dirtySlotsCache(m.Id)
    //更改状态
    m.Action.State = models.ActionPending
    m.Action.Index = ctx.maxSlotActionIndex() + 1
    m.Action.TargetId = g.Id
    //更新Zookeeper的状态
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return err
    }
  }
  return nil
}

先会检查一些状态,如该Slot是否正在迁移,目标Group和当前Group是否一致,后面重点逻辑是将状态改为ActionPending,然后保存到Zk中就返回给用户了。

到上面肯定还没迁完,应该是有后台程序扫描这个状态然后进行迁移,这个入口为Topom::ProcessSlotAction,这个协程随着Dashboard启动的时候启动:

代码语言:javascript
复制
go func() {
    for !s.IsClosed() {
      if s.IsOnline() {
        if err := s.ProcessSlotAction(); err != nil {
          log.WarnErrorf(err, "process slot action failed")
          time.Sleep(time.Second * 5)
        }
      }
      time.Sleep(time.Second)
    }
  }()

具体代码如下:

代码语言:javascript
复制
func (s *Topom) ProcessSlotAction() error {
  for s.IsOnline() {
    var (
      marks = make(map[int]bool)
      plans = make(map[int]bool)
    )
    var accept = func(m *models.SlotMapping) bool {
      if marks[m.GroupId] || marks[m.Action.TargetId] {
        return false
      }
      if plans[m.Id] {
        return false
      }
      return true
    }
    var update = func(m *models.SlotMapping) bool {
      if m.GroupId != 0 {
        marks[m.GroupId] = true
      }
      marks[m.Action.TargetId] = true
      plans[m.Id] = true
      return true
    }
    var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots)
    for parallel > len(plans) {
    //状态转移在这里完成
      _, ok, err := s.SlotActionPrepareFilter(accept, update)
      if err != nil {
        return err
      } else if !ok {
        break
      }
    }
    if len(plans) == 0 {
      return nil
    }
    var fut sync2.Future
    for sid, _ := range plans {
      fut.Add()
      go func(sid int) {
        log.Warnf("slot-[%d] process action", sid)
        //重点,真正的数据迁移
        var err = s.processSlotAction(sid)
        if err != nil {
          status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err)
          s.action.progress.status.Store(status)
        } else {
          s.action.progress.status.Store("")
        }
        fut.Done(strconv.Itoa(sid), err)
      }(sid)
    }
    for _, v := range fut.Wait() {
      if v != nil {
        return v.(error)
      }
    }
    time.Sleep(time.Millisecond * 10)
  }
  return nil
}
代码语言:javascript
复制
func (s *Topom) SlotActionPrepareFilter(accept, update func(m *models.SlotMapping) bool) (int, bool, error) {
  //省略一些代码

  switch m.Action.State {

  case models.ActionPending:
    m.Action.State = models.ActionPreparing
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return 0, false, err
    }

    fallthrough
  case models.ActionPreparing:
    m.Action.State = models.ActionPrepared
    if err := s.resyncSlotMappings(ctx, m); err != nil {
      return 0, false, err
    }
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return 0, false, err
    }

    fallthrough
  case models.ActionPrepared:
    m.Action.State = models.ActionMigrating
    if err := s.resyncSlotMappings(ctx, m); err != nil {
      log.Warnf("slot-[%d] resync to migrating failed", m.Id)
      return 0, false, err
    }
    if err := s.storeUpdateSlotMapping(m); err != nil {
      return 0, false, err
    }

    fallthrough
  case models.ActionMigrating:
    return m.Id, true, nil
  case models.ActionFinished:
    return m.Id, true, nil
  default:
    return 0, false, errors.Errorf("slot-[%d] action state is invalid", m.Id)
  }
}

可以看到整个的状态变换过程如下:

ActionPending =》ActionPreparing =》ActionPrepared

=> ActionMigrating => ActionFinished

在ActionMigrating之前变更都只是更新Zk中的状态,ActionPreparing和ActionPrepared还会调用resyncSlotMappings通过Proxy重连新的Redis Server并且设置slot从哪迁移等信息:

代码语言:javascript
复制
case models.ActionPrepared:
    fallthrough
  case models.ActionMigrating:
    slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
    slot.BackendAddrGroupId = m.Action.TargetId
    slot.MigrateFrom = ctx.getGroupMaster(m.GroupId)
    slot.MigrateFromGroupId = m.GroupId

然后看实际的数据迁移是怎么发生的,回到ProcessSlotAction方法

代码语言:javascript
复制
var err = s.processSlotAction(sid)
代码语言:javascript
复制
func (s *Topom) processSlotAction(sid int) error {
  var db int = 0
  for s.IsOnline() {
    if exec, err := s.newSlotActionExecutor(sid); err != nil {
      return err
    } else if exec == nil {
      time.Sleep(time.Second)
    } else {
      n, nextdb, err := exec(db)
      if err != nil {
        return err
      }
      log.Debugf("slot-[%d] action executor %d", sid, n)
      //迁移完成判断
      if n == 0 && nextdb == -1 {
        return s.SlotActionComplete(sid)
      }
      status := fmt.Sprintf("[OK] Slot[%04d]@DB[%d]=%d", sid, db, n)
      s.action.progress.status.Store(status)

      if us := s.GetSlotActionInterval(); us != 0 {
        time.Sleep(time.Microsecond * time.Duration(us))
      }
      db = nextdb
    }
  }
  return nil
}

通过newSlotActionExecutor得到执行器,

代码语言:javascript
复制
switch method {
      case models.ForwardSync:
        do = func() (int, error) {
          return c.MigrateSlot(sid, dest)
        }
      case models.ForwardSemiAsync:
        var option = &redis.MigrateSlotAsyncOption{
          MaxBulks: s.config.MigrationAsyncMaxBulks,
          MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
          NumKeys:  s.config.MigrationAsyncNumKeys,
          Timeout: math2.MinDuration(time.Second*5,
            s.config.MigrationTimeout.Duration()),
        }
        do = func() (int, error) {
          return c.MigrateSlotAsync(sid, dest, option)
        }

可以看到迁移分同步和异步,看同步:

代码语言:javascript
复制
func (c *Client) MigrateSlot(slot int, target string) (int, error) {
  host, port, err := net.SplitHostPort(target)
  if err != nil {
    return 0, errors.Trace(err)
  }
  mseconds := int(c.Timeout / time.Millisecond)
  if reply, err := c.Do("SLOTSMGRTTAGSLOT", host, port, mseconds, slot); err != nil {
    return 0, errors.Trace(err)
  } else {
    p, err := redigo.Ints(redigo.Values(reply, nil))
    if err != nil || len(p) != 2 {
      return 0, errors.Errorf("invalid response = %v", reply)
    }
    return p[1], nil
  }
}

可以看到如果是同步迁移会调用SLOTSMGRTTAGSLOT命令进行迁移,这是一个Codis对Redis改造的命令,会随机迁移Slot下一个Key,所以在上面有判断是否迁移完成的:

代码语言:javascript
复制
func (s *Topom) processSlotAction(sid int) error {
  var db int = 0
  for s.IsOnline() {
    if exec, err := s.newSlotActionExecutor(sid); err != nil {
      return err
    } else {
      n, nextdb, err := exec(db)
      //迁移完成判断
      if n == 0 && nextdb == -1 {
        return s.SlotActionComplete(sid)
      }
  
    }
  }
  return nil
}

即命令返回2个参数(第3个异常忽略),第1个表示迁移的数量,第2个表示下一个要迁移的数据库,如果前者为0后者为-1则表示迁移完成。迁移完成后调用SlotActionComplete标记迁移完成

代码语言:javascript
复制
case models.ActionFinished:

    log.Warnf("slot-[%d] resync to finished", m.Id)

    if err := s.resyncSlotMappings(ctx, m); err != nil {
      log.Warnf("slot-[%d] resync to finished failed", m.Id)
      return err
    }
    defer s.dirtySlotsCache(m.Id)

    m = &models.SlotMapping{
      Id:      m.Id,
      GroupId: m.Action.TargetId,
    }
    return s.storeUpdateSlotMapping(m)

2、迁移过程中Slot的key的读写处理

前面分析Proxy代码的时候讲过,一个请求会进入到Session的handleRequest:

代码语言:javascript
复制
func (s *Session) handleRequest(r *Request, d *Router) error {
  opstr, flag, err := getOpInfo(r.Multi)
  if err != nil {
    return err
  }
  r.OpStr = opstr
  r.OpFlag = flag
  r.Broken = &s.broken

  if flag.IsNotAllowed() {
    return fmt.Errorf("command '%s' is not allowed", opstr)
  }

  switch opstr {
  case "QUIT":
    return s.handleQuit(r)
  case "AUTH":
    return s.handleAuth(r)
  }

  if !s.authorized {
    if s.config.SessionAuth != "" {
      r.Resp = redis.NewErrorf("NOAUTH Authentication required")
      return nil
    }
    s.authorized = true
  }

  switch opstr {
  case "SELECT":
    return s.handleSelect(r)
  case "PING":
    return s.handleRequestPing(r, d)
  case "INFO":
    return s.handleRequestInfo(r, d)
  case "MGET":
    return s.handleRequestMGet(r, d)
  case "MSET":
    return s.handleRequestMSet(r, d)
  case "DEL":
    return s.handleRequestDel(r, d)
  case "EXISTS":
    return s.handleRequestExists(r, d)
  case "SLOTSINFO":
    return s.handleRequestSlotsInfo(r, d)
  case "SLOTSSCAN":
    return s.handleRequestSlotsScan(r, d)
  case "SLOTSMAPPING":
    return s.handleRequestSlotsMapping(r, d)
  default:
    return d.dispatch(r)
  }
}

默认会走到d.dispatch,如果是同步的话会走下面的逻辑:

代码语言:javascript
复制
func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
  if s.migrate.bc != nil && len(hkey) != 0 {
    if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
      log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
        s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
      return nil, err
    }
  }
  r.Group = &s.refs
  r.Group.Add(1)
  return d.forward2(s, r), nil
}

如果slot正在迁移会调用slotsmgrt处理,

代码语言:javascript
复制
unc (d *forwardHelper) slotsmgrt(s *Slot, hkey []byte, database int32, seed uint) error {
  m := &Request{}
  m.Multi = []*redis.Resp{
    redis.NewBulkBytes([]byte("SLOTSMGRTTAGONE")),
    redis.NewBulkBytes(s.backend.bc.host),
    redis.NewBulkBytes(s.backend.bc.port),
    redis.NewBulkBytes([]byte("3000")),
    redis.NewBulkBytes(hkey),
  }

可以看到,如果当前处理的key所属的slot正在迁移,则调用SLOTSMGRTTAGSLOT命令将这个key迁移完成再返回给客户端,即必须要迁移这个key完成才返回给客户端。

三、总结

1、Slots迁移由管理员在Fe手动发起,发起后Codis只是将Slot状态变成

ActionPending;

2、Codis后台线程会扫描上述状态的Slots,依次进行以下状态的转换:

ActionPending => ActionPreparing => ActionPrepared =>

ActionMigrating;

3、ActionMigrating状态的Slots由Codis向Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key,这个过程会一直持续,直到Slot下所有Key迁移完成;

4、迁移过程中的Slot下的操作如果是同步则会先等待key迁移操作完成才往下操作,只要下层Redis Server执行是原子的,则可以保证整个过程的原子性。

可以看到,整个过程还是比较复杂的,特别是一些核心逻辑在Redis Server了,在Redis Server层如何保证操作的原子性和一致性,这个和异步迁移后面另外再讲述。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员升级之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档