前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式事务 顶

分布式事务 顶

作者头像
算法之名
发布2020-04-24 17:05:35
7120
发布2020-04-24 17:05:35
举报
文章被收录于专栏:算法之名算法之名
  • JTA/XA二段式提交

普通事务管理的过程

  1. do
  2. commit/rollback

外部(全局)事务-JTA

  1. 外部事务管理器提供事务管理
  2. 通过Spring事务接口,调用外部管理器。
  3. 使用JNDI等方式获取外部事务管理器的实例
  4. 外部事务管理器一般由应用服务器提供,如JBOSS,WebLogic,不过我们常用的Tomcat是不提供的。
  5. 外部事务管理器提供JTA事务管理
  6. JTA事务管理器可以管理多个数据资源
  7. 通过2阶段提交实现多数据源的事务。

JTA事务管理的过程

  1. do
  2. prepare / rollback
  3. commit / rollback

使用应用服务器

不使用应用服务器(一般使用的是Atomikos)

XA与JTA

  1. Transaction Manage
  2. XA Resource
  3. 两阶段提交

XA规范的JAVA实现-JTA

上图中JTA是事务管理器在Java中的实现,它的全称为Java Transaction API.XAResource是Java中对Resource规范的实现。

JTA

  1. TransactionManager
  2. XAResouce
  3. XID

我们来看一下TransactionManager接口

代码语言:javascript
复制
public interface TransactionManager {

    /**
     * 开启一个事务
     *
     */
    public void begin() throws NotSupportedException, SystemException;

    /**
     * 提交一个事务
     *
     */
    public void commit() throws RollbackException,
   HeuristicMixedException, HeuristicRollbackException, SecurityException,
   IllegalStateException, SystemException;

    /**
     * 获取事务状态
     *
     */
    public int getStatus() throws SystemException;

    /**
     * 开启一个事务
     *
     */
    public Transaction getTransaction() throws SystemException;

    /**
     * 继续挂起的事务
     */
    public void resume(Transaction tobj)
            throws InvalidTransactionException, IllegalStateException,
            SystemException;

    /**
     * 回滚
     *
     */
    public void rollback() throws IllegalStateException, SecurityException,
                            SystemException;

    /**
     * 设置回滚只读
     *
     */
    public void setRollbackOnly() throws IllegalStateException, SystemException;

    /**
     * 设置事务的超时时间
     *
     */
    public void setTransactionTimeout(int seconds) throws SystemException;

    /**
     * 挂起一个事务
     *
     */
    public Transaction suspend() throws SystemException;
}

XAResource接口

代码语言:javascript
复制
public interface XAResource {
    int TMENDRSCAN = 8388608;
    int TMFAIL = 536870912;
    int TMJOIN = 2097152;
    int TMNOFLAGS = 0;
    int TMONEPHASE = 1073741824;
    int TMRESUME = 134217728;
    int TMSTARTRSCAN = 16777216;
    int TMSUCCESS = 67108864;
    int TMSUSPEND = 33554432;
    int XA_RDONLY = 3;
    int XA_OK = 0;
    //控制某个id的事务进行第几阶段的提交
    void commit(Xid var1, boolean var2) throws XAException;
    
    void end(Xid var1, int var2) throws XAException;

    void forget(Xid var1) throws XAException;
    //获取事务的超时时间
    int getTransactionTimeout() throws XAException;
    //是否在同一个ResourceManager里面呢
    boolean isSameRM(XAResource var1) throws XAException;
    //准备一个全局的事务
    int prepare(Xid var1) throws XAException;
    //恢复一个全局事务
    Xid[] recover(int var1) throws XAException;
    
    void rollback(Xid var1) throws XAException;

    boolean setTransactionTimeout(int var1) throws XAException;

    void start(Xid var1, int var2) throws XAException;
}

XID接口

代码语言:javascript
复制
public interface Xid {
    int MAXGTRIDSIZE = 64;
    int MAXBQUALSIZE = 64;

    int getFormatId();

    byte[] getGlobalTransactionId();

    byte[] getBranchQualifier();
}

JTA事务管理的弊端

  1. 两阶段提交
  2. 事务时间太长、锁数据的时间太长
  3. 低性能、吞吐量低

现在我们用一个样例来说明JTA事务管理,我们先在不同的数据库中添加两张表

pom

代码语言:javascript
复制
<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
      <groupId>org.mybatis.spring.boot</groupId>
      <artifactId>mybatis-spring-boot-starter</artifactId>
      <version>2.1.1</version>
   </dependency>
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <scope>8.0.11</scope>
   </dependency>
   <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>druid</artifactId>
      <version>1.0.29</version>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jta-atomikos</artifactId>
   </dependency>
   <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
      <optional>true</optional>
   </dependency>
