前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Seata分布式事务之TM、RM、TC源码分析

Seata分布式事务之TM、RM、TC源码分析

作者头像
spilledyear
发布2022-05-13 17:32:54
2K0
发布2022-05-13 17:32:54
举报
文章被收录于专栏:小白鼠小白鼠

引言 本篇文章着重点在于调用流程分析,根据业务的发起到结束对Seata的TM、RM、TC模块进行源码调用过程分析,选用Seata版本为0.7.1版本,本篇文章分析均为Seata的AT事务,TM、RM模块分析的比较单一,只分析了逻辑调用,在分析TC模块时候才具体的结合TM、RM模块进行逻辑调用的全过程交互分析

时序图

笔者通过绘制时序图,我们可以清晰的知道在集成Seata、ShardingSphere、Dubbo之后,我们插入一条数据的整个内部调用链逻辑

TM模块分析

比较重要的类TransactionalTemplate、DefaultGlobalTransaction、DefaultTransactionManager

查看TransactionalTemplate源码可知

在全局事务拦截器GlobalTransactionalInterceptor中调用excute后,调用过程如上图所示,虽然注释信息已经解释的很详细了,首先获取全局事务GlobalTransaction,默认实现类为DefaultGlobalTransaction,然后开启全局事务beginTransaction

TransactionManager是一个自定义SPI接口

默认实现类为DefaultTransactionManager.继续跟踪begin过程

通过将消息发给TC,最终调用到AbstractRpcRemoting类下面的sendAsyncRequest消息发送接口,该方法中重要的地方,消息未发送就唤醒mergeLock同步的对象

然后就能找到AbstractRpcRemotingClient类下面的MergedSendRunnable消息发送线程,因为该线程是在初始化TMClient(TmRpcClient)和RMClient(RmRpcClient)的时候一并进行初始化

知道了消息发送,我们看看消息的返回结果是怎么得到的

消息的获取运用了Future模式,消息返回与赋值在哪?因为我们消息发送和接收肯定都是异步发送和接收,如果对Netty比较熟悉的同学可能会直接知道消息切入点,消息接收位于AbstractRpcRemotingClient类channelRead接口,消息的返回和赋值都是在这里进行,不熟悉Netty的同学也不必纠结,可以MessageFuture类查看哪里进行setResultMessage的

根据消息ID从futures获取MessageFuture并设置消息返回,因为是Future模式,所以就拿到了消息的返回结果

我们已开启全局事务为例分析了消息的收发,在TC/Seata-server模块中收发消息基本都一致,所以不在进一步分析TC模块中的消息收发。

我们着重分析TM模块中的以下部分

我们通过时序图可知,当我们执行业务时候,如果业务出现异常,那么该异常会被捕获,然后通知TC进行全局回滚操作,如果没有异常,那么就进行全局二阶段提交操作。但是可能会有同学比较好奇上图中的rs = business.execute();这个是个回调接口,回调实现类

这个简单理解就是Spring AOP,会继续调用增强链中的下一个(调用下一个拦截器),最后调用到我们的目标执行方法,如下图所示

比如我们使用了Dubbo,那么当我们执行orderService.insertOrder(orderEntity);在这个方法体中,如果有异常,那么completeTransactionAfterThrowing会执行,这里简单说明下,无论是当前模块的异常还是调用到下游出现异常,如使用Dubbo调用,我们都可以理解成这些都是属于同步调用,上游模块获取到的异常信息可能会是rpc异常,也可能是反射异常,也可能是下游主动抛出的异常,所以上游模块只能获取当前模块的完整异常信息,获取到下游的异常不是完全正确的。TM模块分析完毕

RM模块分析

比较重要的类DataSourceProxy、ConnectionProxy、PreparedStatementProxy、ExecuteTemplate,大致调用关系为DataSourceProxy获取连接ConnectionProxy,ConnectionProxy获取预编译PreparedStatementProxy,PreparedStatementProxy获取SQL执行器ExecuteTemplate。我们已Insert举例,查看ExecuteTemplate源码

