前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >seata TC 模块分析

seata TC 模块分析

作者头像
luoxn28
发布2021-01-28 15:52:45
1.2K0
发布2021-01-28 15:52:45
举报
文章被收录于专栏:TopCoderTopCoder

在分析TC各模块之前,首先再回顾下seata的整个执行流程:

  • TM:事务的发起者。用来告诉TC,全局事务的开始,提交,回滚。
  • RM:具体的事务资源,每一个RM都会作为一个分支事务注册在TC。
  • TC:事务的协调者。也可以看做是seata-server,用于接收事务注册,提交和回滚。

为什么TC是seata核心呢?因为TC这个角色就好像上帝一样,协调控制TM、RM协同工作,TC一旦不好使,那么RM和TM就会出现问题,那必定会乱的一塌糊涂。

那么一个优秀的事务协调者应该具备哪些能力呢?

  • 正确的协调:能正确的协调RM和TM接下来应该做什么,做错了应该怎么办,做对了应该怎么办。
  • 高可用: 事务协调器在分布式事务中很重要,如果不能保证高可用,那么它也没有存在的必要了。
  • 高性能:事务协调器的性能一定要高,如果事务协调器性能有瓶颈那么它所管理的RM和TM那么会经常遇到超时,从而引起回滚频繁。
  • 高扩展性:这个特点是属于代码层面的,如果是一个优秀的框架,那么需要给使用方很多自定义扩展,比如服务注册/发现,读取配置等等。

TC整体设计

TC整体设计如上,各模块说明如下:

  • Coordinator Core: 在最下面的模块是事务协调器核心代码,主要用来处理事务协调的逻辑,如是否commit,rollback等协调活动。
  • Store: 存储模块,用来将我们的数据持久化,防止重启或者宕机数据丢失。
  • Discovery: 服务注册/发现模块,用于将Server地址暴露给我们Client。
  • Config: 用来存储和查找我们服务端的配置。
  • Lock: 锁模块,用于给Seata提供全局锁的功能。
  • RPC: 用于和其它端通信。
  • HA-Cluster: 高可用集群,目前还没开源。

Discovery

Discovery模块就是服务发现模块,TC启动后需要将自己的信息注册到服务中心,这样才能暴露给其他使用者,Discovery接口定义如下:

代码语言:javascript
复制
public interface RegistryService<T> {
    void register(InetSocketAddress address) throws Exception;
    void unregister(InetSocketAddress address) throws Exception;
    void subscribe(String cluster, T listener) throws Exception;
    void unsubscribe(String cluster, T listener) throws Exception;
    List<InetSocketAddress> lookup(String key) throws Exception;
    void close() throws Exception;
}

上述方法看定义就能知道其作用,因此不在赘述。Discovery模块有多个实现类,如下图所示:

我们知道,服务注册到服务中心,一般需要与服务中心进行心跳保活,否则服务中心会将该服务信息给清除,一般服务中心的client jar包会集成对应的心跳能力。但是针对redis来说,该如何注册呢,下面就以redis作为示例来分析服务注册流程,对应类 RedisRegistryServiceImpl。

从源码来看,seata使用redis注册是使用的是hash字典结构,那么它怎么心跳的呢?准确来说,seata注册redis是没有心跳的,只使用到了redis channel作为通知机制来保证tc实例变化时的通知上下线能力。注意 redis channel只有在更改时的通知能力,因此tm/rm在启动时需要先从redis获取数据之后,然后再设置channel监听,seata对应逻辑在方法io.seata.discovery.registry.redis.RedisRegistryServiceImpl#subscribe中,严格来讲,在获取数据和设置channel监听之间,如果数据发生了变更,是存在更新丢失问题的,不过这种问题触发概率极地可以忽略,并且后续有更新时还可以再次获取得到新的数据。

代码语言:javascript
复制
@Override
public void register(InetSocketAddress address) {
    NetUtil.validAddress(address);
    String serverAddr = NetUtil.toStringAddress(address);
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.hset(getRedisRegistryKey(), serverAddr, ManagementFactory.getRuntimeMXBean().getName());
        jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.REGISTER);
    }
}

@Override
public void unregister(InetSocketAddress address) {
    NetUtil.validAddress(address);
    String serverAddr = NetUtil.toStringAddress(address);
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.hdel(getRedisRegistryKey(), serverAddr);
        jedis.publish(getRedisRegistryKey(), serverAddr + "-" + RedisListener.UN_REGISTER);
    }
}

