Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >seata RM源码分析

seata RM源码分析

作者头像
luoxn28
发布于 2021-02-26 07:13:01
发布于 2021-02-26 07:13:01
67200
代码可运行
举报
文章被收录于专栏:TopCoderTopCoder
运行总次数:0
代码可运行

Seata 是一款阿里开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案,github地址:https://github.com/seata/seata。

RM模块是seata中全局事务参与者,其核心逻辑有:

  • 启动netty客户端:会启动RM客户端与TC通信
  • 数据源切面代理:SQL解析、分支事务注册/提交、undolog保存、分支事务状态上报
  • Rpc代理:在RPC流程中传递seata上下文(xid等,非本文分析重点)

同TM类似,RM侧也是有一个GlobalTransactionScanner类,来进行初始化的动作,GlobalTransactionScanner实现了InitializingBean,其afterPropertiesSet方法中会执行netty客户端初始化工作,逻辑如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void initClient() {
    //初始化TM
    TMClient.init(applicationId, txServiceGroup);
    ...
    //初始化RM
    RMClient.init(applicationId, txServiceGroup);
    ... 
    // 注册Spring shutdown的回调,用来释放资源
    registerSpringShutdownHook();
 }

RM对应的客户端为RMClient,其初始化代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static void init(String applicationId, String transactionServiceGroup) {
    // 设置资源管理和事务消息处理器
    RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
    rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
    rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
    rmNettyRemotingClient.init();
}

RMClient初始化首先注册各种类型的处理器(全局事务相关、分支事务相关、心跳相关),然后启动netty客户端:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
private void registerProcessor() {
    // 1.registry rm client handle branch commit processor
    RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
    // 2.registry rm client handle branch commit processor
    RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
    // 3.registry rm handler undo log processor 其目前已经不再起任何作用了,undolog的删除作为AT模式下的commit处理器是会自动处理
    RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
    // 4.registry TC response processor
    ClientOnResponseProcessor onResponseProcessor =
        new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
    super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
    // 5.registry heartbeat message processor
    ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
}

public void init() {
    // 定时重连任务
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            clientChannelManager.reconnect(getTransactionServiceGroup());
        }
    }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    // 请求future定时器
    super.init();
    // 标准的netty client初始化
    clientBootstrap.start();
}

启动netty完成之后,RM侧的业务SQL执行时,就会进行对应的分支事务的执行操作了,这块内容和TM原理解析中的SQL执行流程是一样的,这里就不在赘述。下面就重点关注下RM侧特有的一些消息处理器类,也就是在方法io.seata.core.rpc.netty.RmNettyRemotingClient#registerProcessor中注册的各种处理器。

分支事务提交回滚

分支事务的提交和回滚分别对应处理器类RmBranchCommitProcessorRmBranchRollbackProcessor,二者对应的处理逻辑如下:

事务提交在AT模式下最后会返回BranchStatus.PhaseTwo_Committed,分支事务提交首先获取xid和resouceId,然后执行ResourceManager.branchCommit进行提交操作。

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
    throws TransactionException {
    String xid = request.getXid();
    long branchId = request.getBranchId();
    String resourceId = request.getResourceId();
    String applicationData = request.getApplicationData();

    BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
        applicationData);
    response.setXid(xid);
    response.setBranchId(branchId);
    response.setBranchStatus(status);
}

// 异步任务doBranchCommitSafely逻辑
scheduledExecutor.scheduleAtFixedRate(this::doBranchCommitSafely, 10, 1000, TimeUnit.MILLISECONDS);

注意,AT模式下的提交目前只是简单的往asyncWorker中提交一个异步任务,后续会进行undolog的清除操作,注意:RM侧的事务提交已经在阶段一就已经完成了,因此这里主要就是进行undolog的清除操作了。

相对于事务提交操作,事务回滚要做的事情会多一些,比如找到对应的undolog然后进行数据的恢复(这里会开启一个新的事务来完成恢复操作)。事务回滚核心逻辑是调用ResourceManager.branchRollback来完成的,代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    try {
        // 找到undolog,然后进行undo操作
     UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
    } catch (TransactionException te) {
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;
}