通过解析SQL得出不同的执行器,这里我们会执行InsertExecutor,唯一可能需要注意的就是SelectForUpdateExecutor这个执行器,简单说下这个执行器的业务场景,因为RM是当前模块若没有异常就会提交一阶段数据入库,但是,往往我们当前模块可能会有某些业务接口,这些业务接口需要的是二阶段的最终数据,所以这里我们就可以使用Seata的@GlobalLock全局锁,这个会一直轮训直到获取到二阶段最终数据.有兴趣的同学可以仔细研究SelectForUpdateExecutor执行器.返回来,我们还是继续分析InsertExecutor,查看rs = executor.execute(args);源码,最终会调用到以下接口

判断是否全局事务,绑定XID,以及判断是否有全局锁注解,全局锁注解使用上文已介绍

判断是否自动提交事务,一般都是自动,若是自动提交事务会进行设置为false,会调用到以下代码

这个地方beforeImage主要是通过解析用户执行SQL,然后记录执行SQL前的快照,然后执行statementCallback.execute继而调用到以下方法执行SQL语句

afterImage同理类推,记录SQL执行后的快照,prepareUndoLog准备好undolog数据,也就是回滚表undo_log中即将写入的数据

connectionProxy.commit();调用到如下方法

首先判断是否是全局事务,若不是全局事务,也不是全局锁,那么直接提交数据,直接分析全局事务方法

首先注册当前分支事务,然后判断当前上下文是否有undolog数据,数据就是上文介绍的,在执行完SQL之后会执行prepareUndoLog,内部就会拼装undo_log回滚表数据

最后执行commit提交数据,然后上报分支事务状态,至此整个RM分支本地事务一阶段已完成,写到这里,可能还是会有同学有疑问,比如register注册分支事务是如何和TC模块交互的,TC模块的调用流程又是什么!这些我们在后文讲TC模块一起分析下流程

TC模块分析

比较重要的类AbstractTCInboundHandler、DefaultCoordinator、DefaultCore

我们看看Server类都做了什么事情.因为TC模块基于Netty框架,所以在研究TC模块的时候,若对Netty框架完全不了解的同学,可以先阅读下Netty框架相关文档,比如定义消息接收类,每个回调代表什么意思,发送消息给客户端如何进行。有了一定了解之后在阅读TC模块会更清晰。

言归正传,通过Server类,我们看到通过创建和启动RpcServer来进行和TM和RM交互,继续分析,设置RpcServer的handler-> DefaultCoordinator协调器.DefaultCoordinator继承AbstractTCInboundHandler。然后RpcServer继承AbstractRpcRemotingServer,然后设置Netty框架所需eventLoopGroupWorker、eventLoopGroupBoss,最后启动

所有消息入口为AbstractRpcRemoting类中channelRead方法,如果有同学说为什么是这个,Netty框架规则就是这样!

下面我们通过RM模块注册一阶段本地事务举例,详细分析整个RM和TC交互过程

通过查看源码最终调用到以下方法

这个resourceManagers在哪里赋值的?跟踪发现是静态初始化实例通过自定义SPI扩展接口通过BranchType进行赋值

我们上面获取的是AT类型ResourceManager,所以实现类为DataSourceManager,最终调用到AbstractResourceManager类下branchRegister方法

消息发送sendMsgWithResponse在TM模块已经介绍过,忘了的同学可以阅读上文中的分析。

我们着重讲解消息发送到TC之后,TC模块中的业务处理。

我们简要查看下查看下TC模块的UML类图

我们通过类图发现,我们比较关心的就RpcServer、AbstractRpcRemotingServer、AbstractRpcRemoting这3个类,那我们查看下我们分支事务注册是如何进行的

RM模块发送请求到TC最终执行到以下方法

然后TC模块接收消息入口为RpcServer类中以下方法

因为我们是注册RM分支事务事件,所以直接会执行super父类进行消息解析和分发,查看父类如何处理的。

因为我们的RM分支事务注册属于请求事件,所以最终会执行以下方法

messageExecutor为Server启动类中定义的线程池执行器

最终调用到RpcServer中dispatch方法进行消息的分发处理

查看RpcServer初始化接口可知

