前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >seata TC启动流程分析

seata TC启动流程分析

作者头像
luoxn28
发布2021-01-28 15:52:04
7730
发布2021-01-28 15:52:04
举报
文章被收录于专栏:TopCoderTopCoder

编者注:Seata 是一款阿里开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案,github地址:https://github.com/seata/seata。

分析seata TC启动流程之前,首先看下分布式事务的核心要点

  1. 事务的持久化,事务所处的各种状态事务参与方的各种状态都需要持久化,当实例宕机时才能基于持久化的数据对事务回滚或提交,实现最终一致性
  2. 定时对超时未完成事务的处理(继续尝试提交或回滚),即通过重试机制实现事务的最终一致性
  3. 分布式事务的跨服务实例传播,当分布式事务跨多个实例时需要实现事务的传播,一般需要适配不同的rpc框架
  4. 事务的隔离级别:大多数分布式事务为了性能,默认的隔离级别是读未提交
  5. 幂等性:对于XA或者seata的AT这样的分布式事务来说,都已经默认实现了幂等性,而TCC、Saga这种接口级别实现的分布式事务都还需要业务开发者自己实现幂等性。

tc-server端启动流程如下:

启动类

seata-server 启动方法 io.seata.server.Server#main,默认启动端口 SERVER_DEFAULT_PORT = 8091。main方法主要是解析并设置一些配置,初始化几个线程池,启动DefaultCoordinator和Netty服务等,源码如下:

代码语言:javascript
复制
public static void main(String[] args) throws IOException {
    int port = PortHelper.getPort(args);
    System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

    // file/db/redis
    System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

    // 消息处理线程池,netty 接收到消息后传递给该线程进行处理
    ThreadPoolExecutor workingThreads = new ThreadPoolExecutor(NettyServerConfig.getMinServerPoolSize(),
            NettyServerConfig.getMaxServerPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
            new NamedThreadFactory("ServerHandlerThread", NettyServerConfig.getMaxServerPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());

    NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
    nettyRemotingServer.setListenPort(parameterParser.getPort());
    UUIDGenerator.init(parameterParser.getServerNode());
    SessionHolder.init(parameterParser.getStoreMode());

    // 默认的事务协调器:AT/TCC/XA/saga 等
    DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
    coordinator.init(); // 初始化几个线程池,commit重试、rollback重试等,TODO 待分析
    nettyRemotingServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);
    ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

    XID.setIpAddress(NetUtil.getLocalIp());
    XID.setPort(nettyRemotingServer.getListenPort());

    // 启动服务
    nettyRemotingServer.init();
    System.exit(0);
}

SessionHolder负责Session的持久化,一个Session对象对应一个事务,事务分为两种:全局事务(GlobalSession)和分支事务(BranchSession)。SessionHolder支持file、db、redis这几种持久化方式,其中db支持集群模式,推荐使用db。SessionHolder中最主要的四个字段如下:

代码语言:javascript
复制
// ROOT_SESSION_MANAGER用于获取所有的Setssion,以及Session的创建、更新、删除等。
private static SessionManager ROOT_SESSION_MANAGER;
// 用于获取、更新所有的异步commit的Session
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 用于获取、更新所有需要重试commit的Session
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 用于获取、更新所有需要重试rollback的Session
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

目前TC这块关于HA的特性这块支持不太多,不过可以将session store设置为DB来支持TC的故障转移,毕竟session数据还在。

DefaultCoordinator是事务协调器的核心,如:开启、提交、回滚全局事务,注册、提交、回滚分支事务都是由DefaultCoordinator负责协调处理的。DefaultCoordinato通过RpcServer与远程的TM、RM通信来实现分支事务的提交、回滚等。

netty初始化

netty初始化从方法io.seata.core.rpc.netty.NettyRemotingServer#init开始,首先会注册各种处理器,然后开始标准的netty启动过程:启动bossGroup和wrokerGroup,设置编解码器和channelHandlers。

代码语言:javascript
复制
public void init() {
    // registry processor
    registerProcessor();
    // netty初始化
    super.init();
}

public void start() {
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
        .localAddress(new InetSocketAddress(listenPort))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                // 心跳
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                    .addLast(new ProtocolV1Decoder())
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });

    ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
    RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
    initialized.set(true);
    future.channel().closeFuture().sync();
}

netty注册各种处理器也就是对应的业务处理器,这些处理器有些是在业务线程池中执行的,这里的业务线程池也就是main方法中创建的ThreadPoolExecutor workingThreads线程池。处理器类型有TC注册、RM注册、全局事务操作、心跳等类型:

代码语言:javascript
复制
private void registerProcessor() {
    // 1. registry on request message processor
    ServerOnRequestProcessor onRequestProcessor =
        new ServerOnRequestProcessor(this, getHandler());
    super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
    // 2. registry on response message processor
    ServerOnResponseProcessor onResponseProcessor =
        new ServerOnResponseProcessor(getHandler(), getFutures());
    super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
    super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
    // 3. registry rm message processor
    RegRmProcessor regRmProcessor = new RegRmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
    // 4. registry tm message processor
    RegTmProcessor regTmProcessor = new RegTmProcessor(this);
    super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
    // 5. registry heartbeat message processor
    ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
    super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
}

注意:如果super.registerProcessor入参executor为空的话,就会在netty worker线程中来执行

之后TC server端就开始工作了,接收TM/RM的请求,然后进行处理,实际的处理大都是交给不同的message processor处理器 来完成的。当处理器完成之后,再通过netty channel回写响应结果给客户端,这就是大致的通信流程。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-01-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 启动类
  • netty初始化
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档