接上文 《游戏服务的分布式事务优化(一)- Write Ahead Log(WAL) 模块》
在挺久以前我写过一篇分享 《在游戏服务器中使用分布式事务》 。当时尝试在我们的游戏服务中引入TCC模型的分布式事务机制,以优化好友和公会服务的一致性流程。 最开始的实现并不通用,后来我对这个事务的实现做了进一步的优化和重构,抽离成了较为通用的模块,并对之前没全部完成的功能做了进一步完善。 此篇为重构内容的第二部分,主要聚焦于事务管理。
首先,先简介一下这个分布式事务系统的基本原理。详细一点的可以看我之前一篇文章,《Google去中心化分布式系统论文三件套(Percolator、Spanner、F1)读后感》 。简单地流程如下:
更详细的信息可以直接去看论文Paper,相关性最高的两篇为:
在上述原理的基础上,我们把事务分成了三个角色: 协调者(coordinator) , 参与者(participator) 和 执行者(client) 。
我们把 协调者(coordinator) 设计成独立服务,参与者(participator) 和 执行者(client) 以SDK的形式提供,结构如下:
本来最初的想法是把这些组件也写进 atframe_utils ,还方便写单元测试。但是如果要同时支持协程和非协程异步通信,且要支持自定义IO事件管理的话,整个抽象起来太过于复杂了。而且本身分布式事务模块要接入的规范就比较多,使用起来有一定难度,抽象非常复杂的情况下使用起来就更困难了。所以最终我还是决定集成到我们的开发框架 ( atsf4g-co ) 中,复用我们的IO系统和协程RPC机制。具体位置在 src/component/distributed_transaction 目录。
容灾和一致性方面我们支持三种模型。
第一种模型也是最早实现的模型,协调者(coordinator) 服务作为弱状态服务。首先 协调者(coordinator) 路由使用一致性Hash,它使用数据库的CAS操作做状态管理。即本地有缓存时使用本地的缓存,否则从数据库拉取,缓存采用LRU机制淘汰。如果发生扩缩容或者分布变化,会不同节点发出的消息的路由短暂的不一致。这时候某些节点执行协调者数据保存时,CAS操作会由于冲突而失败。因为 协调者(coordinator) 要处理的消息比较简单,并且状态机是单向的,所以这时候我们可以直接从数据库拉取最新的数据Retry即可。
第二种模型比较简单,纯内存模式,即不写入数据库。这种模式适用于一些不太严格要求一致性,但要求性能更高一些的场景。
第三种模型则是实现了 Read-Your-Writes 一致性 要求。即N为副本数,W为感知到写的副本数,R为读操作需要访问的节点数。当 W+R>N
时,认为符合 Read-Your-Write 一致性要求。
Read-Your-Writes 一致性 是一种以 Client为中心 的一致性模型。 因为对于事务的三个角色( 执行者(client) , 参与者(participator) , 和 协调者(coordinator) )所涉及要修改的数据各不相同。 在无故障时,采用这种模型的开销相对较低。
在这种场景下,不要求主从节点之分。协调者(coordinator) 的逻辑较轻,不需要WAL模块。但是 执行者(client) 和 参与者(participator) 的逻辑比较重。
W+R>N
。W+R>N
才能回包。以下涉及的接口如果被标识为 (同步) , 则表示在事务处理这个模块中是同步的,不应该依赖异步流程来完成该操作。 比如在需要发送消息时,应该由网络层组件来进行排队或等发送完消息后销毁相关资源,事务管理模块不应该关心具体细节。
对于所有的网络消息的传输不要求可靠,如果网络失败,可能会收不到回包,此时认为该节点失败即可。 当
W+R>N
时即可执行事务的下一步流程,所以部分节点失败并不影响事务可靠性。 对于不要求故障时高可用的系统(故障转移期间允许正在进行的事务失败),可以设置 保存完离线数据后才执行 并设置W=1,R=1,N=1
。 这时候数据一致性仍然能够得到保证,但是事务恢复和一致性保证依赖保存离线数据的一致性(通常时保存到KV数据库)。
对于一些涉及等待回包操作但标识为 (同步) 的接口,需要外部的机制来发起任务管理并最终调用 receive_xxx_response(...)
接口 。
比如在 atsf4g-co 中可以使用
rpc::async_invoke(...)
来发起协程任务来处理IO。
协调者主要目的是提供一个事务协调方,保证整个事务是否执行成功都是咦这个协调者内的状态为基准。其中管理逻辑是弱状态的,即使用CAS来保存数据,如果失败直接重新从数据库读取然后Retry。对于单个事务而言,流程图大致如下:
那么,我们需要实现以下协议:
service DtcoordsvrService {
option (atframework.service_options) = {
module_name: "transaction"
};
rpc query(SSDistributeTransactionQueryReq) returns (SSDistributeTransactionQueryRsp) {
option (atframework.rpc_options) = {
api_name: "拉取事务数据"
allow_no_wait: true
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_SYS_NOTFOUND"
};
};
rpc create(SSDistributeTransactionCreateReq) returns (SSDistributeTransactionCreateRsp) {
option (atframework.rpc_options) = {
api_name: "创建分布式事务"
allow_no_wait: true
};
};
rpc commit(SSDistributeTransactionCommitReq) returns (SSDistributeTransactionCommitRsp) {
option (atframework.rpc_options) = {
api_name: "分布式事务-提交"
allow_no_wait: true
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_DB_OLD_VERSION"
};
};
rpc reject(SSDistributeTransactionRejectReq) returns (SSDistributeTransactionRejectRsp) {
option (atframework.rpc_options) = {
api_name: "分布式事务-拒绝"
allow_no_wait: true
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_DB_OLD_VERSION"
};
};
rpc commit_participator(SSDistributeTransactionCommitParticipatorReq)
returns (SSDistributeTransactionCommitParticipatorRsp) {
option (atframework.rpc_options) = {
api_name: "分布式事务-单个参与者提交"
allow_no_wait: true
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_SYS_NOTFOUND"
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_DB_OLD_VERSION"
};
};
rpc reject_participator(SSDistributeTransactionRejectParticipatorReq)
returns (SSDistributeTransactionRejectParticipatorRsp) {
option (atframework.rpc_options) = {
api_name: "分布式事务-单个参与者拒绝"
allow_no_wait: true
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_SYS_NOTFOUND"
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_DB_OLD_VERSION"
};
};
rpc remove(SSDistributeTransactionRemoveReq) returns (SSDistributeTransactionRemoveRsp) {
option (atframework.rpc_options) = {
api_name: "强制删除分布式事务"
allow_no_wait: true
warning_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_DB_OLD_VERSION"
info_log_response_code: "PROJECT_NAMESPACE_ID::err::EN_DB_RECORD_NOT_FOUND"
};
};
}
对于管理器而言,还需要实现以下功能:
tick()
以上功能都已经提供实现了,所以使用者使用的时候只需要部署服务就好了。不需要做额外操作。
参与者(participator) 是事务Log的业务逻辑真正的执行的地方。其数据分为两部分,第一部分是和协调者类似的事务管理层。第二部分是和具体执行的功能相关的逻辑模块,这部分可以使用 WAL 模块。 我们仅仅能提供SDK封装掉一些复杂的流程,让使用者接口尽可能简化。它做的事情大致如下:
vtable
)load/dump
: 用于读取和转储离线数据。tick()
接口,用于处理超时事务。超时不能直接删除,要重新向协调者请求数据更新状态。check_lock()/lock()/unlock()
接口,以便使用 Wound-Wait 的冲突处理策略。(后面会提到,详见 资源锁)lock()/unlock()
接口,另一方面跟随事务本身。参与者(participator) SDK内部的实现流程较繁琐,单对用户侧暴露的接口并不多。同时对用户,我们有几点要求。
tick()
接口vtable
内的必须接口由于我们只提供了SDK,对于RPC操作, 参与者(participator) 服务还必须接入以下服务:
// ====== 事件准备阶段 ======
message SSParticipatorTransactionPrepareReq {
transaction_participator_storage storage = 1;
}
// ====== 事件准备阶段 ======
message SSParticipatorTransactionPrepareRsp {
transaction_participator_failure_reason reason = 1;
}
// ====== 事件提交(成功完成)请求 ======
message SSParticipatorTransactionCommitReq {
bytes transaction_uuid = 1; // 事务UUID
}
// ====== 事件提交(成功完成)回包 ======
message SSParticipatorTransactionCommitRsp {}
// ====== 事件拒绝(失败)请求 ======
message SSParticipatorTransactionRejectReq {
bytes transaction_uuid = 1; // 事务UUID
// 对于 force_commit=true 的事务,拒绝时,需要提供完整信息以便执行undo操作
transaction_participator_storage storage = 2;
}
// ====== 事件拒绝(失败)回包 ======
message SSParticipatorTransactionRejectRsp {}
// ------------ Here is the sample service for participator ------------
service ParticipatorService {
rpc transaction_prepare(SSParticipatorTransactionPrepareReq) returns (SSParticipatorTransactionPrepareRsp) {
option (atframework.rpc_options) = {
api_name: "事务准备阶段(Try)"
router_rpc: true
user_rpc: true
allow_no_wait: true
};
};
rpc transaction_commit(SSParticipatorTransactionCommitReq) returns (SSParticipatorTransactionCommitRsp) {
option (atframework.rpc_options) = {
api_name: "事务提交(Commit)"
router_rpc: true
user_rpc: true
allow_no_wait: true
};
};
rpc transaction_reject(SSParticipatorTransactionRejectReq) returns (SSParticipatorTransactionRejectRsp) {
option (atframework.rpc_options) = {
api_name: "事务拒绝(Commit)"
router_rpc: true
user_rpc: true
allow_no_wait: true
};
};
}
然后收到相应事件消息的时候去调用 transaction_participator_handle
里的 prepare(...)/commit(...)/reject(...)/
接口就可以了。
另外,我们提供了一种额外的模式,在 transaction_participator_storage
的配置选项里有个 force_commit
字段。这表示强制提交,收到Prepare消息后强制提交,直接执行Event Log。这时候如果后面收到 Reject 请求,则直接走 Undo 。用这种方式我们顺带还实现了 2PC(两阶段提交) 事务。
有一些系统中 执行者 是合并进了参与者或者协调者的,我们这里按模块分开了这个功能,如果业务想要合并某些角色,直接在某类服务上同时创建这两个角色的对象即可。
执行者 接入的要求有
vtable
)可以看到,执行者 的接入还是相当简单的,接口也很少。如果用户不定义 通知参与者否决 和 通知参与者提交 的话,
我们采用 标准UUID 的BASE64来作为事务的Key。这样长度在24以内,对于C++11 std::string
还能触发小字符串优化。
执行者 的执行流程大致如下:
前面提到,对于某些互斥的资源我们提供了加锁策略。那么既然涉及加锁,必然也会涉及死锁问题。我们采用了两种方式来解决锁冲突问题。
首先是对使用约定上的,没有程序化的约束。一个事务对所有的要占用的资源按某种固定的规则排序,我们加锁一定是有序的。 其次我们采用 Wound-Wait 策略来解除锁冲突(和死锁)。
在 参与者(participator) 接口和 执行者(client) 协议中,我们提供 lock() 接口把资源锁在事务上,参与者(participator) 在事务结束时,自动释放锁。 按照 Wound-Wait 策略的标准实现,在事务 T_n 和 T_k 中,如果 T_k 先到(先获得锁),采用以下策略:
首先,我们对所有的参与者排序,锁资源约定按照参与者的顺序,以规避 ABBA死锁 问题。 然后,对于AB,BC资源锁,我们在 prepare 阶段加锁。对于上述的第一种场景,可以由 prepare 返回占用来告知 T_k 被终止。在重试时重新 prepare 所有 参与者(participator) 。 而对于第二种场景,最简单地方式也是在某个随机延迟后重试。也可以更复杂地方案,由 参与者(participator) 记录下一个占用锁的 执行者(client) ,然后通知解锁。 执行者(client) 收到所有等待资源的解锁通知后再来重试 prepare 所有 参与者(participator) 。第一阶段我会先实现简单版本,即随机延迟后重试。 最后,我们使用对事务的Key比较来代替时间( Timepoint ),因为我们需要的是顺序而不太在意实际时间。因为我们的库面向线上逻辑业务不面向OLTP或OLAP,不会有长时间的大规模的锁冲突。
我们公司内部的开源版本是有一个公会的例子和好友服务的例子,但是目前这些服务还涉及一些公司内部的组件,所以不便对外开源。 我看看啥时候有空的话可以尝试重写一个最简单功能的版本示例,规划是:
进程名 | WAL 模块 | 订阅机制 | 事务管理 |
---|---|---|---|
gamesvr | ✔️ | ❌ | client |
friendsvr | ✔️ | ❌ | participator |
进程名 | WAL 模块 | 订阅机制 | 事务管理 |
---|---|---|---|
gamesvr | ❌ | Subscriber | ❌ |
guildsvr | ✔️ | Publisher | participator,client |
进程名 | WAL 模块 | 订阅机制 | 事务管理 |
---|---|---|---|
gamesvr | ✔️ | Subscriber | ❌ |
chatsvr | ✔️ | Publisher | ❌ |
这个整套实现开源在了 atsf4g-co 的 src/component/distributed_transaction 目录中。
欢迎有兴趣的同学互相交流。