以下代码基于seata和seata-example
在配车的业务中,我们使用了Seata的分布式事务来保证配车的业务逻辑能够正常时,才会做订单信息推送到财务系统。我们的系统配车业务一开始使用Seata的TCC模式来实现的,需要自己实现try和confirm或者cancel的逻辑。之后,由于seata推出了AT模式,之后系统采用的分布式事务使用的是AT模式。
相比如之前的TCC模式,AT模式只需要添加 @GlobalTransactional就可以实现分布式事务。
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public Integer confirmCarSubmit(MatchConfirmCarDTO matchConfirmCarDTO, LoginInfoDto loginInfoDto) {
return retailOrderService.matchConfirmCarSubmit(null, matchConfirmCarDTO, loginInfoDto);
}
配车的过程中,如果配了车,则需要修改库存中车辆库存状态为已锁定,配车状态为已配车,且释放原车。同时做财务信息推送。由于涉及到库存和财务信息,因此需要用到分布式事务。
那么为啥加了 @GlobalTransactional,它就可以实现分布式事务呢?
首先Seata分为两端,Seata Server和Seata Client。TC作为Seata的Server端,而RM和TM作为客户端。由于其是注解,因此,我们可以想象得到应该是基于全局事务注解。
下面我们下载seata的源码,基于seata的源码进行学习。
首先启动Seata Server:
可以看到Seata Server主要做了这样几件事:
1)初始化监控度量信息
2)初始化改良版雪花算法UUID
3) 初始化SessionHolder
4) 初始化协调器TC,并将其设置为handler
5) 初始化netty服务端
public static void main(String[] args) throws IOException {
//initialize the metrics
LOGGER.info("------初始化监控信息-----");
MetricsManager.get().init();
LOGGER.info("------初始化改良版雪花算法UUID-----");
UUIDGenerator.init(parameterParser.getServerNode());
LOGGER.info("------初始化SessionHolder-----");
SessionHolder.init(parameterParser.getStoreMode());
// 事务协调器,相当于netty的handler角色
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
LOGGER.info("------初始化协调器TC-----");
coordinator.init();
nettyRemotingServer.setHandler(coordinator);
LOGGER.info("------初始化netty服务端-----");
nettyRemotingServer.init();
}
其中最重点的是coordinator.init(); 事务协调器初始化。
其主要启动了5个定时任务:
handleRetryRollbacking();
handleRetryCommitting();
handleAsyncCommitting();
timeoutCheck();
undoLogDelete();
其中:
handleRetryRollbacking(); 处理重试回滚,每秒1次 handleRetryCommitting(); 处理重试提交,每秒1次 handleAsyncCommitting(); 处理异步提交,每秒1次 timeoutCheck(); 超时提交,每秒1次 undoLogDelete(); undo log删除,每3小时1次
客户端启动的时候,可以看到其会执行GlobalTransactionScanner继承了InitializingBean和AbstractAutoProxyCreator。因此
@Override
public void afterPropertiesSet() {
ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
(ConfigurationChangeListener)this);
if (disableGlobalTransaction) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Global transaction is disabled.");
}
return;
}
if (initialized.compareAndSet(false, true)) {
LOGGER.info("初始化客户端");
initClient();
}
}
io.seata.spring.annotation.GlobalTransactionScanner#initClient方法,初始化客户端:
private void initClient() {
//init TM
LOGGER.info("-------TM客户端初始化-------");
TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
//init RM
LOGGER.info("-------RM客户端初始化-------");
RMClient.init(applicationId, txServiceGroup);
// 注册钩子方法
registerSpringShutdownHook();
}
主要做了三件事:
初始化TM,注册相关处理器,同时放入processorTable
初始化RM,注册相关处理器,同时放入processorTable
注册钩子方法
接着注册服务到seata中,然后Netty会执行channelRead执行事件处理:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
Object body = rpcMessage.getBody();
if (body instanceof MessageTypeAware) {
// 根据messageType获取对应的处理器和线程池,processorTable 在 netty.init 时会初始化
MessageTypeAware messageTypeAware = (MessageTypeAware) body;
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
// 判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行
if (pair != null) {
if (pair.getSecond() != null) {
try {
//执行处理
pair.getSecond().execute(() -> {
try {
LOGGER.info("---执行处理processMessage-----");
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
});
} catch (RejectedExecutionException e) {
LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
"thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
if (allowDumpStack) {
String name = ManagementFactory.getRuntimeMXBean().getName();
String pid = name.split("@")[0];
int idx = new Random().nextInt(100);
try {
Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
} catch (IOException exx) {
LOGGER.error(exx.getMessage());
}
allowDumpStack = false;
}
}
} else {
try {
pair.getFirst().process(ctx, rpcMessage);
} catch (Throwable th) {
LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
}
}
} else {
LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
}
} else {
LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
}
}
判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行。
pair.getFirst().process(ctx, rpcMessage);
进入RM注册操作,通过Netty的RegRmProcessor。
RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata', applicationId='order-service',transactionServiceGroup='my_test_tx_group'},channel:[id: 0xb806e7c6, L:/127.0.0.1:8091 - R:/127.0.0.1:57663],client version:1.4.2
可以看到rm注册的信息:数据源连接信息、应用id、事务服务组、netty通道信息
进入Tm注册操作,通过Netty的RegTmProcessor。
TM register success,message:RegisterTMRequest{applicationId='account-service', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xf2eafbcb, L:/127.0.0.1:8091 - R:/127.0.0.1:57691],client version:1.4.2
可以看到tm注册的信息:应用id、事务服务组、netty通道信息
/**
* 购买下单,模拟全局事务提交
*
* @return
*/
@RequestMapping("/purchase/commit")
public Boolean purchaseCommit(HttpServletRequest request) {
businessService.purchase("1001", "2001", 1);
return true;
}
/**
* 减库存,下订单
*
* @param userId
* @param commodityCode
* @param orderCount
*/
@GlobalTransactional
public void purchase(String userId, String commodityCode, int orderCount) {
LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
stockClient.deduct(commodityCode, orderCount);
orderClient.create(userId, commodityCode, orderCount);
}
可以看到减库存,下订单信息:
此时我们可以看到有一个方法:io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke
可以看到拦截器里面有一个invoke方法,此方法会获取全局事务注解和全局锁注解。
final GlobalTransactional globalTransactionalAnnotation =getAnnotation(method, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
根据对应的注解执行对应的处理:
handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
handleGlobalLock(methodInvocation, globalLockAnnotation)
可以看到控制台打印的日志:
业务系统客户端:
发起的事务:
分支事务:
可以看到分支事务二阶段提交和提交状态committed。
可以看到这个过程经历了:
开启全局事务
创建全局session
session开启
channelRead执行业务处理processMessage
注册分支事务
提交全局事务
二阶段分支提交
二阶段提交发送
channelRead执行业务处理processMessage
执行处理processMessage
二阶段提交,删除分支
提交全局事务成功
其中:
在二阶段提交前,会先生成前镜像,然后执行业务sql,然后生成后镜像,准备事务日志。
完成后,会执行二阶段的提交操作。
整个过程的操作:
一阶段:首先拦截sql,解析sql语句的语义,提取元数据,找到sql语句,在执行sql前生成前镜像,执行业务sql后,生成后镜像。生成seata事务锁数据,然后构建事务日志并插入事务日志表,注册分支事务。
二阶段:二阶段分支提交,删除保存的事务日志数据,完成数据清理。通过异步线程批量删除在二阶段提交的分支事务日志数据。如果是二阶段回滚操作,则通过事务协调管理器执行二阶段回滚,此时资源管理器会执行回滚一阶段已经执行的业务sql语句,还原数据。