TC的业务channelHandler为类 io.seata.core.rpc.netty.AbstractNettyRemotingServer.ServerHandler
,注意到达该类的请求都是经过编解码的了,请求类型为RpcMessage。ServerHandler类处理方法有:
channelRead接收到数据的回调方法后,通过rpcMessage.body获取messageTypeAware,然后找到该type对应的处理器,最后将rpcMessage消息交给对应处理器来执行。注意如果处理器执行线程池为空会直接在当前线程(netty worker线程
)中来执行处理逻辑。对应代码如下:
protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
MessageTypeAware messageTypeAware = (MessageTypeAware) rpcMessage.getBody();
final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
if (pair.getSecond() != null) {
pair.getSecond().execute(() -> { // 交给线程池来执行
pair.getFirst().process(ctx, rpcMessage);
});
} else {
pair.getFirst().process(ctx, rpcMessage);
}
}
RpcMessage处理器初始化逻辑在方法io.seata.core.rpc.netty.NettyRemotingServer#registerProcessor
中,会按照不同类型进行注册:
private void registerProcessor() {
// 1. registry on request message processor
ServerOnRequestProcessor onRequestProcessor =
new ServerOnRequestProcessor(this, getHandler());
super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
// 2. registry on response message processor
ServerOnResponseProcessor onResponseProcessor =
new ServerOnResponseProcessor(getHandler(), getFutures());
super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
// 3. registry rm message processor
RegRmProcessor regRmProcessor = new RegRmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
// 4. registry tm message processor
RegTmProcessor regTmProcessor = new RegTmProcessor(this);
super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
// 5. registry heartbeat message processor
ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}
有些处理器是有线程池的,处理器执行线程池初的创建代码在方法io.seata.server.Server#main
中,从线程池参数可以看出,默认核心线程数50,最大线程数500,阻塞队列大小20000,拒绝策略为在当前线程中执行。
ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
下面就该关注message processor中的处理逻辑了,这里不同请求对应不同的处理器类型,下面重点关注ServerOnRequestProcessor
、RegRmProcessor
和RegTmProcessor
。注意:MessageType.TYPE_REG_RM
和MessageType.TYPE_REG_CLT
是MR/TM启动后发送注册请求的类型。
RegTmProcessor和RegRmProcessor接收到处理流程基本是一样的,如下:
IDENTIFIED_CHANNELS
和RM_CHANNELS/TM_CHANNELS
写channel信息,IDENTIFIED_CHANNELS会存储所有client信息,RM_CHANNELS只存储rm client信息,TM_CHANNELS只存储tm client信息ServerOnRequestProcessor处理RM/TM客户端请求消息,对应请求消息类型如下:
/**
* process RM/TM client request message.
* message type:
* RM:
* 1) {@link MergedWarpMessage}
* 2) {@link BranchRegisterRequest}
* 3) {@link BranchReportRequest}
* 4) {@link GlobalLockQueryRequest}
* TM:
* 1) {@link MergedWarpMessage}
* 2) {@link GlobalBeginRequest}
* 3) {@link GlobalCommitRequest}
* 4) {@link GlobalReportRequest}
* 5) {@link GlobalRollbackRequest}
* 6) {@link GlobalStatusRequest}
*/
上诉的不同消息类型对应TCInboundHandler类中不同的处理方法,如下所示:
public interface TCInboundHandler {
GlobalBeginResponse handle(GlobalBeginRequest globalBegin, RpcContext rpcContext);
GlobalCommitResponse handle(GlobalCommitRequest globalCommit, RpcContext rpcContext);
GlobalRollbackResponse handle(GlobalRollbackRequest globalRollback, RpcContext rpcContext);
BranchRegisterResponse handle(BranchRegisterRequest branchRegister, RpcContext rpcContext);
BranchReportResponse handle(BranchReportRequest branchReport, RpcContext rpcContext);
GlobalLockQueryResponse handle(GlobalLockQueryRequest checkLock, RpcContext rpcContext);
GlobalStatusResponse handle(GlobalStatusRequest globalStatus, RpcContext rpcContext);
GlobalReportResponse handle(GlobalReportRequest globalReport, RpcContext rpcContext);
}
下面按照分布式事务执行先后顺序进行逐个分析:
全局事务提交请求入口为方法io.seata.server.AbstractTCInboundHandler#handle(GlobalCommitRequest, RpcContext)
,代码如下:
public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
GlobalCommitResponse response = new GlobalCommitResponse();
response.setGlobalStatus(GlobalStatus.Committing);
exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
@Override
public void execute(GlobalCommitRequest request, GlobalCommitResponse response) {
// 全局事务提交
doGlobalCommit(request, response, rpcContext);
}
@Override
public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
// 设置错误码,检查响应全局事务状态
super.onException(request, response, rex);
checkTransactionStatus(request, response);
}
}, request, response);
return response;
}
真正的实物提交在方法io.seata.server.coordinator.DefaultCore#commit
中,首先会判断globalSession是否可以提交(执行close globalSession,清除分支事务锁资源),然后遍历分支事务进行提交操作。
public GlobalStatus commit(String xid) throws TransactionException {
GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
if (globalSession == null) {
return GlobalStatus.Finished;
}
globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
globalSession.closeAndClean();
// close clean 之后,会不会出现同一个RM服务中,另一个事务进行全局加锁成功,但是这个全局事务还未commit的情况??
// 由于清除了分支锁资源,所以是可能存在的,但是由于RM上事务已经提交(在阶段一就行了提交)或者RM事务失败,因此是可以让RM上其他实物进行加锁的
if (globalSession.getStatus() == GlobalStatus.Begin) {
if (globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
return true;
}
}
return false;
});
if (shouldCommit) {
boolean success = doGlobalCommit(globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
globalSession.asyncCommit();
return GlobalStatus.Committed;
} else {
return globalSession.getStatus();
}
} else {
return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
}
}
针对分支事务的提交操作是由方法doGlobalCommit
来完成的,会遍历所有分支事务进行branchCommit
,这里是同步方式发送给客户端BranchCommitRequest
请求,接收到成功提交结果之后就会将该分支事务从全局事务中移除,如果分支事务能够异步进行提交,TC会尝试进行异步提交操作,对应的处理线程池为 io.seata.server.coordinator.DefaultCoordinator#retryCommitting
。
全局事务回滚流程大致和全局事务提交类似,都是遍历所有分支事务进行branchRollback,接收到成功回滚之后将该分支事务从全局事务中移除,如果异常会有对应的重试回滚逻辑。注意这里有一个重要的差别是:针对分支事务上的锁资源,提交时可以先进行globalSession.closeAndClean()
,而回滚时只能先进行globalSession.close(),最后再执行锁资源清除。进行globalSession.close()是为了不再接受RM的注册请求,回滚时先不清除锁资源是为了防止在分支事务为回滚成功之前,该RM上其他事务进行同样的lockKeys全局加锁操作。
事务提交和回滚涉及到retryRollbacking/retryCommitting/asyncCommitting
这几个线程池,这些线程池默认都是运行周期都是1000ms,初始化代码如下:
retryRollbacking.scheduleAtFixedRate(() -> {
handleRetryRollbacking();
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
retryCommitting.scheduleAtFixedRate(() -> {
handleRetryCommitting();
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
asyncCommitting.scheduleAtFixedRate(() -> {
handleAsyncCommitting();
}, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);
异步提交和重试提交都会调用doGlobalCommit方法,区别就是异步就一直调用,而重试提交有最大超时时间限制,如果某个GlobalSession超过一定时间还未提交成功,那么不会再进行重试操作。重试回滚会调用doGlobalRollback方法,并且也有最大超时时间的限制。