前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Seata学习整理

Seata学习整理

作者头像
路行的亚洲
发布2023-02-28 13:29:51
2680
发布2023-02-28 13:29:51
举报
文章被收录于专栏:后端技术学习后端技术学习

以下代码基于seata和seata-example

一、Seata使用的业务场景

在配车的业务中,我们使用了Seata的分布式事务来保证配车的业务逻辑能够正常时,才会做订单信息推送到财务系统。我们的系统配车业务一开始使用Seata的TCC模式来实现的,需要自己实现try和confirm或者cancel的逻辑。之后,由于seata推出了AT模式,之后系统采用的分布式事务使用的是AT模式。

相比如之前的TCC模式,AT模式只需要添加 @GlobalTransactional就可以实现分布式事务。

代码语言:javascript
复制
   @GlobalTransactional(rollbackFor = Exception.class)
    @Override
    public Integer confirmCarSubmit(MatchConfirmCarDTO matchConfirmCarDTO, LoginInfoDto loginInfoDto) {
        return retailOrderService.matchConfirmCarSubmit(null, matchConfirmCarDTO, loginInfoDto);
    }

配车的过程中,如果配了车,则需要修改库存中车辆库存状态为已锁定,配车状态为已配车,且释放原车。同时做财务信息推送。由于涉及到库存和财务信息,因此需要用到分布式事务。

那么为啥加了 @GlobalTransactional,它就可以实现分布式事务呢?

首先Seata分为两端,Seata Server和Seata Client。TC作为Seata的Server端,而RM和TM作为客户端。由于其是注解,因此,我们可以想象得到应该是基于全局事务注解。

下面我们下载seata的源码,基于seata的源码进行学习。

二、Seata服务端启动

首先启动Seata Server:

可以看到Seata Server主要做了这样几件事:

1)初始化监控度量信息

2)初始化改良版雪花算法UUID

3) 初始化SessionHolder

4) 初始化协调器TC,并将其设置为handler

5) 初始化netty服务端

代码语言:javascript
复制
    public static void main(String[] args) throws IOException {

        //initialize the metrics
        LOGGER.info("------初始化监控信息-----");
        MetricsManager.get().init();

        LOGGER.info("------初始化改良版雪花算法UUID-----");
        UUIDGenerator.init(parameterParser.getServerNode());

        LOGGER.info("------初始化SessionHolder-----");
        SessionHolder.init(parameterParser.getStoreMode());

        // 事务协调器,相当于netty的handler角色
        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        LOGGER.info("------初始化协调器TC-----");
        coordinator.init();
        nettyRemotingServer.setHandler(coordinator);

        LOGGER.info("------初始化netty服务端-----");
        nettyRemotingServer.init();
    }

其中最重点的是coordinator.init(); 事务协调器初始化。

其主要启动了5个定时任务:

代码语言:javascript
复制
 handleRetryRollbacking();
 handleRetryCommitting();
 handleAsyncCommitting();
 timeoutCheck();
 undoLogDelete();

其中:

handleRetryRollbacking(); 处理重试回滚,每秒1次 handleRetryCommitting(); 处理重试提交,每秒1次 handleAsyncCommitting(); 处理异步提交,每秒1次 timeoutCheck(); 超时提交,每秒1次 undoLogDelete(); undo log删除,每3小时1次

三、Seata客户端启动

客户端启动的时候,可以看到其会执行GlobalTransactionScanner继承了InitializingBean和AbstractAutoProxyCreator。因此

代码语言:javascript
复制
@Override
    public void afterPropertiesSet() {
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)this);
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            return;
        }
        if (initialized.compareAndSet(false, true)) {
            LOGGER.info("初始化客户端");
            initClient();
        }
    }

io.seata.spring.annotation.GlobalTransactionScanner#initClient方法,初始化客户端:

代码语言:javascript
复制
 private void initClient() {
        //init TM
        LOGGER.info("-------TM客户端初始化-------");
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);

        //init RM
        LOGGER.info("-------RM客户端初始化-------");
        RMClient.init(applicationId, txServiceGroup);

        // 注册钩子方法
        registerSpringShutdownHook();

    }

主要做了三件事:

代码语言:javascript
复制
初始化TM,注册相关处理器,同时放入processorTable
初始化RM,注册相关处理器,同时放入processorTable
注册钩子方法

接着注册服务到seata中,然后Netty会执行channelRead执行事件处理:

