前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >seata TC 请求处理流程

seata TC 请求处理流程

作者头像
luoxn28
发布2021-01-28 15:53:08
8360
发布2021-01-28 15:53:08
举报
文章被收录于专栏:TopCoderTopCoder

TC的业务channelHandler为类 io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler,注意到达该类的请求都是经过编解码的了,请求类型为RpcMessage。ServerHandler类处理方法有:

channelRead

channelRead接收到数据的回调方法后,通过rpcMessage.body获取messageTypeAware,然后找到该type对应的处理器,最后将rpcMessage消息交给对应处理器来执行。注意如果处理器执行线程池为空会直接在当前线程(netty worker线程)中来执行处理逻辑。对应代码如下:

代码语言:javascript
复制
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    MessageTypeAware messageTypeAware = (MessageTypeAware) rpcMessage.getBody();
    final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
    if (pair.getSecond() != null) {
        pair.getSecond().execute(() -> { // 交给线程池来执行
            pair.getFirst().process(ctx, rpcMessage);
        });
    } else {
        pair.getFirst().process(ctx, rpcMessage);
    }
}

RpcMessage处理器初始化逻辑在方法io.seata.core.rpc.netty.NettyRemotingServer#registerProcessor中,会按照不同类型进行注册:

代码语言:javascript
复制
private void registerProcessor() {
    // 1. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
    // 3. registry rm message processor
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

有些处理器是有线程池的,处理器执行线程池初的创建代码在方法io.seata.server.Server#main中,从线程池参数可以看出,默认核心线程数50,最大线程数500,阻塞队列大小20000,拒绝策略为在当前线程中执行。

代码语言:javascript
复制
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
        NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
        new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

下面就该关注message processor中的处理逻辑了,这里不同请求对应不同的处理器类型,下面重点关注ServerOnRequestProcessorRegRmProcessorRegTmProcessor。注意:MessageType.TYPE_REG_RMMessageType.TYPE_REG_CLT是MR/TM启动后发送注册请求的类型。

RegTmProcessor和RegRmProcessor

RegTmProcessor和RegRmProcessor接收到处理流程基本是一样的,如下:

  • 解析client channel ip/port信息存下来
  • 然后往IDENTIFIED_CHANNELSRM_CHANNELS/TM_CHANNELS写channel信息,IDENTIFIED_CHANNELS会存储所有client信息,RM_CHANNELS只存储rm client信息,TM_CHANNELS只存储tm client信息
  • 最后异步返回请求结果,TM对应RegisterTMResponse,RM对应RegisterRMResponse。

ServerOnRequestProcessor

ServerOnRequestProcessor处理RM/TM客户端请求消息,对应请求消息类型如下:

代码语言:javascript
复制
/**
 * process RM/TM client request message.
 * message type:
 * RM:
 * 1) {@link MergedWarpMessage}
 * 2) {@link BranchRegisterRequest}
 * 3) {@link BranchReportRequest}
 * 4) {@link GlobalLockQueryRequest}
 * TM:
 * 1) {@link MergedWarpMessage}
 * 2) {@link GlobalBeginRequest}
 * 3) {@link GlobalCommitRequest}
 * 4) {@link GlobalReportRequest}
 * 5) {@link GlobalRollbackRequest}
 * 6) {@link GlobalStatusRequest}
 */

上诉的不同消息类型对应TCInboundHandler类中不同的处理方法,如下所示:

代码语言:javascript
复制
public interface TCInboundHandler {
    GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
    GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);
    GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);
    BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
    BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);
    GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);
    GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);
    GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}

下面按照分布式事务执行先后顺序进行逐个分析:

  • GlobalBeginRequest:事务开始,创建GlobalSession,调用UUIDGenerator.generateUUID()生成transactionId,ip+port+transactionId作为事务xid,然后调用begin开启事务,设置事务状态为GlobalStatus.Begin,执行SessionLifecycleListener回调(GLOBAL_ADD类型的writeSession持久化)。
  • BranchRegisterRequest:分支事务注册,根据xid找到对应的GlobalSession,锁定该GlobalSession,然后执行业务逻辑。首先创建分支事务,设置branchId=UUIDGenerator.generateUUID(),锁定branchSessionLock对应的lockKeys(AT模式),然后执行BranchSession添加分支事务到全局事务中(此时分支事务状态为BranchStatus.Registered),执行SessionLifecycleListener回调(BRANCH_ADD类型的writeSession持久化),分时事务存储在GlobalSession.branchSessions字段,最后返回分支事务branchId。
  • BranchReportRequest:分支报告请求,获取对应的GlobalSession和BranchSession,执行SessionLifecycleListener回调(BRANCH_UPDATE类型的writeSession持久化)。
  • GlobalReportRequest:全局报告请求,目前TC端真正有业务语义的是SAGA模式,其他模式都是执行回调后直接返回GlobalStatus作为响应结果。
  • GlobalLockQueryRequest:分支事务全局锁定查询,AT模式下才有具体意义的业务逻辑,这里只是检查对应的lockKey是否处于锁定状态。
  • GlobalStatusRequest:全局事务状态查询,只是获取对应的GlobalSession.status。

