前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Codis源码分析之Slots迁移异步篇

Codis源码分析之Slots迁移异步篇

作者头像
心平气和
发布2020-12-30 16:08:06
1.1K0
发布2020-12-30 16:08:06
举报
文章被收录于专栏:程序员升级之路

前面分析了Codis关于Slots迁移同步整体内容及同步迁移的一些分析:Codis源码分析之Slots迁移篇,再回顾同步迁移的逻辑:

1、向后端Redis Server发送SLOTSMGRTTAGSLOT命令随机迁移一个key到目标Redis Server中,这个过程会一直持续;

2、如果在迁移过程中如果要针对某个key进行修改,这个key正好在迁移,同步迁移的逻辑是先调用SLOTSMGRTTAGONE 将这个key迁移完成才能处理请求,以保证数据的一致性。

今天我们来分析下什么时候是同步/异步迁移如何设置,异步迁移的流程是怎么样的。

一、迁移方式是在哪里设置的

迁移是同步还是异步是保存在Slot的method字段中:

代码语言:javascript
复制
type Slot struct {
  id   int
  lock struct {
    hold bool
    sync.RWMutex
  }
  refs sync.WaitGroup

  switched bool

  backend, migrate struct {
    id int
    bc *sharedBackendConn
  }
  replicaGroups [][]*sharedBackendConn

  method forwardMethod
}

可以设置为forwardSync和forwardSemiAsync,前者对应同步,后者对应异步,Proxy在初始化时会设置为同步:

代码语言:javascript
复制
func NewRouter(config *Config) *Router {
  s := &Router{config: config}
  s.pool.primary = newSharedBackendConnPool(config, config.BackendPrimaryParallel)
  s.pool.replica = newSharedBackendConnPool(config, config.BackendReplicaParallel)
  for i := range s.slots {
    s.slots[i].id = i
    //默认同步转发
    s.slots[i].method = &forwardSync{}
  }
  return s
}

在context的toSlot方法中会根据ctx.method设置ForwardMethod:

代码语言:javascript
复制
func (ctx *context) toSlot(m *models.SlotMapping, p *models.Proxy) *models.Slot {
  slot := &models.Slot{
    Id:     m.Id,
    Locked: ctx.isSlotLocked(m),

    ForwardMethod: ctx.method,
  }

而ctx.method字段在context初始化时根据配置MigrationMethod生成:

代码语言:javascript
复制
func (s *Topom) newContext() (*context, error) {
  if s.online {
    if err := s.refillCache(); err != nil {
      return nil, err
    } else {
      ctx := &context{}
      ctx.slots = s.cache.slots
      //读取配置
      ctx.method, _ = models.ParseForwardMethod(s.config.MigrationMethod)
      return ctx, nil
    }
  } else {
    return nil, ErrNotOnline
  }
}

这个对应dashboard配置文件中的migration_method:

代码语言:javascript
复制
 # Set arguments for data migration (only accept 'sync' & 'semi-async').
 25 migration_method = "semi-async"

二、同步和异步的处理逻辑的差异

1、处理客户端请求的差别

上面分析了同步还是异步转发取决于配置文件,这个配置是在Slot一级,关于Slot相关操作,包括如何转发后端命令都是有区别的,为了详细地分析差别, 我们看两个实现:forwardSync和forwardSemiAsync的区别,两者的区别如下:

1、forwardSemiAsync增加了重试,因为需要异步等待key迁移完成,对应的是Forward方法;

2、最主要的是process方法,这个方法都由Forward调用;

再回顾下前面讲的内容,一个正常请求的调用链大概如下:

代码语言:javascript
复制
Session::loopReader
Session::handleRequest
Router::dispatch
slot.forward
slot.method.Forward

即请求首先处理读事件,读取客户发送过来的请求数据,按Redis协议编码、解码;然后将给Session的handleRequest,后者再转给Router,Router再交给Slot,而Slot最后交由上面说的两个转发方法。

关于同步的处理上一篇文章 Codis源码分析之Slots迁移篇 已经分析了会检查当前Slot是否在迁移中,如果是则调用SLOTSMGRTTAGONE命令迁移当前key,并且必须等待迁移完成才往下处理请求,以保证数据的一致性:

代码语言:javascript
复制
func (d *forwardSync) process(s *Slot, r *Request, hkey []byte) (*BackendConn, error) {
  //如果正在迁移,查询这个key是否迁移完成
  if s.migrate.bc != nil && len(hkey) != 0 {
    if err := d.slotsmgrt(s, hkey, r.Database, r.Seed16()); err != nil {
      log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', database = %d, error = %s",
        s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, r.Database, err)
      return nil, err
    }
  }
  r.Group = &s.refs
  r.Group.Add(1)
  return d.forward2(s, r), nil
}

