前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Hmily实现TCC事务控制

Hmily实现TCC事务控制

作者头像
全栈程序员站长
发布2022-08-31 15:57:31
2930
发布2022-08-31 15:57:31
举报

大家好,又见面了,我是你们的朋友全栈君。

目标

通过本案例的学习,掌握Hmily实现TCC事务控制的方法,掌握TCC事务控制的思想。

Hmily介绍

Hmily是一个高性能分布式事务TCC开源框架。基于Java语言来开发(JDK1.8),支持Dubbo,Spring Cloud等RPC框架进行分布式事务。它目前支持以下特性:

  • 支持嵌套事务(Nested transaction support).
  • 采用disruptor框架进行事务日志的异步读写,与RPC框架的性能毫无差别。
  • 支持SpringBoot-starter 项目启动,使用简单。
  • RPC框架支持 : dubbo,motan,springcloud。
  • 本地事务存储支持 : redis,mongodb,zookeeper,file,mysql。
  • 事务日志序列化支持 :java,hessian,kryo,protostuff。
  • 采用Aspect AOP 切面思想与Spring无缝集成,天然支持集群。
  • RPC事务恢复,超时异常恢复等。 Hmily利用AOP对参与分布式事务的本地方法与远程方法进行拦截处理,通过多方拦截,事务参与者能透明的调用到另一方的Try、Confirm、Cancel方法;传递事务上下文;并记录事务日志,酌情进行补偿,重试等。 Hmily不需要事务协调服务,但需要提供一个数据库(mysql/mongodb/zookeeper/redis/file)来进行日志存储。 Hmily实现的TCC服务与普通的服务一样,只需要暴露一个接口,也就是它的Try业务。Confirm/Cancel业务逻辑,只是因为全局事务提交/回滚的需要才提供的,因此Confirm/Cancel业务只需要被Hmily TCC事务框架发现即可,不需要被调用它的其他业务服务所感知。

官网介绍:https://dromara.org/website/zh-cn/docs/hmily/index.html

TCC事务控制思想

TCC分为三个阶段:

  1. Try 阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm 一起才能真正构成一个完整的业务逻辑。
  2. Confirm 阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行 Confirm。通常情况下,采用TCC则认为 Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
  3. Cancel 阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
  4. TM事务管理器 TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公用组件,是为了考虑系统结构和软件复用。

TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条,用来记录事务上下文,追踪和记录状态,由于Confirm 和cancel失败需进行重试,因此需要实现为幂等,幂等性是指同一个操作无论请求多少次,其结果都相同。

TCC需要注意三种异常处理分别是空回滚、幂等、悬挂:

空回滚:

在没有调用 TCC 资源 Try 方法的情况下,调用了二阶段的 Cancel 方法,Cancel 方法需要识别出这是一个空回滚,然后直接返回成功。

出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用二阶段的Cancel方法,从而形成空回滚。

解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚;如果没执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务 ID 和分支事务 ID,第一阶段 Try 方法里会插入一条记录,表示一阶段执行了。Cancel 接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

幂等:

通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求 TCC 的二阶段 Try、Confirm 和 Cancel 接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。

解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。

悬挂:

悬挂就是对于一个分布式事务,其二阶段 Cancel 接口比 Try 接口先执行。

出现原因是在 RPC 调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时 RPC 调用的网络发生拥堵,通常 RPC 调用是有超时时间的,RPC 超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC 请求才到达参与者真正执行,而一个 Try 方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就称为悬挂,即业务资源预留后没法继续处理。

解决思路是如果二阶段执行完成,那一阶段就不能再继续执行。在执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。

案例场景

场景为 A 转账 30 元给 B,A和B账户在不同的服务。 方案1:

账户A

代码语言:javascript
复制
try:
	检查余额是否够30元
	扣减30元
	
confirm:
	空

cancel:
	增加30元

账户B

代码语言:javascript
复制
try:
	增加30元

confirm:
	空

cancel:
	减少30元

方案1说明:

1)账户A,这里的余额就是所谓的业务资源,按照前面提到的原则,在第一阶段需要检查并预留业务资源,因此,我们在扣钱 TCC 资源的 Try 接口里先检查 A 账户余额是否足够,如果足够则扣除 30 元。 Confirm 接口表示正式提交,由于业务资源已经在 Try 接口里扣除掉了,那么在第二阶段的 Confirm 接口里可以什么都不用做。Cancel 接口的执行表示整个事务回滚,账户A回滚则需要把 Try 接口里扣除掉的 30 元还给账户。

2)账号B,在第一阶段 Try 接口里实现给账户B加钱,Cancel 接口的执行表示整个事务回滚,账户B回滚则需要把 Try 接口里加的 30 元再减去。

方案1的问题分析:

1)如果账户A的try没有执行在cancel则就多加了30元。

2)由于try,cancel、confirm都是由单独的线程去调用,且会出现重复调用,所以都需要实现幂等。

3)账号B在try中增加30元,当try执行完成后可能会其它线程给消费了。

4)如果账户B的try没有执行在cancel则就多减了30元。

问题解决:

1)账户A的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。

2)try,cancel、confirm方法实现幂等。

3)账号B在try方法中不允许更新账户金额,在confirm中更新账户金额。

4)账户B的cancel方法需要判断try方法是否执行,正常执行try后方可执行cancel。

优化方案:

账户A

代码语言:javascript
复制
try:
	try幂等校验
	try悬挂处理
	检查余额是否够30元
	扣减30元

confirm:
	空

cancel:
	cancel幂等校验
	cancel空回滚处理
	增加可用余额30元

账户B

代码语言:javascript
复制
try:
	空
confirm:
	confirm幂等校验
	正式增加30元
cancel:
	空 

案例实现

本实例通过Hmily实现TCC分布式事务,模拟两个账户的转账交易过程。

两个账户分别在不同的银行(张三在bank1、李四在bank2),bank1、bank2是两个微服务。交易过程是,张三给李四转账指定金额。 上述交易步骤,要么一起成功,要么一起失败,必须是一个整体性的事务。

在这里插入图片描述
在这里插入图片描述

程序组成部分

数据库:MySQL-5.7.25

JDK:64位 jdk1.8.0_201

微服务:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE

Hmily:hmily-springcloud.2.0.4-RELEASE

微服务及数据库的关系 :

代码语言:javascript
复制
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1  银行1,操作张三账户, 连接数据库bank1

dtx/dtx-tcc-demo/dtx-tcc-demo-bank2   银行2,操作李四账户,连接数据库bank2

服务注册中心:dtx/discover-server

创建数据库

导入数据库脚本:sql\bank1.sql、sql\bank2.sql、已经导过不用重复导入。 代码:https://github.com/pbteach/pbdtx/tree/master/sql

创建hmily数据库,用于存储hmily框架记录的数据。

代码语言:javascript
复制
CREATE DATABASE `hmily` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

创建bank1库,并导入以下表结构和数据(包含张三账户)

代码语言:javascript
复制
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
  `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
  `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
  `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);

创建bank2库,并导入以下表结构和数据(包含李四账户)

代码语言:javascript
复制
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';

CREATE TABLE `account_info`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户主姓名',
  `account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行卡号',
  `account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帐户密码',
  `account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);

每个数据库都创建try、confirm、cancel三张日志表:

代码语言:javascript
复制
CREATE TABLE `local_try_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事务id',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `local_confirm_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事务id',
  `create_time` datetime DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `local_cancel_log` (
  `tx_no` varchar(64) NOT NULL COMMENT '事务id',
  `create_time` datetime DEFAULT NULL,
  PRIMARY KEY (`tx_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

案例工程

代码:https://github.com/pbteach/pbdtx/tree/master/dtx-tcc-demo 两个测试工程如下:

代码语言:javascript
复制
dtx/dtx-tcc-demo/dtx-tcc-demo-bank1  银行1,操作张三账户,连接数据库bank1

dtx/dtx-tcc-demo/dtx-tcc-demo-bank2   银行2,操作李四账户,连接数据库bank2
dtx-tcc-demo-bank1实现try和cancel方法
代码语言:javascript
复制
try:
	try幂等校验
	try悬挂处理
	检查余额是够扣减金额
	扣减金额

confirm:
	空

cancel:
	cancel幂等校验
	cancel空回滚处理
	增加可用余额

1)Dao

代码语言:javascript
复制
@Mapper
@Component
public interface AccountInfoDao {
    @Update("update account_info set account_balance=account_balance - #{amount} where account_balance>#{amount} and account_no=#{accountNo} ")
    int subtractAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
    @Update("update account_info set account_balance=account_balance + #{amount} where account_no=#{accountNo} ")
    int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    /**
     * 增加某分支事务try执行记录
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Insert("insert into local_try_log values(#{txNo},now());")
    int addTry(String localTradeNo);

    @Insert("insert into local_confirm_log values(#{txNo},now());")
    int addConfirm(String localTradeNo);

    @Insert("insert into local_cancel_log values(#{txNo},now());")
    int addCancel(String localTradeNo);

    /**
     * 查询分支事务try是否已执行
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Select("select count(1) from local_try_log where tx_no = #{txNo} ")
    int isExistTry(String localTradeNo);
    /**
     * 查询分支事务confirm是否已执行
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
    int isExistConfirm(String localTradeNo);

    /**
     * 查询分支事务cancel是否已执行
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
    int isExistCancel(String localTradeNo);

}

2)try和cancel方法

代码语言:javascript
复制
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
   private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);


   @Autowired
   private AccountInfoDao accountInfoDao;

   @Autowired
   private Bank2Client bank2Client;

   @Override
   @Transactional
   @Hmily(confirmMethod = "commit", cancelMethod = "rollback")
   public  void updateAccountBalance(String accountNo, Double amount) {
      //事务id
      String transId = HmilyTransactionContextLocal.getInstance().get().getTransId();
      log.info("******** Bank1 Service  begin try...  "+transId );
      int existTry = accountInfoDao.isExistTry(transId);
      //try幂等校验
      if(existTry>0){
         log.info("******** Bank1 Service 已经执行try,无需重复执行,事务id:{}  "+transId );
         return ;
      }
      //try悬挂处理
      if(accountInfoDao.isExistCancel(transId)>0 || accountInfoDao.isExistConfirm(transId)>0){
         log.info("******** Bank1 Service 已经执行confirm或cancel,悬挂处理,事务id:{}  "+transId );
         return ;
      }
      //从账户扣减
      if(accountInfoDao.subtractAccountBalance(accountNo ,amount )<=0){
         //扣减失败
         throw new HmilyRuntimeException("bank1 exception,扣减失败,事务id:{}"+transId);
      }
      //增加本地事务try成功记录,用于幂等性控制标识
      accountInfoDao.addTry(transId);

      //远程调用bank2
      if(!bank2Client.test2(amount,transId)){
         throw new HmilyRuntimeException("bank2Client exception,事务id:{}"+transId);
      }
      if(amount==10){//异常一定要抛在Hmily里面
         throw new RuntimeException("bank1 make exception  10");
      }
      log.info("******** Bank1 Service  end try...  "+transId );
   }


   @Transactional
   public  void commit( String accountNo, double amount) {
      String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
      logger.info("******** Bank1 Service begin commit..."+localTradeNo );
   }
   @Transactional
   public void rollback( String accountNo, double amount) {
      String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
      log.info("******** Bank1 Service begin rollback...  " +localTradeNo);
      if(accountInfoDao.isExistTry(localTradeNo) == 0){ //空回滚处理,try阶段没有执行什么也不用做
         log.info("******** Bank1 try阶段失败... 无需rollback "+localTradeNo );
         return;
      }
      if(accountInfoDao.isExistCancel(localTradeNo) > 0){ //幂等性校验,已经执行过了,什么也不用做
         log.info("******** Bank1 已经执行过rollback... 无需再次rollback " +localTradeNo);
         return;
      }
      //再将金额加回账户
      accountInfoDao.addAccountBalance(accountNo,amount);
      //添加cancel日志,用于幂等性控制标识
      accountInfoDao.addCancel(localTradeNo);
      log.info("******** Bank1 Service end rollback...  " +localTradeNo);
   }



}

3)feignClient

代码语言:javascript
复制
@FeignClient(value = "seata-demo-bank2", fallback = Bank2Fallback.class)
public interface Bank2Client {

    @GetMapping("/bank2/transfer")
    @Hmily
    Boolean transfer(@RequestParam("amount") Double amount);
}
dtx-tcc-demo-bank2实现如下功能
代码语言:javascript
复制
try:
	空
confirm:
	confirm幂等校验
	正式增加金额
cancel:
	空 

1)Dao

代码语言:javascript
复制
@Component
@Mapper
public interface AccountInfoDao {

    @Update("update account_info set account_balance=account_balance + #{amount} where  account_no=#{accountNo} ")
    int addAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);


    /**
     * 增加某分支事务try执行记录
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Insert("insert into local_try_log values(#{txNo},now());")
    int addTry(String localTradeNo);

    @Insert("insert into local_confirm_log values(#{txNo},now());")
    int addConfirm(String localTradeNo);

    @Insert("insert into local_cancel_log values(#{txNo},now());")
    int addCancel(String localTradeNo);

    /**
     * 查询分支事务try是否已执行
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Select("select count(1) from local_try_log where tx_no = #{txNo} ")
    int isExistTry(String localTradeNo);
    /**
     * 查询分支事务confirm是否已执行
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Select("select count(1) from local_confirm_log where tx_no = #{txNo} ")
    int isExistConfirm(String localTradeNo);

    /**
     * 查询分支事务cancel是否已执行
     * @param localTradeNo 本地事务编号
     * @return
     */
    @Select("select count(1) from local_cancel_log where tx_no = #{txNo} ")
    int isExistCancel(String localTradeNo);

}

2)实现confirm方法

代码语言:javascript
复制
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {

   @Autowired
   private AccountInfoDao accountInfoDao;

   @Override
   @Transactional
   @Hmily(confirmMethod = "confirmMethod", cancelMethod = "cancelMethod")
   public void updateAccountBalance(String accountNo, Double amount) {
      String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
      log.info("******** Bank2 Service Begin try ..."+localTradeNo);

   }

   @Transactional
   public  void confirmMethod(String accountNo, Double amount) {
      String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
      log.info("******** Bank2 Service commit...  " +localTradeNo);
      if(accountInfoDao.isExistConfirm(localTradeNo) > 0){ //幂等性校验,已经执行过了,什么也不用做
         log.info("******** Bank2 已经执行过confirm... 无需再次confirm "+localTradeNo );
         return ;
      }
      //正式增加金额
      accountInfoDao.addAccountBalance(accountNo,amount);
      //添加confirm日志
      accountInfoDao.addConfirm(localTradeNo);
   }

   @Transactional
   public  void cancelMethod(String accountNo, Double amount) {
      String localTradeNo = HmilyTransactionContextLocal.getInstance().get().getTransId();
      log.info("******** Bank2 Service begin cancel...  "+localTradeNo );

   }


}

3)Controller

代码语言:javascript
复制
@RestController
public class Bank2Controller {
   @Autowired
    AccountInfoService accountInfoService;
   
   @RequestMapping("/transfer")
    public Boolean test2(@RequestParam("amount") Double amount) {
       this.accountInfoService.updateAccountBalance("2", amount);
        return true;
    }
      
}

测试

  • 张三向李四转账成功。
  • 李四事务失败,张三事务回滚成功。
  • 张三事务失败,李四分支事务回滚成功。
  • 分支事务超时测试。

发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/143181.html原文链接:https://javaforall.cn

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022年5月2,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 目标
  • Hmily介绍
  • TCC事务控制思想
  • 案例场景
  • 案例实现
    • 程序组成部分
      • 创建数据库
        • 案例工程
          • dtx-tcc-demo-bank1实现try和cancel方法
          • dtx-tcc-demo-bank2实现如下功能
        • 测试
        相关产品与服务
        数据库
        云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档