前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >分布式事务常用的解决方案,XA,Saga,TCC,MQ补偿

分布式事务常用的解决方案,XA,Saga,TCC,MQ补偿

作者头像
Zeal
发布2020-11-11 15:20:05
1.1K0
发布2020-11-11 15:20:05
举报
文章被收录于专栏:Hyperledger实践

1.MySQL XA事务例子

XA是分布式事务处理的一个规范, open group发起, 被数据库厂商广泛的支持, 具体规范在 http://www.opengroup.org/public/pubs/catalog/c193.htm. MySQL5.x和Connector/J 5.0.0起,InnoDB引擎就开始支持XA了, 来段代码实际些。

package com._51discuss.xa_test; import com.mysql.cj.jdbc.JdbcConnection; import com.mysql.cj.jdbc.MysqlXAConnection; import com.mysql.cj.jdbc.MysqlXid; import javax.sql.XAConnection; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; import java.sql.*; import java.util.UUID; /** * CREATE TABLE `user` ( `user_id` int(10) unsigned NOT NULL AUTO_INCREMENT, `user_name` varchar(32) CHARACTER SET utf8mb4 NOT NULL COMMENT 'User nick name', `user_mobile` char(11) CHARACTER SET utf8mb4 NOT NULL, `user_pwd` varchar(64) COLLATE utf8mb4_bin NOT NULL, `user_create_time` datetime(3) NOT NULL, PRIMARY KEY (`user_id`), UNIQUE KEY `user_mobile_UNIQUE` (`user_mobile`) ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; * @author Zeal */ public class LocalXaTest { public LocalXaTest() { try { // The newInstance() call is a work around for some // broken Java implementations Class.forName("com.mysql.cj.jdbc.Driver").newInstance(); } catch (Exception ex) { // handle the error throw new IllegalStateException(ex.toString(), ex); } } public void multiInserts() throws Exception { String jdbcUrl = "jdbc:mysql://localhost/test?serverTimezone=GMT%2B8&autoReconnect=true&characterEncoding=utf8&zeroDateTimeBehavior=CONVERT_TO_NULL&allowMultiQueries=true"; String jdbcUser = "root"; String jdbcPassword = null; XAConnection connection1 = null; XAConnection connection2 = null; try { connection1 = getConnection(jdbcUrl, jdbcUser, jdbcPassword, true); connection2 = getConnection(jdbcUrl, jdbcUser, jdbcPassword, true); XAResource resource1 = connection1.getXAResource(); XAResource resource2 = connection2.getXAResource(); final byte[] gtrid = UUID.randomUUID().toString().getBytes(); final int formatId = 1; Xid xid1 = null; Xid xid2 = null; try { //====================================================================================================== //First time, it will succeed xid1 = insertUser(connection1, gtrid, formatId, "user1", "13555555555", "123456"); //With duplicate user, it should roll back xid2 = insertUser(connection2, gtrid, formatId, "user2", "13555555556", "123456"); //====================================================================================================== //Second time, it will roll back //The second time, it should roll back // xid1 = insertUser(connection1, gtrid, formatId, "user3", "13555555557", "123456"); // //With duplicate user, it should roll back // xid2 = insertUser(connection2, gtrid, formatId, "user1", "13555555556", "123456"); //2PC int prepare1 = resource1.prepare(xid1); int prepare2 = resource2.prepare(xid2); final boolean onePhase = false; if (prepare1 == XAResource.XA_OK && prepare2 == XAResource.XA_OK) { resource1.commit(xid1, onePhase); resource2.commit(xid2, onePhase); } else { resource1.rollback(xid1); resource2.rollback(xid2); } } catch (Exception e) { e.printStackTrace(); if (xid1 != null) { resource1.rollback(xid1); } if (xid2 != null) { resource2.rollback(xid2); } throw e; } } finally { close(connection1).close(connection2); } } private Xid insertUser(XAConnection xaConnection, final byte[] gtrid, final int formatId, String userName, String userMobile, String userPwd) throws Exception { Connection connection = xaConnection.getConnection(); XAResource resource = xaConnection.getXAResource(); String sql = "INSERT INTO `user`(`user_name`,`user_mobile`,`user_pwd`,`user_create_time`) VALUES(?,?,?,?)"; byte[] bqual = UUID.randomUUID().toString().getBytes(); MysqlXid xid = new MysqlXid(gtrid, bqual, formatId); resource.start(xid, XAResource.TMNOFLAGS); try (PreparedStatement psm = connection.prepareStatement(sql)) { psm.setString(1, userName); psm.setString(2, userMobile); psm.setString(3, userPwd); psm.setTimestamp(4, new Timestamp(System.currentTimeMillis())); psm.executeUpdate(); resource.end(xid, XAResource.TMSUCCESS); return xid; } } private XAConnection getConnection(String jdbcUrl, String jdbcUser, String jdbcPassword, boolean logXaCommand) throws SQLException { Connection connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); return new MysqlXAConnection((JdbcConnection) connection, logXaCommand); } private LocalXaTest close(XAConnection closeable) { if (closeable != null) { try { closeable.close(); } catch (Exception e) { e.printStackTrace(); } } return this; } public static void main(String[] args) throws Exception { LocalXaTest test = new LocalXaTest(); test.multiInserts(); } }

第一轮插入user1,user2成功, 第二轮插入user3,user1会因为user1主键重复整体回滚。这里偷懒了,最好能连不同的数据库去执行。而这里XA就是经典的2PC(Two-Phase Commit两段提交)。按照MySQL官方的话说:

The MySQL implementation of XA enables a MySQL server to act as a Resource Manager that handles XA transactions within a global transaction. A client program that connects to the MySQL server acts as the Transaction Manager.

在微服务等场景, 每个服务管理自己的数据库, 调用客户端就用不到XAResource了, 就得用其它方案了。而实际XA用得不多,性能通常不大好,数据库实现也有差异甚至限制,实际上可能会使用更多的柔性事务。

2. JTA/XA实现,Atomikos与Bitronix

Spring boot官方推荐的JTA解决分布式事务。Atomikos其实是先驱级别的方案, TransactionEssentials是开源版, 支持JTA/XA, JDBC, JMS。而商用版ExtremeTransactions则首创的支持TCC(TRYING、CONFIRMING、CANCELIING), 概念上和XA和DTP模型有些相像, 把资源层事务转移到业务层去补偿, 全局事务下的每个小事务就类似要实现try, confirm, cancel的业务接口了, 对业务实现要求就提高了。

Bitronix和开源版的Atomikos类似, 仅支持JTA/XA, 配置上一般是代理若干个数据源,实现JtaTransactionManger, 接口方法加个指定的@Transactional,用起来挺方便, 只是在服务化的场景使用有限, 细节就跳过了。

3.柔性事务Saga

Saga是分布式事务模式, 实现最终一致性。Saga包含多个子事务和对应子事务的补偿动作, 相比TCC则是少了try/prepare 准备动作, 不同的场景这样有好有坏。Saga一个出名些的实现是Apache ServiceComb, 主要是华为系的开源的全栈微服务解决方案,感觉蛮不错的, 新版本也开始支持TCC了,可以多了解。还有一个是jdon提到的Eventuate Tram Saga框架。

4.柔性事务TCC

TCC有蛮多实现Atomikos ExtremeTransactions, ByteTCC, EasyTransaction,spring-cloud-rest-tcc,TCC-transaction,TXLCN还有阿里的Seata, 我们就简单的看下大厂的Seata, 直接套用官方例子。https://github.com/seata/seata/wiki/Quick-Start, 看下常用的spring boot, mybatis例子, https://github.com/seata/seata-samples/tree/master/springboot-mybatis

4.1 注意: 每个服务的数据库都需要增加undo日志表

代码语言:javascript
复制
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;


4.2 需要启动seata-server, 即作为Transcation Coordinate/TC事务协调者

代码语言:javascript
复制
Usage: sh seata-server.sh(for linux and mac) or cmd seata-server.bat(for windows) [options]
  Options:
    --host, -h
      The host to bind.
      Default: 0.0.0.0
    --port, -p
      The port to listen.
      Default: 8091
    --storeMode, -m
      log store mode : file、db
      Default: file
    --help

e.g.

sh seata-server.sh -p 8091 -h 127.0.0.1 -m file


4.3 服务调用需要传递全局xid, spring boot mybatis例子是通过web filter获取xid, rest template拦截器调用服务自动增加xid。

代码语言:javascript
复制
package io.seata.samples.common.filter;

import io.seata.core.context.RootContext;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Component;

import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;

@Component
public class SeataFilter implements Filter {
    @Override
    public void init(FilterConfig filterConfig) throws ServletException {
    }

    @Override
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
        HttpServletRequest req = (HttpServletRequest) servletRequest;
        String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());
        boolean isBind = false;
        if (StringUtils.isNotBlank(xid)) {
            RootContext.bind(xid);
            isBind = true;
        }
        try {
            filterChain.doFilter(servletRequest, servletResponse);
        } finally {
            if (isBind) {
                RootContext.unbind();
            }
        }
    }

    @Override
    public void destroy() {
    }
}
代码语言:javascript
复制
package io.seata.samples.common.interceptor;

import io.seata.core.context.RootContext;
import org.apache.commons.lang.StringUtils;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.support.HttpRequestWrapper;

import java.io.IOException;

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {
    public SeataRestTemplateInterceptor() {
    }

    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
        String xid = RootContext.getXID();
        if (StringUtils.isNotEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }

        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }
}