再来看异步的处理方法:

代码语言:javascript
复制
func (d *forwardSemiAsync) process(s *Slot, r *Request, hkey []byte) (_ *BackendConn, retry bool, _ error) {
  if s.migrate.bc != nil && len(hkey) != 0 {
    resp, moved, err := d.slotsmgrtExecWrapper(s, hkey, r.Database, r.Seed16(), r.Multi)
    switch {
    case err != nil:
      log.Debugf("slot-%04d migrate from = %s to %s failed: hash key = '%s', error = %s",
        s.id, s.migrate.bc.Addr(), s.backend.bc.Addr(), hkey, err)
      return nil, false, err
    case !moved:
      switch {
      case resp != nil:
        r.Resp = resp
        return nil, false, nil
      }
      return nil, true, nil
    }
  }
  r.Group = &s.refs
  r.Group.Add(1)
  return d.forward2(s, r), false, nil
}

调用slotsmgrtExecWrapper来处理命令:

代码语言:javascript
复制
func (d *forwardHelper) slotsmgrtExecWrapper(s *Slot, hkey []byte, database int32, seed uint, multi []*redis.Resp) (_ *redis.Resp, moved bool, _ error) {
  m := &Request{}
  m.Multi = make([]*redis.Resp, 0, 2+len(multi))
  m.Multi = append(m.Multi,
    redis.NewBulkBytes([]byte("SLOTSMGRT-EXEC-WRAPPER")),
    redis.NewBulkBytes(hkey),
  )
  //省略代码
  }

可以看到是调用redis命令SLOTSMGRT-EXEC-WRAPPER来处理请求,再看下这个命令的C实现:

再跟进去,看slotsmgrtExecWrapperCommand函数

代码语言:javascript
复制
void
slotsmgrtExecWrapperCommand(client *c) {
    //查找命令是否存在
    struct redisCommand *cmd = lookupCommand(c->argv[2]->ptr);
    if (cmd == NULL) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c,"invalid command specified (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    if ((cmd->arity > 0 && cmd->arity != c->argc - 2) || (c->argc - 2 < -cmd->arity)) {
        addReplyLongLong(c, -1);
        addReplyErrorFormat(c, "wrong number of arguments for command (%s)",
                (char *)c->argv[2]->ptr);
        return;
    }
    if (lookupKeyWrite(c->db, c->argv[1]) == NULL) {
        addReplyLongLong(c, 0);
        addReplyError(c, "the specified key doesn't exist");
        return;
    }
    //如果正在迁移并且当前命令是写命令则返回错误
    if (!(cmd->flags & CMD_READONLY) && getSlotsmgrtAsyncClientMigrationStatusOrBlock(c, c->argv[1], 0) != 0) {
        addReplyLongLong(c, 1);
        addReplyError(c, "the specified key is being migrated");
        return;
    } else {
        addReplyLongLong(c, 2);
        robj **argv = zmalloc(sizeof(robj *) * (c->argc - 2));
        for (int i = 2; i < c->argc; i ++) {
            argv[i - 2] = c->argv[i];
            incrRefCount(c->argv[i]);
        }
        for (int i = 0; i < c->argc; i ++) {
            decrRefCount(c->argv[i]);
        }
        zfree(c->argv);
        c->argc = c->argc - 2;
        c->argv = argv;
        c->cmd = cmd;
        call(c, CMD_CALL_FULL & ~CMD_CALL_PROPAGATE);
    }
}