由于没有心跳能力,那就就需要在tc进程关闭(JVM关闭)时执行unregister逻辑,因此会添加对应的ShutdownHook钩子函数,其能保证在程序正常退出、System.out、Ctrl+C中断结束、系统关闭、OOM宕机、kill pid进程时被执行到,但是如果是执行kill -9 pid这种,是没有执行到钩子函数的。register是在Netty启动后进行注册的,对应的逻辑在方法io.seata.core.rpc.netty.NettyServerBootstrap#start中,这里不在赘述。

Config

配置模块是seata的基础模块,比如netty线程配置、session配置等,这些配置seata基本上都有默认配置。seata有一个关于配置的类Configuration:

代码语言:javascript
复制
public interface Configuration {
    int getInt(String dataId, int defaultValue);

    String getConfig(String dataId, long timeoutMills);
    boolean putConfig(String dataId, String content);
    boolean removeConfig(String dataId, long timeoutMills);
    boolean removeConfig(String dataId);
    void addConfigListener(String dataId, ConfigurationChangeListener listener);
    void removeConfigListener(String dataId, ConfigurationChangeListener listener);
}

目前seata适配了多种配置中心,如下:

  • getInt/Long/Boolean/getConfig():通过dataId来获取对应的值,读取不到配置、异常或超时将返回参数中的默认值。
  • putConfig:用于添加配置。
  • removeConfig:删除一个配置。
  • add/remove/get ConfigListener:添加/删除/获取 配置监听器,一般用来监听配置的变更。

在Seata中需要配置registry.conf,来配置config.type :

代码语言:javascript
复制
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
    dataId = "seataServer.properties"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
    apolloAccesskeySecret = ""
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

Store

Store模块为seata的存储模块,主要存储session数据,存储类为TransactionStoreManager,其主要提供了读写session接口:

代码语言:javascript
复制
public interface TransactionStoreManager {
    boolean writeSession(LogOperation logOperation, SessionStorable session);
    GlobalSession readSession(String xid);
    GlobalSession readSession(String xid, boolean withBranchSessions);
    List<GlobalSession> readSession(SessionCondition sessionCondition);
    void shutdown();

    enum LogOperation {
        /**
         * Global add log operation.
         */
        GLOBAL_ADD((byte)1),
        /**
         * Global update log operation.
         */
        GLOBAL_UPDATE((byte)2),
        /**
         * Global remove log operation.
         */
        GLOBAL_REMOVE((byte)3),
        /**
         * Branch add log operation.
         */
        BRANCH_ADD((byte)4),
        /**
         * Branch update log operation.
         */
        BRANCH_UPDATE((byte)5),
        /**
         * Branch remove log operation.
         */
        BRANCH_REMOVE((byte)6);

        private byte code;
    }
}

存储模块目前支持file/db/redis存储,如果需要保证TC可用性建议将数据存储到DB中。

写入file文件流程如下:

  • 加锁,开始将数据序列化,然后写入到currFileChannel
  • 写入完成之后,判断是否需要创建新的存储文件
  • 加锁,刷盘,刷盘有同步和异步2种方式,异步的话有flush线程来异步完成
代码语言:javascript
复制
public boolean writeSession(LogOperation logOperation, SessionStorable session) {
    writeSessionLock.lock();
    long curFileTrxNum;
    try {
        if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {
            return false;
        }
        lastModifiedTime = System.currentTimeMillis();
        curFileTrxNum = FILE_TRX_NUM.incrementAndGet();
        if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0
            && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {
            return saveHistory();
        }
    } catch (Exception exx) {
        return false;
    } finally {
        writeSessionLock.unlock();
    }
    flushDisk(curFileTrxNum, currFileChannel);
    return true;
}

注意这里的刷盘模式默认为异步模式,为了数据安全性的话可以设置为同步刷盘,避免系统断电写入pagecache中未刷盘的数据丢失。

Lock

大家知道数据库实现隔离级别主要是通过锁来实现的,同样的在分布式事务框架Seata中要实现隔离级别也需要通过锁。一般在数据库中数据库的隔离级别一共有四种:读未提交,读已提交,可重复读,串行化。在Seata中可以保证隔离级别是读已提交,但是提供了达到读已提交隔离的手段。

Lock模块也就是Seata实现隔离级别的核心模块。在Lock模块中提供了一个接口用于管理我们的锁:

代码语言:javascript
复制
public interface Locker {
    boolean acquireLock(List<RowLock> rowLock) ;
    boolean releaseLock(List<RowLock> rowLock);
    boolean releaseLock(String xid, Long branchId);
    boolean releaseLock(String xid, List<Long> branchIds);

    boolean isLockable(List<RowLock> rowLock);
    void cleanAllLocks();
}