消息被派发到DefaultServerMessageListenerImpl类中进行处理,继续查看

我们的消息属于MergedWarpMessage类型,继续调用results[i] = transactionMessageHandler.onRequest(subMessage, rpcContext);这个transactionMessageHandler就是DefaultCoordinator(核心类之一,事务协调器)。继续分析

这个地方简单解释下,我们的请求BranchRegisterRequest继承AbstractTransactionRequestToTC,然后就会调用到以下方法体

这个handler就是DefaultCoordinator本身实例,因为DefaultCoordinator继承AbstractTCInboundHandler,所以会执行到AbstractTCInboundHandler类中的以下方法

继而调用到DefaultCoordinator类

最终调用到DefaultCore类方法

这个地方很关键,需要重点分析! 大致调用如下:

1:通过Xid获取全局事务GlobalSession,GlobalSession全局事务加锁调用逻辑

2:判断GlobalSession状态,判断状态是否可用或者有没有发起全局事务(Begin状态)

3:创建BranchSession分支事务,其中lockKeys简单理解就是操作的哪些行数据字段

4:BranchSession分支事务加锁,内部加锁大致为根据pk主键值+操作的行数据进行加锁(行锁),加锁逻辑比较复杂,可以在seata.io官网进行查看原理图,如果分支事务获取锁失败了,那么代表有其他分支事务正在处理业务逻辑,抛出锁冲突异常信息,该异常会被ConnectionProxy的以下代码捕获

然后抛出LockConflictException异常

最终AbstractDMLBaseExecutor.executeAutoCommitTrue捕获这个LockConflictException锁冲突异常.异常处理为:1->回滚数据。2->重试获取分支事务锁,默认重试30次,每次sleep间隔时间10ms,直到成功执行commit操作或者抛出锁超时,若抛出锁超时则会一直上抛SQLException到TM发起全局事务的模块,然后由TM模块发起全局回滚消息到TC,由TC下发分支事务回滚消息

5:保存全局事务GlobalSession对应的分支事务BranchSession数据到store,根据SPI接口判断采用file本地文件保存还是按照db数据库进行保存,以便于TC服务端自检操作

6:返回BranchSession分支事务id

重点:上文对分支事务BranchSession加锁了,还没有释放锁,下文将分析BranchSession分支事务锁的释放

我们在上文中第6步获取到分支事务id后,我们继续分析后续操作

1:注册分支事务register()方法,获取到分支事务id,设置当前ConnectionContext上下文的分支事务id

2:判断当前ConnectionContext上下文是否有保存的undo_log数据,若有数据则将数据写入undo_log表

封装BranchUndoLog分支事务数据

将数据插入undo_log表,状态为Normal状态,该状态有Normal、GlobalFinished 两种状态,后文分支事务回滚时候着重讲解这两种状态

3:提交当前分支事务(本地事务)

4:上报分支事务(本地事务)一阶段完成状态PhaseOne_Done

5:清空上下文数据

假设各个分支事务均未出现异常,各个分支事务均已完成一阶段并且上报PhaseOne_Done状态,那么发起全局事务TM模块将执行以下逻辑(TransactionalTemplate.commitTransaction)

然后调用到DefaultGlobalTransaction.commit,最终调用到DefaultTransactionManager.commit

和begin发起全局事务调用过程基本一致,不清楚流程的同学可以查看上文发起begin的调用过程。调用顺序:

1:DefaultTransactionManager.commit->发送commit消息

2:AbstractRpcRemoting.sendRequest->封装/发送请求数据

3:RpcServer.channelRead->TC/seata-server获取请求数据

4:AbstractRpcRemoting.channelRead->TC/seata-server获取请求数据

5:RpcServer.dispatch->消息分发处理

6:DefaultServerMessageListenerImpl.onTrxMessage->消息监听实现类,如:对RpcMessage消息相关处理

7:DefaultCoordinator.onRequest设置消息Handler处理类

8:AbstractTCInboundHandler.handle消息映射处理,根据消息请求类型进行映射处理,commit对应的请求类型为GlobalCommitRequest

9:DefaultCoordinator.doGlobalCommit

10:DefaultCore.commit分析下

