数据处理在 Session-Based 场景中的一大特点是:同一会话产生的数据应投递到同一处理单元中。 不同于无状态服务,其难点在于:当集群发生扩缩容时,除了要保证负载均衡,更要保证数据到处理单元的稳定映射。 本文从经典案例入手,介绍 Sharding 技术是如何解决该问题的,并结合 kafka consumer 浅谈应用方法。
数据流处理面临四个基本问题(Tyler Akidau, Slava Chernyak & Reuven lax. Streaming Systems: The What, Where, When and How of Large-Scale Data Processing):
Session 是 Where 问题的一种表现形式:事件并非相互独立,而是存在层级关系,某些事件隶属同一上下文,需作为整体处理才能得到正确结果。比如,用户在线时的所有操作,属于同一 session;它以用户登录为起点,下线为终点。同一用户同一时刻只能保留唯一 session ,因为一些服务可能依赖于全局状态。下面结合游戏场景举个具体例子。
MOBA 或其它竞技类型游戏,核心玩法以单局形式存在,每个单局可视为一个 session。单局除了基本游戏体验,还有一些旁路功能,比如判定作弊、消极比赛等。
Judge Cluster 是消极比赛裁决服务集群,包含多个进程实例(Instance)。每个 Instance 上运行多个裁决器,裁决器和具体单局 Game i 一一对应。 Game i, Event j 表示单局 i 中的第 j 条事件。消极判定依赖事件上下文,所以单局事件须 完整、有序 交给 同一对应裁决器 处理。因为判定逻辑依赖行为的因果关系,因果关系既和 Event Time 顺序有关,也和事件完整性有关。比如角色在战斗过程中死亡,是正常行为;但如果站着不动让别人攻击,那就有消极嫌疑。这种场景对架构设计提出以下要求:
这几条要求代表了 Session 场景数据投递的技术点,下文对其抽象,尝试提炼通用解决方案。
核心问题只有一个: 确保 Session 中的消息被正确投递到对应的 Entity 中 。看似简单,但 Entity 的载体是 Nodes ,Nodes 受各种条件影响变化几乎是不可避免的。难点就转换为:怎么保证 Entity 的逻辑地址和物理地址(所属 Node 地址)解耦,不变的逻辑地址如何自动化更新对应的物理地址。
我们先来看下,哪些情况会引起 Nodes 和 Entity 映射关系的变化。
Rebalance 时更新 Nodes 和 Entity 的映射关系,需要考虑的因素和应用场景相关,常见的有:
上图展示了 session 场景下数据路由的基本流程,总结起来 核心环节 有两个:
下面我们来看看解决方案。
上述四种方法,各有适用场景,相互间也并非完全独立可以组合使用。为了简化问题,下文将针对 (d) Cluster 内路由 说明 Sharding 的一种实现方案。在这种场景下,外界对 Cluster 访问简化为:
而 Session 消息到 Entity 的稳定路由则全部由 Cluster 内部的 Sharding 机制解决。
Cluster Sharding 在集群内部实现消息路由,无论哪个节点接收到外部投递的消息,都应准确发送到目标 Entity 所属 Node。这需要提根据消息自身内容获取 Node 位置,并随着 Cluster 伸缩自适应更新。这种方式对外屏蔽了路由的复杂性,而且 Cluster 本来就要处理消息,可以做各种自定义逻辑,提高了灵活性。下面本文参考Akka Cluster 介绍 Sharding 的体系结构和术语,然后描述 routing, rebalance 过程。
符号 | 全称 | 说明 |
---|---|---|
ST | Sharding Type | 一个独立的 Sharding 体系,由两个因素决定:处理的消息类型及名称 |
E | Entity | 消息处理实体,一个 session 的所有消息均投递给该 Entity 处理 |
S | Shard | 包含一组相同功能的 Entity,一个 Shard 只能位于同一 Node 上,负责内部 Entity 创建销毁及路由 |
SC | Shard Coordinator | 每个 ST 一个,负责当前体系内 Shard 与 Node 映射关系的维护:Allocate, Rebalance |
SR | Shard Region | 每个 Node 一个,负责 Shard 创建和路由,本地 Shard 直发,远程 Shard 转发 |
一个 Sharding 体系包含:Entity, Shard, Shard Region, Shard Coordinator 四个模块。Sharding 体系的区分取决于逻辑功能。比如:单局战斗事件和聊天事件,处理逻辑显然不同,通过消息类型即可区分;但同样是战斗事件,可用于消极行为裁决,也可用于数据统计,这就需要用名称区分。所以独立的 Sharding 体系可以将 消息类型 和 名称 组合作为唯一标记。Sharding 体系确定后,就定义了一类具有相同功能的 Entity 集合,这里用 EntityTypeKeyMsgType 来表示。MsgType 表示这类 Entity 可以处理的消息类型,name 是字符串标记。
Entity 数量可能很多,百万用户在线时一个场景通常会有数十万。直接对 Entity 管理代价很高,所以模拟现实世界中组织架构的方式 分层 。将 Entity 分成若干组,以组为基本单位管理,这就是 Shard 。对 Entity 的访问退化为 Shard 访问,粒度从细变粗。考虑容灾、扩展等多方面因素,Shards 都不应存放于同一 Node,一定分散在不同 Nodes 上。这时就面临两个问题:
这两个问题就分别需要 Shard Coordinator 和 Shard Region 解决。
Shard 数量通常是固定的,每个 Shard 中 Entity 的数量是动态变化的。这点 非常关键,这为 Entity 到 Shard 的稳定映射提供了可能。因为 Node 是物理存在,它的变化是不可避免的,Shard 是逻辑存在,可以稳定不变。这样可以保证 Session 和 Node 解耦,简化了 Session 到 Shard 映射复杂度。为了保证 Shard 能够较好均匀的分布在所有 Nodes 上,可以将 Shard 数量设置为 Nodes 总数的较大上限,比如 Nodes 数量的十倍。
该体系的具体运作方式,我们结合两个关键流程介绍:路由和再平衡。
路由有两个过程:不存在时创建;存在时转发。根据目的地有两种场景:目标 shard 位于收到消息的 Node 本地;目标 shard 所在 Node 并非收到消息的 Node 。下面分别介绍这两种场景。
上述过程描述了,Node 收到消息并在本地创建 Shard 和 Entity 的过程;如果已经创建好,那么本地路由就不必再和 SC 交互,直接转发给本地 Shard 即可,因为 SR 保留了本地路由信息。
远程路由和本地路由的大概流程类似,区别点在于:当 SR_A 向 SC 查询后,发现目标 Shard(S2)在节点 B 上,那么将消息转发给 Node B。SR_B 在本地完成 S2 的创建和路由。在这种情况下,消息投递多了一次 Hop。如果要考虑优化,有两个方面:
Cluster 内部 Nodes 状态变化,比如增加、移除、不可达,必然涉及 Shard 和 Node 映射关系的调整,这就是再平衡。再平衡具体包含两种情况:
因此再平衡主要考虑 两个问题 :
如果 Shard 中 Entity 是有状态的,且状态不可丢,那么需要将 Entity 的状态同步到新节点。不过同步方式和业务场景密切相关,不应该也没必要由 Shard 底层提供解决方案,只要做好流程控制即可。一般有两种方式:
实际应用时建议结合需求处理:
细心的读者,可能会发现一个问题。如果任何一个 Session 的数据随意发送给任何一个 Node,虽然最终路由给正确的 Entity,但顺序是无法保证的。这实际上是网络通信的基本问题。从 A 到 B,如果有多条通路,那么无法保证 A 发送消息的顺序和 B 接收消息的顺序一致。这时有两种解决方案:
下面结合消极行为裁决,我们来看看如何利用 sharding 解决各种问题。
我们再回顾下游戏单局数据处理的例子。单局数据有多个应用场景,比如:单局结算、玩家生涯指标、大盘数据统计、消极行为裁决。这是典型的 Publish-Subscribe 场景,可以使用 Kafka 保存单局事件,做到不同服务间的解耦。由于消极行为裁决要求保留消息的原始顺序,所以在通信链路上应该保证唯一性。
Game Cluster 是单局服务集群,一个单局只会存在于一个 Node 上,比如 Game 3 在 Node 2 上。Game 3 作为 Session,将 Game ID 作为 Sessioin ID,并作为消息 Key 推送到 Kafka 中。因此可以保证,同一 Game 的所有消息会 按序 进入同一 Partition 。裁决服务集群作为 Consumer Group,从 Kafka 拉数据。每个 Partition 只会由一个固定的 Consumer 消费,在 Consumer Group 稳定的情况下,Partition 到 Consumer 的映射关系是稳定的,到 Entity 的链路也是稳定的。因此可以做到 Entity 3 稳定有序 的消费 Game 3 产生的数据。
这样似乎就够了,但实际生产环境很难保证 Consumer Group 维持不变,比如:
如图所示,Partition 3 本来由 Consumer 1 消费。Game 1 中的事件按先后顺序分别是 M1,M2,M3。当 M1 和 M2 被 C1 消费后,Consumer 3 加入成功,Partition 3 被分配给 C3。那么 Game 1 中的 M3 则转由 C3 处理。显然 G1 数据被截断分拆给不同的 Consumer 处理,结果是不正确的。
我们期望的效果是 Partition 无论被哪个 Consumer 消费,同一 Session 的数据总能转发给同一 Entity 处理。经过前面的介绍,sharding 是非常适合的,应用流程如下所示:
Sharding 技术,旨在实现逻辑处理单元和物理节点的解耦,将复杂的路由逻辑隐藏在集群内部实现,降低外部访问的复杂度。如果配合消息队列以及其它数据恢复技术,还可支持消息有序,节点状态迁移,比较适合做为分布式系统中细粒度有状态服务的路由解决方案。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。