其中有三个方法:

  • acquireLock:用于对我们的BranchSession加锁,这里虽然是传的分支事务Session,实际上是对分支事务的资源加锁,成功返回true。
  • isLockable:根据事务ID,资源Id,锁住的Key来查询是否已经加锁。
  • cleanAllLocks:清除所有的锁。对于锁我们可以在本地实现,也可以通过redis或者mysql来帮助我们实现。官方默认提供了本地全局锁的实现:
代码语言:javascript
复制
public class FileLocker extends AbstractLocker {

    private static final int BUCKET_PER_TABLE = 128;

    private static final ConcurrentMap<String/* resourceId */, ConcurrentMap<String/* tableName */,
        ConcurrentMap<Integer/* bucketId */, BucketLockMap>>>
        LOCK_MAP = new ConcurrentHashMap<>();

在本地锁的实现中有两个常量需要关注:

  • BUCKET_PER_TABLE:用来定义每个table有多少个bucket,目的是为了后续对同一个表加锁的时候减少竞争。
  • LOCK_MAP:这个map从定义上来看非常复杂,里里外外套了很多层Map,这里用个表格具体说明一下:

可以看见实际上的加锁在bucketLockMap这个map中,这里具体的加锁方法比较简单就不作详细阐述,主要是逐步的找到bucketLockMap,然后将当前trascationId塞进去,如果这个主键当前有TranscationId,那么比较是否是自己,如果不是则加锁失败。

RPC

seata RPC通信基层基于netty来保证高性能,采用默认的配置netty线程池模型处理流程如下:

如果采用默认的基本配置那么会有一个Acceptor线程用于处理客户端的链接,会有cpu*2数量的NIO-Thread,在这个线程中不会做业务太重的事情,只会做一些速度比较快的事情,比如编解码,心跳事件,和TM注册。一些比较费时间的业务操作将会交给业务线程池,默认情况下业务线程池配置为最小线程为100,最大为500。

seata心跳是通过netty的IdleStateHandler来完成的,在Sever端对于写没有设置最大空闲时间,对于读设置了最大空闲时间,默认为15s(客户端默认写空闲为5s,发送ping消息),如果超过15s则会将链接断开,关闭资源。

在TC server侧的netty处理流程中,接收到数据首先进行解码,按照seata定义的固定协议格式进行,会将数据解码成 RpcMessage 消息,代码如下:

代码语言:javascript
复制
public class RpcMessage {
    private int id;
    private byte messageType;
    private byte codec;
    private byte compressor;
    private Map<String, String> headMap = new HashMap<>();
    private Object body;

后续seata的各种处理器的处理流程都是基于 RpcMessage 消息来的。

Coordinator Core

Coordinator Core的实现为DefaultCoordinator,其实TC中重要的协调类,负责分布式事务生命周期中的各种操作管理工作,其初始化代码如下:

代码语言:javascript
复制
// main方法中
DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
coordinator.init();
nettyRemotingServer.setHandler(coordinator);

// 初始化方法
public void init() {
    retryRollbacking.scheduleAtFixedRate(() -> {
        // xxx
    }, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    retryCommitting.scheduleAtFixedRate(() -> {
        // xxx
    }, 0, COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    asyncCommitting.scheduleAtFixedRate(() -> {
        // xxx
    }, 0, ASYNC_COMMITTING_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    timeoutCheck.scheduleAtFixedRate(() -> {
        // xxx
    }, 0, TIMEOUT_RETRY_PERIOD, TimeUnit.MILLISECONDS);

    undoLogDelete.scheduleAtFixedRate(() -> {
        // xxx
    }, UNDO_LOG_DELAY_DELETE_PERIOD, UNDO_LOG_DELETE_PERIOD, TimeUnit.MILLISECONDS);
}

其内部会初始化几种线程池来驱动分布式事务操作的进行,比如undoLog清理、commit/rollback重试等等。关于Coordinator这块业务逻辑较多,后续会专门写这块内容,本文就不在赘述了。

参考资料:

  1. http://seata.io/zh-cn/blog/seata-analysis-java-server.html
  2. https://blog.csdn.net/qq_26323323/article/details/89814410
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-01-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 TopCoder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • TC整体设计
  • Discovery
  • Config
  • Store
  • Lock
  • RPC
  • Coordinator Core
相关产品与服务
云数据库 Redis
腾讯云数据库 Redis(TencentDB for Redis)是腾讯云打造的兼容 Redis 协议的缓存和存储服务。丰富的数据结构能帮助您完成不同类型的业务场景开发。支持主从热备,提供自动容灾切换、数据备份、故障迁移、实例监控、在线扩容、数据回档等全套的数据库服务。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档