普通事务管理的过程
外部(全局)事务-JTA
JTA事务管理的过程
使用应用服务器
不使用应用服务器(一般使用的是Atomikos)
XA与JTA
XA规范的JAVA实现-JTA
上图中JTA是事务管理器在Java中的实现,它的全称为Java Transaction API.XAResource是Java中对Resource规范的实现。
JTA
我们来看一下TransactionManager接口
public interface TransactionManager {
/**
* 开启一个事务
*
*/
public void begin() throws NotSupportedException, SystemException;
/**
* 提交一个事务
*
*/
public void commit() throws RollbackException,
HeuristicMixedException, HeuristicRollbackException, SecurityException,
IllegalStateException, SystemException;
/**
* 获取事务状态
*
*/
public int getStatus() throws SystemException;
/**
* 开启一个事务
*
*/
public Transaction getTransaction() throws SystemException;
/**
* 继续挂起的事务
*/
public void resume(Transaction tobj)
throws InvalidTransactionException, IllegalStateException,
SystemException;
/**
* 回滚
*
*/
public void rollback() throws IllegalStateException, SecurityException,
SystemException;
/**
* 设置回滚只读
*
*/
public void setRollbackOnly() throws IllegalStateException, SystemException;
/**
* 设置事务的超时时间
*
*/
public void setTransactionTimeout(int seconds) throws SystemException;
/**
* 挂起一个事务
*
*/
public Transaction suspend() throws SystemException;
}
XAResource接口
public interface XAResource {
int TMENDRSCAN = 8388608;
int TMFAIL = 536870912;
int TMJOIN = 2097152;
int TMNOFLAGS = 0;
int TMONEPHASE = 1073741824;
int TMRESUME = 134217728;
int TMSTARTRSCAN = 16777216;
int TMSUCCESS = 67108864;
int TMSUSPEND = 33554432;
int XA_RDONLY = 3;
int XA_OK = 0;
//控制某个id的事务进行第几阶段的提交
void commit(Xid var1, boolean var2) throws XAException;
void end(Xid var1, int var2) throws XAException;
void forget(Xid var1) throws XAException;
//获取事务的超时时间
int getTransactionTimeout() throws XAException;
//是否在同一个ResourceManager里面呢
boolean isSameRM(XAResource var1) throws XAException;
//准备一个全局的事务
int prepare(Xid var1) throws XAException;
//恢复一个全局事务
Xid[] recover(int var1) throws XAException;
void rollback(Xid var1) throws XAException;
boolean setTransactionTimeout(int var1) throws XAException;
void start(Xid var1, int var2) throws XAException;
}
XID接口
public interface Xid {
int MAXGTRIDSIZE = 64;
int MAXBQUALSIZE = 64;
int getFormatId();
byte[] getGlobalTransactionId();
byte[] getBranchQualifier();
}
JTA事务管理的弊端
现在我们用一个样例来说明JTA事务管理,我们先在不同的数据库中添加两张表
pom
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>8.0.11</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.29</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
配置文件
logging:
level:
root: info
com.guanjiann: debug
file: logs/${spring.application.name}.log
server:
port: 8080
spring:
application:
name: Twocommit
datasource:
test1:
driver-class-name: com.mysql.cj.jdbc.Driver
jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource?useSSL=FALSE&serverTimezone=GMT%2B8
username: root
password: abcd123
type: com.alibaba.druid.pool.DruidDataSource
filters: stat
maxActive: 20
initialSize: 1
maxWait: 60000
minIdle: 1
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: select 'x'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxOpenPreparedStatements: 20
minPoolSize: 3
maxPoolSize: 25
maxLifetime: 20000
borrowConnectionTimeout: 30
loginTimeout: 30
maintenanceInterval: 60
maxIdleTime: 60
test2:
driver-class-name: com.mysql.cj.jdbc.Driver
jdbcurl: jdbc:mysql://xxx.xxx.xxx.xxx:3306/cloud_resource_base?useSSL=FALSE&serverTimezone=GMT%2B8
username: root
password: abcd123
type: com.alibaba.druid.pool.DruidDataSource
filters: stat
maxActive: 20
initialSize: 1
maxWait: 60000
minIdle: 1
timeBetweenEvictionRunsMillis: 60000
minEvictableIdleTimeMillis: 300000
validationQuery: select 'x'
testWhileIdle: true
testOnBorrow: false
testOnReturn: false
poolPreparedStatements: true
maxOpenPreparedStatements: 20
minPoolSize: 3
maxPoolSize: 25
maxLifetime: 20000
borrowConnectionTimeout: 30
loginTimeout: 30
maintenanceInterval: 60
maxIdleTime: 60
mybatis:
type-aliases-package: com.guanjian.twocommit.domain
mapper-locations: classpath:/mybatis-mappers/*
configuration:
mapUnderscoreToCamelCase: true
SpringBoot启动类
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class}, scanBasePackages = {"com.guanjian.twocommit"})
@EnableTransactionManagement
@EnableConfigurationProperties(value = {DBConfig1.class, DBConfig2.class})
public class TwocommitApplication extends SpringBootServletInitializer {
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(TwocommitApplication.class);
}
public static void main(String[] args) {
SpringApplication.run(TwocommitApplication.class, args);
}
}
配置文件读取类
@Data
@ConfigurationProperties(prefix = "spring.datasource.test1")
public class DBConfig1 {
private String jdbcurl;
private String username;
private String password;
private int minPoolSize;
private int maxPoolSize;
private int maxLifetime;
private int borrowConnectionTimeout;
private int loginTimeout;
private int maintenanceInterval;
private int maxIdleTime;
private String testQuery;
}
@Data
@ConfigurationProperties(prefix = "spring.datasource.test2")
public class DBConfig2 {
private String jdbcurl;
private String username;
private String password;
private int minPoolSize;
private int maxPoolSize;
private int maxLifetime;
private int borrowConnectionTimeout;
private int loginTimeout;
private int maintenanceInterval;
private int maxIdleTime;
private String testQuery;
}
Mybatis整合atomikos全局事务管理类
@Configuration
@MapperScan(basePackages = "com.guanjian.twocommit.dao", sqlSessionTemplateRef = "test1SqlSessionTemplate")
public class MyBatisConfig1 {
@Bean(name = "test1DataSource") //test1DataSource
public DataSource testDataSource(DBConfig1 testConfig) throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
//mysqlXaDataSource.setUrl(testConfig.getUrl());
mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(testConfig.getPassword());
mysqlXaDataSource.setUser(testConfig.getUsername());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
// 将本地事务注册到创 Atomikos全局事务
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("test1DataSource");
xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
xaDataSource.setTestQuery(testConfig.getTestQuery());
return xaDataSource;
}
@Bean(name = "test1SqlSessionFactory")
public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1DataSource") DataSource dataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
return bean.getObject();
}
@Bean(name = "test1SqlSessionTemplate")
public SqlSessionTemplate testSqlSessionTemplate(
@Qualifier("test1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
@Configuration
@MapperScan(basePackages = "com.guanjian.twocommit.dao2", sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class MyBatisConfig2 {
@Bean(name = "test2DataSource") //test1DataSource
public DataSource testDataSource(DBConfig2 testConfig) throws SQLException {
MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
//mysqlXaDataSource.setUrl(testConfig.getUrl());
mysqlXaDataSource.setUrl(testConfig.getJdbcurl());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
mysqlXaDataSource.setPassword(testConfig.getPassword());
mysqlXaDataSource.setUser(testConfig.getUsername());
mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
// 将本地事务注册到创 Atomikos全局事务
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXaDataSource);
xaDataSource.setUniqueResourceName("test2DataSource");
xaDataSource.setMinPoolSize(testConfig.getMinPoolSize());
xaDataSource.setMaxPoolSize(testConfig.getMaxPoolSize());
xaDataSource.setMaxLifetime(testConfig.getMaxLifetime());
xaDataSource.setBorrowConnectionTimeout(testConfig.getBorrowConnectionTimeout());
xaDataSource.setLoginTimeout(testConfig.getLoginTimeout());
xaDataSource.setMaintenanceInterval(testConfig.getMaintenanceInterval());
xaDataSource.setMaxIdleTime(testConfig.getMaxIdleTime());
xaDataSource.setTestQuery(testConfig.getTestQuery());
return xaDataSource;
}
@Bean(name = "test2SqlSessionFactory")
public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource)
throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
return bean.getObject();
}
@Bean(name = "test2SqlSessionTemplate")
public SqlSessionTemplate testSqlSessionTemplate(
@Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
实体类
@Data
public class User {
private int id;
private String name;
}
@Data
public class Account {
private int id;
private String name;
}
@Data
public class UADTO {
private User user;
private Account account;
}
dao接口
public interface UserDao {
@Select("select * from user where name=#{name}")
User findUserByName(String name);
@Options(useGeneratedKeys = true, keyProperty = "id")
@Insert("insert into user (name) values (#{name})")
int addUser(User user);
}
public interface AccountDao {
@Select("select * from account where name=#{name}")
Account findAccountByName(String name);
@Options(useGeneratedKeys = true, keyProperty = "id")
@Insert("insert into account (name) values (#{name})")
int addAccount(Account account);
}
service接口
public interface UADTOService {
int addUserAndAccount(UADTO uadto);
}
打标签的全局事务实现类
@Service
public class UADTOSerciceImpl implements UADTOService {
@Autowired
private UserDao userDao;
@Autowired
private AccountDao accountDao;
@Override
@Transactional
public int addUserAndAccount(UADTO uadto) {
userDao.addUser(uadto.getUser());
accountDao.addAccount(uadto.getAccount());
return 1;
}
}
代码事务实现类
@Service
@Primary
public class UADTOServiceXAImpl implements UADTOService {
@Autowired
private UserDao userDao;
@Autowired
private AccountDao accountDao;
@Autowired
private PlatformTransactionManager transactionManager;
@Override
public int addUserAndAccount(UADTO uadto) {
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
def.setTimeout(15);
TransactionStatus status = transactionManager.getTransaction(def);
try {
userDao.addUser(uadto.getUser());
accountDao.addAccount(uadto.getAccount());
transactionManager.commit(status);
return 1;
}catch (Exception e) {
transactionManager.rollback(status);
throw e;
}
}
}
以上这两个实现类的全局事务是等价的,无论是标签还是代码,其实都使用到了JTA的分布式事务的二阶段提交。
Controller
@RestController
@Slf4j
public class AllController {
@Autowired
private UADTOService uadtoService;
@PostMapping("/addall")
public int addUserAndAccount(@RequestBody UADTO uadto) {
log.info(uadto.toString());
return uadtoService.addUserAndAccount(uadto);
}
}
这里的二阶段提交事务,对不同数据库的表插入要么是同时成功,要么是同时失败的。
TCC模式实现思路
每个需要实现事务的接口,都需要3个接口,分别是:
比方说在上图的下单的操作中,Order服务接收了下单的请求,扣费的时候,Order服务去调用User服务的时候,它去调用User服务的tryCharge()方法。调用完之后就把这一次的调用注册到协调器里面,当我们的下单服务的全部事务完成之后,协调器服务会调用confirmCharge()方法,去完成这个扣费的操作。如果在下单的过程中,出了任何的错误,协调器服务会帮我们去调用User服务的cancelCharge()方法,去把扣费操作去取消。
TCC模式协调器的功能
TCC模式实现分布式事务
现在我们以一个实际业务场景来加以说明
TCC 实现阶段一:Try
在上图的预处理中,那个订单服务先把自己的状态修改为:OrderStatus.UPDATING。这个状态是个没有任何含义的这么一个状态,代表有人正在修改这个状态罢了。库存服务别直接扣减库存,而是冻结掉库存。举个例子,本来你的库存数量是 100,你别直接 100 - 2 = 98,扣减这个库存!你可以把可销售的库存:100 - 2 = 98,设置为 98 没问题,然后在一个单独的冻结库存的字段里,设置一个 2。也就是说,有 2 个库存是给冻结了。
积分服务也是同理,别直接给用户增加会员积分。你可以先在积分表里的一个预增加积分字段加入积分。比如:用户积分原本是 1190,现在要增加 10 个积分,别直接 1190 + 10 = 1200 个积分!你可以保持积分为 1190 不变,在一个预增加字段里,比如说 prepare_add_credit 字段,设置一个 10,表示有 10 个积分准备增加。
仓储服务也是同理啊,你可以先创建一个销售出库单,但是这个销售出库单的状态是“UNKNOWN”。也就是说,刚刚创建这个销售出库单,此时还不确定它的状态是什么。
这个操作,一般都是锁定某个资源,设置一个预备类的状态,冻结部分数据,等等,大概都是这类操作。
TCC 实现阶段二:Confirm
然后就分成两种情况了,第一种情况是比较理想的,那就是各个服务执行自己的那个 Try 操作,都执行成功了!此时,TCC 分布式事务框架会控制进入 TCC 下一个阶段,第一个 C 阶段,也就是 Confirm 阶段。为了实现这个阶段,你需要在各个服务里再加入一些代码。比如说,订单服务里,你可以加入一个 Confirm 的逻辑,就是正式把订单的状态设置为“已支付”了。
库存服务也是类似的,将之前冻结库存字段的 2 个库存扣掉变为 0。这样的话,可销售库存之前就已经变为 98 了,现在冻结的 2 个库存也没了,那就正式完成了库存的扣减。
积分服务也是类似的,就是将预增加字段的 10 个积分扣掉,然后加入实际的会员积分字段中,从 1190 变为 1120。
仓储服务也是类似,将销售出库单的状态正式修改为“已创建”,可以供仓储管理人员查看和使用,而不是停留在之前的中间状态“UNKNOWN”了。
上面各种服务的 Confirm 的逻辑都实现好了,一旦订单服务里面的 TCC 分布式事务框架感知到各个服务的 Try 阶段都成功了以后,就会执行各个服务的 Confirm 逻辑。
TCC 实现阶段三:Cancel
在 Try 阶段,比如积分服务吧,它执行出错了,此时会怎么样?那订单服务内的 TCC 事务框架是可以感知到的,然后它会决定对整个 TCC 分布式事务进行回滚。也就是说,会执行各个服务的第二个 C 阶段,Cancel 阶段。同样,为了实现这个 Cancel 阶段,各个服务还得加一些代码。
首先订单服务,就是可以将订单的状态设置为“CANCELED”,也就是这个订单的状态是已取消。
库存服务也是同理,就是将冻结库存扣减掉 2,加回到可销售库存里去,98 + 2 = 100。
积分服务也需要将预增加积分字段的 10 个积分扣减掉。
仓储服务也需要将销售出库单的状态修改为“CANCELED”设置为已取消。
然后这个时候,订单服务的 TCC 分布式事务框架只要感知到了任何一个服务的 Try 逻辑失败了,就会跟各个服务内的 TCC 分布式事务框架进行通信,然后调用各个服务的 Cancel 逻辑。
总结与思考
先来 Try 一下,不要把业务逻辑完成,先试试看,看各个服务能不能基本正常运转,能不能先冻结我需要的资源。
如果 Try 都 OK,也就是说,底层的数据库、Redis、Elasticsearch、MQ 都是可以写入数据的,并且你保留好了需要使用的一些资源(比如冻结了一部分库存)。
接着,再执行各个服务的 Confirm 逻辑,基本上 Confirm 就可以很大概率保证一个分布式事务的完成了。
那如果 Try 阶段某个服务就失败了,比如说底层的数据库挂了,或者 Redis 挂了,等等。
此时就自动执行各个服务的 Cancel 逻辑,把之前的 Try 逻辑都回滚,所有服务都不要执行任何设计的业务逻辑。保证大家要么一起成功,要么一起失败。
等一等,你有没有想到一个问题?如果有一些意外的情况发生了,比如说订单服务突然挂了,然后再次重启,TCC 分布式事务框架是如何保证之前没执行完的分布式事务继续执行的呢?
所以,TCC 事务框架都是要记录一些分布式事务的活动日志的,可以在磁盘上的日志文件里记录,也可以在数据库里记录。保存下来分布式事务运行的各个阶段和状态。
问题还没完,万一某个服务的 Cancel 或者 Confirm 逻辑执行一直失败怎么办呢?
那也很简单,TCC 事务框架会通过活动日志记录各个服务的状态。举个例子,比如发现某个服务的 Cancel 或者 Confirm 一直没成功,会不停的重试调用它的 Cancel 或者 Confirm 逻辑,务必要它成功!
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
有关于Seata的说明可以参考https://seata.io/zh-cn/docs/overview/what-is-seata.html