</dependencies>

配置文件

代码语言:javascript
复制
logging:
  level:
    root: info
    com.guanjiann: debug
  file: logs/${spring.application.name}.log
server:
  port: 8080
spring:
  application:
    name: Twocommit
  datasource:
    test1:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource?useSSL=FALSE&serverTimezone=GMT%2B8
      username: root
      password: abcd123
      type: com.alibaba.druid.pool.DruidDataSource
      filters: stat
      maxActive: 20
      initialSize: 1
      maxWait: 60000
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
    test2:
      driver-class-name: com.mysql.cj.jdbc.Driver
      jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource_base?useSSL=FALSE&serverTimezone=GMT%2B8
      username: root
      password: abcd123
      type: com.alibaba.druid.pool.DruidDataSource
      filters: stat
      maxActive: 20
      initialSize: 1
      maxWait: 60000
      minIdle: 1
      timeBetweenEvictionRunsMillis: 60000
      minEvictableIdleTimeMillis: 300000
      validationQuery: select 'x'
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      poolPreparedStatements: true
      maxOpenPreparedStatements: 20
      minPoolSize: 3
      maxPoolSize: 25
      maxLifetime: 20000
      borrowConnectionTimeout: 30
      loginTimeout: 30
      maintenanceInterval: 60
      maxIdleTime: 60