全局事务提交

全局事务提交请求入口为方法io.seata.server.AbstractTCInboundHandler#handle(GlobalCommitRequest, RpcContext),代码如下:

代码语言:javascript
复制
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
    GlobalCommitResponse response = new GlobalCommitResponse();
    response.setGlobalStatus(GlobalStatus.Committing);
    exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
        @Override
        public void execute(GlobalCommitRequest request, GlobalCommitResponse response) {
            // 全局事务提交
            doGlobalCommit(request, response, rpcContext);
        }
        @Override
        public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
            // 设置错误码,检查响应全局事务状态
            super.onException(request, response, rex);
            checkTransactionStatus(request, response);
        }


    }, request, response);
    return response;
}

真正的实物提交在方法io.seata.server.coordinator.DefaultCore#commit中,首先会判断globalSession是否可以提交(执行close globalSession,清除分支事务锁资源),然后遍历分支事务进行提交操作。

代码语言:javascript
复制
public GlobalStatus commit(String xid) throws TransactionException {
    GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
    if (globalSession == null) {
        return GlobalStatus.Finished;
    }
    globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
    // just lock changeStatus

    boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
        // Highlight: Firstly, close the session, then no more branch can be registered.
        globalSession.closeAndClean();
        // close clean 之后,会不会出现同一个RM服务中,另一个事务进行全局加锁成功,但是这个全局事务还未commit的情况??
        // 由于清除了分支锁资源,所以是可能存在的,但是由于RM上事务已经提交(在阶段一就行了提交)或者RM事务失败,因此是可以让RM上其他实物进行加锁的
        if (globalSession.getStatus() == GlobalStatus.Begin) {
            if (globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return false;
            } else {
                globalSession.changeStatus(GlobalStatus.Committing);
                return true;
            }
        }
        return false;
    });

    if (shouldCommit) {
        boolean success = doGlobalCommit(globalSession, false);
        //If successful and all remaining branches can be committed asynchronously, do async commit.
        if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
            globalSession.asyncCommit();
            return GlobalStatus.Committed;
        } else {
            return globalSession.getStatus();
        }
    } else {
        return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
    }
}

针对分支事务的提交操作是由方法doGlobalCommit来完成的,会遍历所有分支事务进行branchCommit,这里是同步方式发送给客户端BranchCommitRequest请求,接收到成功提交结果之后就会将该分支事务从全局事务中移除,如果分支事务能够异步进行提交,TC会尝试进行异步提交操作,对应的处理线程池为 io.seata.server.coordinator.DefaultCoordinator#retryCommitting

全局事务回滚

全局事务回滚流程大致和全局事务提交类似,都是遍历所有分支事务进行branchRollback,接收到成功回滚之后将该分支事务从全局事务中移除,如果异常会有对应的重试回滚逻辑。注意这里有一个重要的差别是:针对分支事务上的锁资源,提交时可以先进行globalSession.closeAndClean(),而回滚时只能先进行globalSession.close(),最后再执行锁资源清除。进行globalSession.close()是为了不再接受RM的注册请求,回滚时先不清除锁资源是为了防止在分支事务为回滚成功之前,该RM上其他事务进行同样的lockKeys全局加锁操作。

retryRollbacking/retryCommitting/asyncCommitting线程池

事务提交和回滚涉及到retryRollbacking/retryCommitting/asyncCommitting这几个线程池,这些线程池默认都是运行周期都是1000ms,初始化代码如下:

代码语言:javascript
复制
retryRollbacking.scheduleAtFixedRate(() -> {
    handleRetryRollbacking();
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

retryCommitting.scheduleAtFixedRate(() -> {
    handleRetryCommitting();
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

asyncCommitting.scheduleAtFixedRate(() -> {
    handleAsyncCommitting();
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);    

异步提交和重试提交都会调用doGlobalCommit方法,区别就是异步就一直调用,而重试提交有最大超时时间限制,如果某个GlobalSession超过一定时间还未提交成功,那么不会再进行重试操作。重试回滚会调用doGlobalRollback方法,并且也有最大超时时间的限制。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-01-19,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • channelRead
  • RegTmProcessor和RegRmProcessor
  • ServerOnRequestProcessor
  • 全局事务提交
  • 全局事务回滚
  • retryRollbacking/retryCommitting/asyncCommitting线程池
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档