在上一篇博客 《Spring Cloud Alibaba 系列之 Seata 介绍》 介绍了 Seata,在这一篇中我们来看看怎么使用。以一个用户购买商品的微服务示例开始,整个业务逻辑由3个微服务提供支持:仓储服务:对给定的商品扣除仓储数量。订单服务:根据采购需求创建订单。帐户服务:从用户帐户中扣除余额。
每一个服务都对应自己的数据库,这里使用 JPA 自动创建表,另外每个库都需要单独的回滚日志表,建表语句如下。
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.3.4.RELEASE</spring-boot.version>
<spring-cloud-alibaba.version>2.2.2.RELEASE</spring-cloud-alibaba.version>
<spring-cloud.version>Hoxton.SR8</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
除了添加 application.yml 以外还需要添加 file.conf 以及 registry.conf 两个文件
# application.yml
server:
port: 8081
spring:
application:
name: order-server
cloud:
alibaba:
seata:
# 自定义事务组名称需要与 seata-server 中的对应
tx-service-group: my_seata_tx_group
nacos:
discovery:
# 服务端地址
server-addr: 127.0.0.1:8848
datasource:
# mysql 驱动 6.0 以上使用如下配置
driverClassName: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/order?serverTimezone=UTC
username: root
password: root
jpa:
database: MySQL
show-sql: true
generate-ddl: true
hibernate:
ddl-auto: update # 没有表创建表,有表更新表
naming:
physical-strategy: org.springframework.boot.orm.jpa.hibernate.SpringPhysicalNamingStrategy # 命名策略
# file.conf
transport {
# tcp udt unix-domain-socket
type = "TCP"
#NIO NATIVE
server = "NIO"
#enable heartbeat
heartbeat = true
# the client batch send request enable
enableClientBatchSendRequest = true
#thread factory for netty
threadFactory {
bossThreadPrefix = "NettyBoss"
workerThreadPrefix = "NettyServerNIOWorker"
serverExecutorThread-prefix = "NettyServerBizHandler"
shareBossWorker = false
clientSelectorThreadPrefix = "NettyClientSelector"
clientSelectorThreadSize = 1
clientWorkerThreadPrefix = "NettyClientWorkerThread"
# netty boss thread size,will not be used for UDT
bossThreadSize = 1
#auto default pin or 8
workerThreadSize = "default"
}
shutdown {
# when destroy server, wait seconds
wait = 3
}
serialization = "seata"
compressor = "none"
}
service {
#transaction service group mapping
#
# 改成对应的, 与添加到 nacos 的保持一致
#
vgroupMapping.my_seata_tx_group = "default"
#only support when registry.type=file, please don't set multiple addresses
default.grouplist = "127.0.0.1:8091"
#degrade, current not support
enableDegrade = false
#disable seata
disableGlobalTransaction = false
}
client {
rm {
asyncCommitBufferLimit = 10000
lock {
retryInterval = 10
retryTimes = 30
retryPolicyBranchRollbackOnConflict = true
}
reportRetryCount = 5
tableMetaCheckEnable = false
reportSuccessEnable = false
sagaBranchRegisterEnable = false
}
tm {
commitRetryCount = 5
rollbackRetryCount = 5
degradeCheck = false
degradeCheckPeriod = 2000
degradeCheckAllowTimes = 10
}
undo {
dataValidation = true
onlyCareUpdateColumns = true
logSerialization = "jackson"
logTable = "undo_log"
}
log {
exceptionRate = 100
}
}
# registry.conf
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
loadBalance = "RandomLoadBalance"
loadBalanceVirtualNodes = 10
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
eureka {
serviceUrl = "http://localhost:8761/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
}
consul {
serverAddr = "127.0.0.1:8500"
}
apollo {
appId = "seata-server"
apolloMeta = "http://192.168.1.204:8801"
namespace = "application"
apolloAccesskeySecret = ""
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/3
* @description 订单实体类
*/
@Data
public class Order implements Serializable {
private static final long SerialVersionUID = 1L;
@Id
@GeneratedValue(strategy= GenerationType.IDENTITY)
private Long id;
// 商品 id
private Long productId;
// 商品价格
private Integer productPrice;
// 商品数量
private Integer productNum;
// 是否付款
private Boolean flag;
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/3
* @description
*/
public interface OrderDao extends JpaRepository<Orders, Long>, JpaSpecificationExecutor<Orders> {
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/3
* @description OrderService 接口
*/
public interface OrderService {
// 创建订单
void create(Orders orders);
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/3
* @description OrderService 实现类
*/
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderDao orderDao;
@Resource
private AccountService accountService;
@Resource
private StorageService storageService;
@Override
public void create(Orders orders) {
log.info("###### 开始创建订单 ######");
orders.setFlag(false);
Orders save = orderDao.save(orders);
log.info("###### 结束创建订单 ######");
log.info("###### 开始减库存 ######");
storageService.deduct(save.getProductId(), save.getProductNum());
log.info("###### 结束减库存 ######");
log.info("###### 开始减余额 ######");
accountService.debit(save.getUserId(), save.getProductPrice());
log.info("###### 结束减余额 ######");
log.info("###### 开始修改订单状态 ######");
save.setFlag(true);
orderDao.save(save);
log.info("###### 结束修改订单状态 ######");
}
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/3
* @description account-server 远程调用
*/
@Component
@FeignClient("account-server")
public interface AccountService {
// 扣减余额
@PostMapping("/account/debit")
void debit(@RequestParam("userId") Long userId, @RequestParam("money") Integer money);
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/3
* @description storage-server 远程调用
*/
@Component
@FeignClient("storage-server")
public interface StorageService {
// 扣减库存
@PostMapping("/storage/deduct")
void deduct(@RequestParam("id") Long id, @RequestParam("count") Integer count);
}
/**
* Created with IntelliJ IDEA.
*
* @author Demo_Null
* @date 2020/12/3
* @description
*/
@RestController
@RequestMapping("/order")
public class OrderController {
@Autowired
private OrderService orderService;
@GetMapping("/create")
public String create(Orders orders) {
orderService.create(orders);
return "创建成功";
}
}
/**
* Created with IntelliJ IDEA.
*
* @author gaohu9712@163.com
* @date 2020/12/3
* @description
*/
@Configuration
public class DataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
return new DruidDataSource();
}
/**
* 需要将 DataSourceProxy 设置为主数据源,否则事务无法回滚
*
* @param druidDataSource The DruidDataSource
* @return The default datasource
*/
@Primary
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
// 如果使用 MyBatis 还需要额外注入 org.apache.ibatis.session.SqlSessionFactory
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(dataSourceProxy);
sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());
return sqlSessionFactoryBean.getObject();
}
}
博客中只构建 order 模块,另外两个模块构建方式类似,请自行构建或下载源码 ☞ GitHub
当订单创建成功后,准备扣减库存时发生错误,若此时没有分布式事务可能会造成订单创建了,库存没有减的问题,此类问题比比皆是。如不使用分布式事务简直就是一场灾难,当然你也可以只使用一个库。
我们只需要使用一个 @GlobalTransactional 注解在业务方法上就可以实现分布式事务。
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderDao orderDao;
@Resource
private AccountService accountService;
@Resource
private StorageService storageService;
@Override
@GlobalTransactional
public void create(Orders orders) {
log.info("###### 开始创建订单 ######");
orders.setFlag(false);
Orders save = orderDao.save(orders);
log.info("###### 结束创建订单 ######");
log.info("###### 开始减库存 ######");
storageService.deduct(save.getProductId(), save.getProductNum());
log.info("###### 结束减库存 ######");
log.info("###### 开始减余额 ######");
accountService.debit(save.getUserId(), save.getProductPrice());
log.info("###### 结束减余额 ######");
log.info("###### 开始修改订单状态 ######");
save.setFlag(true);
orderDao.save(save);
log.info("###### 结束修改订单状态 ######");
}
}