mybatis:
  type-aliases-package: com.guanjian.twocommit.domain
  mapper-locations: classpath:/mybatis-mappers/*
  configuration:
    mapUnderscoreToCamelCase: true

SpringBoot启动类

代码语言:javascript
复制
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}, scanBasePackages = {"com.guanjian.twocommit"})
@EnableTransactionManagement
@EnableConfigurationProperties(value = {DBConfig1.class, DBConfig2.class})
public class TwocommitApplication extends SpringBootServletInitializer {

   @Override
   protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
      return application.sources(TwocommitApplication.class);
   }

   public static void main(String[] args) {
      SpringApplication.run(TwocommitApplication.class, args);
   }

}

配置文件读取类

代码语言:javascript
复制
@Data
@ConfigurationProperties(prefix = "spring.datasource.test1")
public class DBConfig1 {
    private String jdbcurl;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}
代码语言:javascript
复制
@Data
@ConfigurationProperties(prefix = "spring.datasource.test2")
public class DBConfig2 {
    private String jdbcurl;
    private String username;
    private String password;
    private int minPoolSize;
    private int maxPoolSize;
    private int maxLifetime;
    private int borrowConnectionTimeout;
    private int loginTimeout;
    private int maintenanceInterval;
    private int maxIdleTime;
    private String testQuery;
}

Mybatis整合atomikos全局事务管理类

代码语言:javascript
复制
@Configuration
@MapperScan(basePackages = "com.guanjian.twocommit.dao", sqlSessionTemplateRef = "test1SqlSessionTemplate")
public class MyBatisConfig1 {
    @Bean(name = "test1DataSource")  //test1DataSource
    public DataSource testDataSource(DBConfig1 testConfig) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        //mysqlXaDataSource.setUrl(testConfig.getUrl());
        mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // 将本地事务注册到创 Atomikos全局事务
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test1DataSource");

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test1SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test1SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
代码语言:javascript
复制
@Configuration
@MapperScan(basePackages = "com.guanjian.twocommit.dao2", sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class MyBatisConfig2 {
    @Bean(name = "test2DataSource")  //test1DataSource
    public DataSource testDataSource(DBConfig2 testConfig) throws SQLException {
        MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
        //mysqlXaDataSource.setUrl(testConfig.getUrl());
        mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
        mysqlXaDataSource.setPassword(testConfig.getPassword());
        mysqlXaDataSource.setUser(testConfig.getUsername());
        mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);

        // 将本地事务注册到创 Atomikos全局事务
        AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
        xaDataSource.setXaDataSource(mysqlXaDataSource);
        xaDataSource.setUniqueResourceName("test2DataSource");

        xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
        xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
        xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
        xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
        xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
        xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
        xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
        xaDataSource.setTestQuery(testConfig.getTestQuery());
        return xaDataSource;
    }

    @Bean(name = "test2SqlSessionFactory")
    public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource)
            throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSource);
        return bean.getObject();
    }

    @Bean(name = "test2SqlSessionTemplate")
    public SqlSessionTemplate testSqlSessionTemplate(
            @Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}

实体类

代码语言:javascript
复制
@Data
public class User {
    private int id;
    private String name;
}
代码语言:javascript
复制
@Data
public class Account {
    private int id;
    private String name;
}
代码语言:javascript
复制
@Data
public class UADTO {
    private User user;
    private Account account;
}

dao接口

代码语言:javascript
复制
public interface UserDao {
    @Select("select * from user where name=#{name}")
    User findUserByName(String name);

    @Options(useGeneratedKeys = true, keyProperty = "id")
    @Insert("insert into user (name) values (#{name})")
    int addUser(User user);
}
代码语言:javascript
复制
public interface AccountDao {
    @Select("select * from account where name=#{name}")
    Account findAccountByName(String name);

    @Options(useGeneratedKeys = true, keyProperty = "id")
    @Insert("insert into account (name) values (#{name})")
    int addAccount(Account account);
}

service接口

代码语言:javascript
复制
public interface UADTOService {
    int addUserAndAccount(UADTO uadto);
}

打标签的全局事务实现类

代码语言:javascript
复制
@Service
public class UADTOSerciceImpl implements UADTOService {
    @Autowired
    private UserDao userDao;
    @Autowired
    private AccountDao accountDao;

    @Override
    @Transactional
    public int addUserAndAccount(UADTO uadto) {
        userDao.addUser(uadto.getUser());
        accountDao.addAccount(uadto.getAccount());
        return 1;
    }
}

代码事务实现类

代码语言:javascript
复制
@Service
@Primary
public class UADTOServiceXAImpl implements UADTOService {
    @Autowired
    private UserDao userDao;
    @Autowired
    private AccountDao accountDao;
    @Autowired
    private PlatformTransactionManager transactionManager;

    @Override
    public int addUserAndAccount(UADTO uadto) {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
        def.setTimeout(15);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            userDao.addUser(uadto.getUser());
            accountDao.addAccount(uadto.getAccount());
            transactionManager.commit(status);
            return 1;
        }catch (Exception e) {
            transactionManager.rollback(status);
            throw e;
        }
    }
}

以上这两个实现类的全局事务是等价的,无论是标签还是代码,其实都使用到了JTA的分布式事务的二阶段提交。

Controller

代码语言:javascript
复制
@RestController
@Slf4j
public class AllController {
    @Autowired
    private UADTOService uadtoService;

    @PostMapping("/addall")
    public int addUserAndAccount(@RequestBody UADTO uadto) {
        log.info(uadto.toString());
        return uadtoService.addUserAndAccount(uadto);
    }
}

这里的二阶段提交事务,对不同数据库的表插入要么是同时成功,要么是同时失败的。

  • TCC模式的事务管理(柔性事务)
  1. Try
  2. Commit(Confirm) / Cancel

TCC模式实现思路

每个需要实现事务的接口,都需要3个接口,分别是:

  1. tryXX():业务检查,预留资源
  2. confirmXX():执行业务、使用资源
  3. cancelXX():回滚业务、释放资源

比方说在上图的下单的操作中,Order服务接收了下单的请求,扣费的时候,Order服务去调用User服务的时候,它去调用User服务的tryCharge()方法。调用完之后就把这一次的调用注册到协调器里面,当我们的下单服务的全部事务完成之后,协调器服务会调用confirmCharge()方法,去完成这个扣费的操作。如果在下单的过程中,出了任何的错误,协调器服务会帮我们去调用User服务的cancelCharge()方法,去把扣费操作去取消。

TCC模式协调器的功能

  1. 接管事务的管理,类似JTA的独立事务管理器(非两阶段提交)
  2. 保存每个资源上的事务记录:跟踪状态、检查超时
  3. 保证每个资源上的事务性
  4. 处理各种错误:超时、重试、网络异常、服务不可用

TCC模式实现分布式事务

  1. 借鉴XA的统一资源管理,又不是两阶段提交
  2. 不同资源之间没有锁,事务过程数据没有锁、没有隔离
  3. 出错时可能多次调用Confirm/Cancel方法、以及顺序无法保证
  4. Confirm/Cancel方法需满足幂等性,即重复调用时结果一致

现在我们以一个实际业务场景来加以说明

TCC 实现阶段一:Try

在上图的预处理中,那个订单服务先把自己的状态修改为:OrderStatus.UPDATING。这个状态是个没有任何含义的这么一个状态,代表有人正在修改这个状态罢了。库存服务别直接扣减库存,而是冻结掉库存。举个例子,本来你的库存数量是 100,你别直接 100 - 2 = 98,扣减这个库存!你可以把可销售的库存:100 - 2 = 98,设置为 98 没问题,然后在一个单独的冻结库存的字段里,设置一个 2。也就是说,有 2 个库存是给冻结了。

积分服务也是同理,别直接给用户增加会员积分。你可以先在积分表里的一个预增加积分字段加入积分。比如:用户积分原本是 1190,现在要增加 10 个积分,别直接 1190 + 10 = 1200 个积分!你可以保持积分为 1190 不变,在一个预增加字段里,比如说 prepare_add_credit 字段,设置一个 10,表示有 10 个积分准备增加。

仓储服务也是同理啊,你可以先创建一个销售出库单,但是这个销售出库单的状态是“UNKNOWN”。也就是说,刚刚创建这个销售出库单,此时还不确定它的状态是什么。

这个操作,一般都是锁定某个资源,设置一个预备类的状态,冻结部分数据,等等,大概都是这类操作。

TCC 实现阶段二:Confirm

然后就分成两种情况了,第一种情况是比较理想的,那就是各个服务执行自己的那个 Try 操作,都执行成功了!此时,TCC 分布式事务框架会控制进入 TCC 下一个阶段,第一个 C 阶段,也就是 Confirm 阶段。为了实现这个阶段,你需要在各个服务里再加入一些代码。比如说,订单服务里,你可以加入一个 Confirm 的逻辑,就是正式把订单的状态设置为“已支付”了。

库存服务也是类似的,将之前冻结库存字段的 2 个库存扣掉变为 0。这样的话,可销售库存之前就已经变为 98 了,现在冻结的 2 个库存也没了,那就正式完成了库存的扣减。

积分服务也是类似的,就是将预增加字段的 10 个积分扣掉,然后加入实际的会员积分字段中,从 1190 变为 1120。

仓储服务也是类似,将销售出库单的状态正式修改为“已创建”,可以供仓储管理人员查看和使用,而不是停留在之前的中间状态“UNKNOWN”了。

上面各种服务的 Confirm 的逻辑都实现好了,一旦订单服务里面的 TCC 分布式事务框架感知到各个服务的 Try 阶段都成功了以后,就会执行各个服务的 Confirm 逻辑。

TCC 实现阶段三:Cancel

在 Try 阶段,比如积分服务吧,它执行出错了,此时会怎么样?那订单服务内的 TCC 事务框架是可以感知到的,然后它会决定对整个 TCC 分布式事务进行回滚。也就是说,会执行各个服务的第二个 C 阶段,Cancel 阶段。同样,为了实现这个 Cancel 阶段,各个服务还得加一些代码。

首先订单服务,就是可以将订单的状态设置为“CANCELED”,也就是这个订单的状态是已取消。

库存服务也是同理,就是将冻结库存扣减掉 2,加回到可销售库存里去,98 + 2 = 100。

积分服务也需要将预增加积分字段的 10 个积分扣减掉。

仓储服务也需要将销售出库单的状态修改为“CANCELED”设置为已取消。

然后这个时候,订单服务的 TCC 分布式事务框架只要感知到了任何一个服务的 Try 逻辑失败了,就会跟各个服务内的 TCC 分布式事务框架进行通信,然后调用各个服务的 Cancel 逻辑。

总结与思考

先来 Try 一下,不要把业务逻辑完成,先试试看,看各个服务能不能基本正常运转,能不能先冻结我需要的资源。

如果 Try 都 OK,也就是说,底层的数据库、Redis、Elasticsearch、MQ 都是可以写入数据的,并且你保留好了需要使用的一些资源(比如冻结了一部分库存)。

接着,再执行各个服务的 Confirm 逻辑,基本上 Confirm 就可以很大概率保证一个分布式事务的完成了。

那如果 Try 阶段某个服务就失败了,比如说底层的数据库挂了,或者 Redis 挂了,等等。

此时就自动执行各个服务的 Cancel 逻辑,把之前的 Try 逻辑都回滚,所有服务都不要执行任何设计的业务逻辑。保证大家要么一起成功,要么一起失败。

等一等,你有没有想到一个问题?如果有一些意外的情况发生了,比如说订单服务突然挂了,然后再次重启,TCC 分布式事务框架是如何保证之前没执行完的分布式事务继续执行的呢?

所以,TCC 事务框架都是要记录一些分布式事务的活动日志的,可以在磁盘上的日志文件里记录,也可以在数据库里记录。保存下来分布式事务运行的各个阶段和状态。

问题还没完,万一某个服务的 Cancel 或者 Confirm 逻辑执行一直失败怎么办呢?

那也很简单,TCC 事务框架会通过活动日志记录各个服务的状态。举个例子,比如发现某个服务的 Cancel 或者 Confirm 一直没成功,会不停的重试调用它的 Cancel 或者 Confirm 逻辑,务必要它成功!

  • 分布式事务框架Seata

Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。

有关于Seata的说明可以参考https://seata.io/zh-cn/docs/overview/what-is-seata.html

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档