可以看到SLOTSMGRT-EXEC-WRAPPER会判断当前操作的命令是否为写命令,并且这个key是否在迁移或阻塞中,如果是则返回错误,这种情况下需要Proxy进行重试。

2、迁移数据的差别

前面分析了同步和异步迁移数据调用的方法不同:

代码语言:javascript
复制
switch method {
      case models.ForwardSync:
        do = func() (int, error) {
          return c.MigrateSlot(sid, dest)
        }
      case models.ForwardSemiAsync:
        var option = &redis.MigrateSlotAsyncOption{
          MaxBulks: s.config.MigrationAsyncMaxBulks,
          MaxBytes: s.config.MigrationAsyncMaxBytes.AsInt(),
          NumKeys:  s.config.MigrationAsyncNumKeys,
          Timeout: math2.MinDuration(time.Second*5,
            s.config.MigrationTimeout.Duration()),
        }
        do = func() (int, error) {
          return c.MigrateSlotAsync(sid, dest, option)
        }

同步调用的方法前面已经分析过了,看下异步迁移的方法,即MigrateSlotAsync:

代码语言:javascript
复制
func (c *Client) MigrateSlotAsync(slot int, target string, option *MigrateSlotAsyncOption) (int, error) {
  host, port, err := net.SplitHostPort(target)
  if err != nil {
    return 0, errors.Trace(err)
  }
  if reply, err := c.Do("SLOTSMGRTTAGSLOT-ASYNC", host, port, int(option.Timeout/time.Millisecond),
    option.MaxBulks, option.MaxBytes, slot, option.NumKeys); err != nil {
    return 0, errors.Trace(err)
  } else {
   //
  }
}

可以看到是调用SLOTSMGRTTAGSLOT-ASYNC命令进行迁移,Redis Server实现的逻辑比较复杂这里就不具体分析了,大概过程如下:

1)源Redis对key进行序列化异步发送给目标Redis;

2)目标Redis通过Restore还原后回复给源Redis;

3)源Redis收到目标Redis确认后标记这个key迁移完成,迁移下一个key;

另外说下大key的处理,对于大key,如一个长度为1W的list,Codis会将key分拆成多个命令,因为通过不断的rpush最终的结果一样;

Codis会在每一个拆分后的指令中加上一个临时TTL;

等全部拆分的指令执行成功才会删除本地的key;

因此即使中途迁移失败,已迁移成功的key也会超时自动删除,最终效果就好比迁移没有发生一样。

三、总结

1、同步还是异步迁移取决于dashboard配置文件migration_method;

2、同步和异步有两个区别:

一是处理请求的不同,如果当前要操作的key所属Slot正在迁移,同步处理会发送命令等待后端迁移完成才往下操作,异步则是将当前请求封装成一次SLOTSMGRT-EXEC-WRAPPER调用,并且将操作命令及参数都发送过去,后者会判断这个key是否在迁移或阻塞,如果是并且当前为写命令则直接返回失败,由Proxy重试。

二是迁移逻辑不同,同步会调用SLOTSMGRTTAGSLOT迁移,异步则是调用SLOTSMGRTTAGSLOT-ASYNC,前者每次随机迁移一个key,异步的过程则复杂得多,对于小key需要确认才算迁移完成,对于大key还会分拆成多条命令,以保证不阻塞主流程,并且在拆分后的命令都加上TTL,以保证如果中途失败目标Redis的key会及时清掉而不会产生脏数据。

Codis源码分析之Slots迁移篇

Codis Proxy初始化篇

Codis Proxy是如何处理一个请求的

Raft算法之集群成员变化篇

360 Atlas生产环境使用心得

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-12-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 程序员升级之路 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档