对于MySQL来说,进行undo的类为MySQLUndoLogManager,其undo方法主要逻辑如下:

  • 获取DB连接,根据xid和branchId获取对应的undolog
  • 首先判断undolog状态,然后将undolog反解析为sqlUndoLogs
  • 逆序排列,然后遍历执行每个undolog进行undo操作
  • 执行完毕后删除undolog,然后进行commit事务,最后返回

RM侧处理响应结果的逻辑大都较为简洁明了,这里就不再赘述了。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-01-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
详解分布式事务之 Seata-Client 原理及流程
在分布式系统中,分布式事务是一个必须要解决的问题,目前使用较多的是最终一致性方案。自年初阿里开源了Fescar(四月初更名为Seata)后,该项目受到了极大的关注,目前已接近 8000 Star。Seata 以高性能和零侵入的特性为目标解决微服务领域的分布式事务难题,目前正处于快速迭代中,近期小目标是生产可用的 Mysql 版本。
程序猿DD
2019/05/10
2.2K0
详解分布式事务之 Seata-Client 原理及流程
Seata 源码篇之AT模式启动流程 - 下 - 04
上一篇文章,我们看了Seata AT模式一阶段提交流程,本文我们来看看AT模式的二阶段流程和全局事务提交回滚逻辑的实现。
大忽悠爱学习
2023/10/11
3070
Seata 源码篇之AT模式启动流程 - 下 - 04
分布式事务之Seata中间件原理及流程详细分析
原文链接:https://blog.csdn.net/f4761/article/details/89077400
天涯泪小武
2019/10/22
2.2K0
分布式事务之Seata中间件原理及流程详细分析
深度剖析一站式分布式事务方案Seata-Cient
在之前的文章中已经介绍过Seata的总体介绍,如何使用以及Seata-Server的原理分析,有兴趣的可以阅读下面的文章:
用户5397975
2019/10/14
8080
深度剖析一站式分布式事务方案Seata-Cient
seata TC 请求处理流程
TC的业务channelHandler为类 io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler,注意到达该类的请求都是经过编解码的了,请求类型为RpcMessage。ServerHandler类处理方法有:
luoxn28
2021/01/28
8960
seata TC 请求处理流程
Seata分布式事务之TM、RM、TC源码分析
笔者通过绘制时序图,我们可以清晰的知道在集成Seata、ShardingSphere、Dubbo之后,我们插入一条数据的整个内部调用链逻辑
spilledyear
2022/05/13
2.4K0
Seata分布式事务之TM、RM、TC源码分析
seata TM源码分析
下面就一起来看下TM模块的实现原理,TM模块是seata中全局事务发起者和掌控者,其核心逻辑有:业务逻辑切面代理:对全局事务注册/提交操作。启动netty客户端:会启动TM/RM客户端与TC通信。数据源切面代理:SQL解析、分支事务注册/提交、undolog保存、分支事务状态上报。Rpc代理:在RPC流程中传递seata上下文(xid等,非本文分析重点)。
luoxn28
2021/01/28
1.2K0
seata TM源码分析
分布式事务中间件 Fescar - RM 模块源码解读
在SOA、微服务架构流行的年代,许多复杂业务上需要支持多资源占用场景,而在分布式系统中因为某个资源不足而导致其它资源占用回滚的系统设计一直是个难点。我所在的团队也遇到了这个问题,为解决这个问题上,团队采用的是阿里开源的分布式中间件Fescar的解决方案,并详细了解了Fescar内部的工作原理,解决在使用Fescar中间件过程中的一些疑虑的地方,也为后续团队在继续使用该中间件奠定了理论基础。
张乘辉
2019/07/10
6260
分布式事务:Seata框架AT模式及TCC模式执行流程剖析
管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚,在分支事务执行的客户端。
朝雨忆轻尘
2020/11/24
2K0
分布式事务:Seata框架AT模式及TCC模式执行流程剖析
seata TC启动流程分析
seata-server 启动方法 io.seata.server.Server#main,默认启动端口 SERVER_DEFAULT_PORT = 8091。main方法主要是解析并设置一些配置,初始化几个线程池,启动DefaultCoordinator和Netty服务等,源码如下:
luoxn28
2021/01/28
8180
seata TC启动流程分析
深度剖析 Seata TCC 模式【图解 + 源码分析】
Seata 目前支持 AT 模式、XA 模式、TCC 模式和 SAGA 模式,之前文章更多谈及的是非侵入式的 AT 模式,今天带大家认识一下同样是二阶段提交的 TCC 模式。
张乘辉
2022/01/24
2.6K0
深度剖析 Seata TCC 模式【图解 + 源码分析】
fescar分布式事务实现原理解析探秘
fescar发布已有时日,分布式事务一直是业界备受关注的领域,fescar发布一个月左右便受到了近5000个star足以说明其热度。当然,在fescar出来之前,已经有比较成熟的分布式事务的解决方案开源了,比较典型的方案如LCN(https://github.com/codingapi/tx-lcn)的2pc型无侵入事务,目前lcn已发展到5.0,已支持和fescar事务模型类似的TCX型事务。还有如TCC型事务实现hmily(https://github.com/yu199195/hmily)、tcc-transaction(https://github.com/changmingxie/tcc-transaction)等。在微服务架构流行的当下、阿里这种开源大户背景下,fescar的发布无疑又掀起了研究分布式事务的热潮。fescar脱胎于阿里云商业分布式事务服务GTS,在线上环境提供这种公共服务其模式肯定经受了非常严苛的考验。其分布式事务模型TXC又仿于传统事务模型XA方案,主要区别在于资源管理器的定位一个在应用层一个在数据库层。博主觉得fescar的txc模型实现非常有研究的价值,所以今天我们来好好翻一翻fescar项目的代码。本文篇幅较长,浏览并理解本文大概耗时30~60分钟左右。
kl博主
2023/11/18
1860
Seata AT 模式启动源码分析
从上一篇文章「分布式事务中间件Seata的设计原理」讲了下 Seata AT 模式的一些设计原理,从中也知道了 AT 模式的三个角色(RM、TM、TC),接下来我会更新 Seata 源码分析系列文章。今天就来分析 Seata AT 模式在启动的时候都做了哪些操作。
张乘辉
2019/12/02
6220
Apache ShardingSphere sharding-jdbc 分布式事务小练习
[技术标准] https://pubs.opengroup.org/onlinepubs/009680699/toc.pdf
前Thoughtworks-杨焱
2021/12/07
7990
Seata-Saga模式 原理
状态机设计组件:seata-saga-statemachine-designer 状态机在线画图工具:saga_designer
全栈程序员站长
2022/11/04
6380
Seata-Saga模式 原理
springcloud+eureka整合seata-tcc模式
分布式事务中的tcc模式理论介绍的文章非常多,但是网上找到一个代码实现的demo很难,包括阿里的seata官方示例都没有TCC模式的具体实现。今天我们来看一下微服务环境下使用seata TCC模式解决分布式事务的场景,同时提供一个详细的实现。
jinjunzhu
2020/08/28
1.8K0
springcloud+eureka整合seata-tcc模式
深入了解分布式事务组件 Seata :AT 模式(二)
在前面一篇文章,我们介绍了阿里开源的分布式事务组件 Seata 的相关概念,重点介绍了 Seata 的 AT 模式。并通过一个 Spring-Cloud-JPA 的案例,演示了 AT 模式的使用入门。本文将会结合 Spring-Cloud-JPA 的案例,深入了解 Seata AT 模式的工作流程。本文基于 v0.8.1。
aoho求索
2019/12/15
1.5K0
[图文] Seata AT 模式分布式事务源码分析
AT 模式是 Seata 主推的分布式事务解决方案,最早来源于阿里中间件团队发布的 TXC服务,后来成功上云改名 GTS。相较于TCC而言,Seata的AT模式业务侵入性更低,易于接入。
田守枝
2019/07/11
2.5K0
[图文] Seata AT 模式分布式事务源码分析
seata TC 模块分析
Discovery模块就是服务发现模块,TC启动后需要将自己的信息注册到服务中心,这样才能暴露给其他使用者,Discovery接口定义如下:
luoxn28
2021/01/28
1.3K0
seata TC 模块分析
mac 上学习k8s系列(53)seata-go
https://github.com/seata/seata-go是https://github.com/seata/seata的golang客户端,目前支持at和tcc两种分布式事务的实现。虽然官方的文档给的例子描述了如何处理分布式事务,但是如何部署和使用语焉不详。下面介绍下两种部署方式file、db
golangLeetcode
2022/12/17
6730
相关推荐
详解分布式事务之 Seata-Client 原理及流程
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验