10-1:根据xid获取全局事务,注意这里是重点,分支事务锁释放第一现场

此步骤会执行globalSession.setActive(false);设置当前全局事务为不可用状态,这样就不会有分支事务继续注册进globalSession。释放当前分支事务锁,还记得上文讲述过在分支事务提交commit时候需要先向TC注册当前分支事务吗?这个注册过程就涉及到分支事务锁,比如某个全局事务中的分支事务和另外一个全局事务中的分支事务,都在操作某一行数据,那么就要等到这个分支事务锁释放后,其他分支事务才能进行Commit操作,然后去获取分支事务锁.然后判断当前xid对应的全局事务是否是发起状态(begin)

10-2:判断当前全局事务是否可以异步执行,判断是否异步就是判断是否是TCC或者是AT事务类型,是AT事务类型则可以异步执行

将全局事务放入Map集合中

10-3:在启动TC/seata-server时初始化了以下方法

然后开启各种类型的定时任务

异步提交全局事务就将在如下方法中执行

10-4:遍历全局事务sessionMap集合,循坏调用DefaultCore.doGlobalCommit,该过程相对复杂,那就继续分析分析

10-5:eventBus主要数据监控统计作用

10-6:遍历分支事务,执行resourceManagerInbound.branchCommit

10-7:调用DefaultCoordinator.branchCommit

10-8:RpcServer.sendSyncRequest发送消息给RM分支事务模块

10-9:AbstractRpcRemotingClient.channelRead接收消息

10-10:AbstractRpcRemotingClient.dispatch消息分发处理

10-11:RmMessageListener.onMessage该消息监听,该监听器在RMClient中进行初始化设置

10-12:RmMessageListener.handleBranchCommit执行以下方法

10-13:这个handler.onRequest,handler在初始化RMClient中进行设置,所以该handler对象即DefaultRMHandler,随后调用过程为DefaultRMHandler父类AbstractRMHandler.onRequest->DefaultRMHandler.handle->RMHandlerAT父类AbstractRMHandler.handle->AbstractRMHandler.doBranchCommit最终调用如下方法

10-14:getResourceManager()为模版方法,实现类为RMHandlerAT类中,最终获取到的ResourceManager实现类为DataSourceManager

10-15:DataSourceManager.branchCommit将分支事务加入asyncWorker定时任务中

asyncWorker在SPI接口加载DataSourceManager时初始化

10-16:最终调用asyncWorker.branchCommit

Offer操作加入队列,然后直接返回分支状态,即PhaseTwo_Committed状态

简要分析下分支事务二阶段提交做了哪些事情吧,便于更好理解流程

10-16-1:判断队列是否有任务,若有任务,则封装mappedContexts集合数据

10-16-2:遍历mappedContexts集合,获取DataSourceProxy数据源代理对象,然后获取Connection数据库连接对象

10-16-3: 最终执行UndoLogManager.batchDeleteUndoLog删除undo_log表中xid和branch_id字段同时出现在xids集合和branchIds集合中的数据行,最终分支事务完成二阶段提交

10-17:在10-16步骤中,分支事务返回PhaseTwo_Committed状态后,在10-12步骤中

RmMessageListener.handleBranchCommit方法中通过sender.sendResponse(request, serverAddress, resultMessage)将handler.onRequest结果发给TC/seata-server

10-18:TC/seata-server接收数据流程简要概括,RpcServer.channelRead->消息赋值

10-19:最终我们10-6步骤就得到了BranchStatus分支事务状态,即PhaseTwo_Committed状态

11:TC/seata-server获取到PhaseTwo_Committed状态之后或者其他状态会按照不同策略进行处理,我们简要分析PhaseTwo_Committed状态

分析globalSession.removeBranch(branchSession)做了哪些事情

12:往sessionStore记录当前分支状态,如file本地文件或者db数据库,释放当前分支事务锁,最后分支事务集合中删除当前分支事务,其实这个地方,笔者也有一些疑虑,就是上文中commit第一现场时候已经释放了分支事务锁,为何这里还要进行释放?笔者根据Rollback回滚初步判断,可能是为了处理Rollback回滚,因为回滚过程第一现场只是将globalSession设置为不可用状态,所以需要在removeBranch中进行锁释放