4.4 业务端调用只需要增加个@GlobalTransaction即可

代码语言:javascript
复制
package io.seata.samples.business.controller;

import io.seata.samples.business.service.BusinessService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;

@RequestMapping("/api/business")
@RestController
public class BusinessController {

    @Autowired
    private BusinessService businessService;

    /**
     * 购买下单,模拟全局事务提交
     *
     * @return
     */
    @RequestMapping("/purchase/commit")
    public Boolean purchaseCommit(HttpServletRequest request) {
        businessService.purchase("1001", "2001", 1);
        return true;
    }

    /**
     * 购买下单,模拟全局事务回滚
     *
     * @return
     */
    @RequestMapping("/purchase/rollback")
    public Boolean purchaseRollback() {
        try {
            businessService.purchase("1002", "2001", 1);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }

        return true;
    }
}
代码语言:javascript
复制
package io.seata.samples.business.service;

import io.seata.core.context.RootContext;
import io.seata.samples.business.client.OrderClient;
import io.seata.samples.business.client.StorageClient;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class BusinessService {

    private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);

    @Autowired
    private StorageClient storageClient;
    @Autowired
    private OrderClient orderClient;

    /**
     * 减库存,下订单
     *
     * @param userId
     * @param commodityCode
     * @param orderCount
     */
    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageClient.deduct(commodityCode, orderCount);
        orderClient.create(userId, commodityCode, orderCount);
    }
}