代码语言:javascript
复制
 protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            // 根据messageType获取对应的处理器和线程池,processorTable 在 netty.init 时会初始化
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            // 判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行
            if (pair != null) {
                if (pair.getSecond() != null) {
                    try {
                        //执行处理
                        pair.getSecond().execute(() -> {
                            try {
                                LOGGER.info("---执行处理processMessage-----");
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                            "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                        if (allowDumpStack) {
                            String name = ManagementFactory.getRuntimeMXBean().getName();
                            String pid = name.split("@")[0];
                            int idx = new Random().nextInt(100);
                            try {
                                Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                            } catch (IOException exx) {
                                LOGGER.error(exx.getMessage());
                            }
                            allowDumpStack = false;
                        }
                    }
                } else {
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }

判断该Pair是否有初始化线程池,如果有就用业务线程池执行,否则直接执行。

pair.getFirst().process(ctx, rpcMessage);

进入RM注册操作,通过Netty的RegRmProcessor。

代码语言:javascript
复制
RM register success,message:RegisterRMRequest{resourceIds='jdbc:mysql://127.0.0.1:3306/seata', applicationId='order-service',transactionServiceGroup='my_test_tx_group'},channel:[id: 0xb806e7c6, L:/127.0.0.1:8091 - R:/127.0.0.1:57663],client version:1.4.2

可以看到rm注册的信息:数据源连接信息、应用id、事务服务组、netty通道信息

进入Tm注册操作,通过Netty的RegTmProcessor。

代码语言:javascript
复制
TM register success,message:RegisterTMRequest{applicationId='account-service', transactionServiceGroup='my_test_tx_group'},channel:[id: 0xf2eafbcb, L:/127.0.0.1:8091 - R:/127.0.0.1:57691],client version:1.4.2

可以看到tm注册的信息:应用id、事务服务组、netty通道信息

四、执行业务系统业务

代码语言:javascript
复制
 /**
     * 购买下单,模拟全局事务提交
     *
     * @return
     */
    @RequestMapping("/purchase/commit")
    public Boolean purchaseCommit(HttpServletRequest request) {
        businessService.purchase("1001", "2001", 1);
        return true;
    }

  /**
     * 减库存,下订单
     *
     * @param userId
     * @param commodityCode
     * @param orderCount
     */
    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        stockClient.deduct(commodityCode, orderCount);
        orderClient.create(userId, commodityCode, orderCount);
    }

可以看到减库存,下订单信息:

此时我们可以看到有一个方法:io.seata.spring.annotation.GlobalTransactionalInterceptor#invoke

可以看到拦截器里面有一个invoke方法,此方法会获取全局事务注解和全局锁注解。

代码语言:javascript
复制
   final GlobalTransactional globalTransactionalAnnotation =getAnnotation(method, targetClass, GlobalTransactional.class);
   final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);

根据对应的注解执行对应的处理:

代码语言:javascript
复制
handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
handleGlobalLock(methodInvocation, globalLockAnnotation)

可以看到控制台打印的日志:

业务系统客户端:

发起的事务:

分支事务:

可以看到分支事务二阶段提交和提交状态committed。

可以看到这个过程经历了:

代码语言:javascript
复制
开启全局事务
创建全局session
session开启
channelRead执行业务处理processMessage
注册分支事务
提交全局事务
二阶段分支提交
二阶段提交发送
channelRead执行业务处理processMessage
执行处理processMessage
二阶段提交,删除分支
提交全局事务成功

其中:

在二阶段提交前,会先生成前镜像,然后执行业务sql,然后生成后镜像,准备事务日志。

完成后,会执行二阶段的提交操作。

五、总结

整个过程的操作:

一阶段:首先拦截sql,解析sql语句的语义,提取元数据,找到sql语句,在执行sql前生成前镜像,执行业务sql后,生成后镜像。生成seata事务锁数据,然后构建事务日志并插入事务日志表,注册分支事务。

二阶段:二阶段分支提交,删除保存的事务日志数据,完成数据清理。通过异步线程批量删除在二阶段提交的分支事务日志数据。如果是二阶段回滚操作,则通过事务协调管理器执行二阶段回滚,此时资源管理器会执行回滚一阶段已经执行的业务sql语句,还原数据。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-12-05,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 后端技术学习 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、Seata使用的业务场景
  • 二、Seata服务端启动
  • 三、Seata客户端启动
  • 四、执行业务系统业务
  • 五、总结
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档