至此,整个Commit过程分析完毕,涵盖分支事务一阶段,分支事务二阶段和TC服务端的一系列数据交互过程

上文分析了Commit过程,我们接着分析全局回滚Rollback过程,触发全局回滚Rollback大致分为两类:1:全局事务发起端内部异常被捕获。2:发起端调用下游业务端,下游业务端主动上抛各种异常信息被发起端捕获.

最终在发起端TransactionalTemplate.completeTransactionAfterThrowing进行异常捕获

最终发起全局回滚请求

和上文讲述的TransactionalTemplate.commitTransaction全局提交流程基本完全一致

大致调用过程为:

1:DefaultGlobalTransaction.rollback,最终调用到DefaultTransactionManager.rollback

2:AbstractRpcRemoting.sendRequest->封装/发送请求数据

3:RpcServer.channelRead->TC/seata-server获取请求数据

4:AbstractRpcRemoting.channelRead->TC/seata-server获取请求数据

5:RpcServer.dispatch->消息分发处理

6:DefaultServerMessageListenerImpl.onTrxMessage->消息监听实现类,如:对RpcMessage消息相关处理

7:DefaultCoordinator.onRequest设置消息Handler处理类

8:AbstractTCInboundHandler.handle消息映射处理,根据消息请求类型进行映射处理,rollback对应的请求类型为GlobalRollbackRequest

9:DefaultCoordinator.doGlobalRollback

10:DefaultCore.rollback分析下

11:根据xid获取全局事务,然后设置当前globalSession不可用,然后判断当前xid对应的全局事务是否是发起状态(begin)

12:DefaultCore.doGlobalRollback

13:遍历分支事务,执行resourceManagerInbound.branchRollback

14:调用DefaultCoordinator.branchRollback

15:RpcServer.sendSyncRequest发送消息给RM分支事务模块

16:AbstractRpcRemotingClient.channelRead接收消息

17:AbstractRpcRemotingClient.dispatch消息分发处理

18:RmMessageListener.onMessage该消息监听,该监听器在RMClient中进行初始化设置

19:RmMessageListener.handleBranchRollback执行以下方法

20:这个handler.onRequest,handler在初始化RMClient中进行设置,所以该handler对象即DefaultRMHandler,随后调用过程为DefaultRMHandler父类

AbstractRMHandler.onRequest->DefaultRMHandler.handle->RMHandlerAT父类AbstractRMHandler.handle->AbstractRMHandler.doBranchRollback最终调用如下方法

21:getResourceManager()为模版方法,实现类为RMHandlerAT类中,最终获取到的ResourceManager实现类为DataSourceManager

22:DataSourceManager.branchRollback

23:UndoLogManager.undo回滚步骤着重分析

24:查询undo_log表是否存在branchId、xid对应的数据

25:canUndo判断当前数据状态是否是Normal正常状态,如不是Normal状态则跳出while循坏,Normal状态是分支事务插入,是正常执行流程、GlobalFinished状态为异常状态,是全局事务发起的防御性插入,比如全局回滚时,分支事务还没执行,此时就需要插入防御性数据,用主键冲突来防止异常分支事务的插入,起了一个占位作用

26:解析undo_log数据行,获取TableMeta表数据,主要封装数据为表的所有列名字段信息allColumns,表的所有索引信息allIndexes

27:根据执行器进行执行,如:Insert操作会使用afterImage数据进行删除当前数据库中的数据进行数据回滚操作

28:如果undo_log表中存在branchId、xid对应的数据则删除分支事务undo_log表中branchId、xid对应的数据,如果undo_log中不存在branchId、xid对应的数据则防御性插入一条branchId、xid数据

注释信息已经解释的很清晰insertUndoLogWithGlobalFinished的作用,主要起防御性的作用,因为分支事务和全局事务都是异步RPC调用,所以为了防止一些异常情况进行的占位操作

至此对Seata AT模式整个TM,RM,TC调用流程分析完毕,若有不正确的地方欢迎指出!

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-13,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档