可能觉得神奇, TCC的cancel都不用写吗? 实际上seata的DataSourceProxy(DataSource)和RootContext应该做了很多事情, 上图整个流程的xid,undo-log/rollback-info,tc交互都隐藏其中,补偿SQL都自动做了,大厂还是大厂,且不论是KPI项目还是什么,不得不承认,阿里开源的中间件还是给中小团队带来福利。

seata最新的连saga也加入了,例子和文档也丰富, 估计这玩意会火起来。不过从架构上也看到, TC/seata-server应该容易成为瓶颈(单点或集群?), undo-log, rollback等肯定会拖慢些, 不过分布式事务都快不了。

5.其它分布式解决方案

互联网方案更多的实现最终一致性, 采用MQ最大努力的通知补偿, 或业务端一些主动查询,对账等方式补偿。

分布式事务如何处理还是得结合业务去考虑, 并不是什么业务都强要求ACID, 例如支付最大努力通知,主动查询业务补偿, 因地制宜的选择适合自己的方案就好。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-09-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Hyperledger实践 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1.MySQL XA事务例子
  • 2. JTA/XA实现,Atomikos与Bitronix
  • 3.柔性事务Saga
  • 4.柔性事务TCC
    • 4.1 注意: 每个服务的数据库都需要增加undo日志表
      • 4.2 需要启动seata-server, 即作为Transcation Coordinate/TC事务协调者
        • 4.3 服务调用需要传递全局xid, spring boot mybatis例子是通过web filter获取xid, rest template拦截器调用服务自动增加xid。
          • 4.4 业务端调用只需要增加个@GlobalTransaction即可
          • 5.其它分布式解决方案
          相关产品与服务
          云数据库 SQL Server
          腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档