前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【RocketMQ系列(三)】基于RocketMQ的分布式事务

【RocketMQ系列(三)】基于RocketMQ的分布式事务

作者头像
行百里er
发布2020-12-02 15:47:58
1.2K0
发布2020-12-02 15:47:58
举报
文章被收录于专栏:JavaJourneyJavaJourney

Java技术分享,点击上方蓝字关注我吧。

RocketMQ系列第三篇。 前两篇介绍了消息队列及RocketMQ的基本使用,本次来聊一下基于RocketMQ的分布式事务解决方案。

  • Why分布式事务
  • 分布式事务解决方案
  • 基于RocketMQ的分布式事务
  • 代码实现

0x01 为什么有分布式事务

现在很多大公司的项目都拆分为为服务器架构的了,通常每个服务只处理一件事情,部署在一个服务器节点上,不同的服务部署在不同的机器上,这就存在服务之间的相互通信问题。

比如订单服务和支付服务,这里举一个简单的业务流程,创建一个订单之后,向MQ发送消息,支付服务消费消息,调起支付,然后订单服务进行修改订单状态,发货。

如果用户已经支付完成了,但是在处理订单状态环节出现了问题,该怎么办?这个时候消费者方(支付服务)已经把消息消费了,无法回滚了。

所以这两个服务,从创建订单到支付到更新订单状态等一系列的操作必须是原子性的。

这就是分布式系统中涉及到的分布式事务问题。

0x02 实现最终一致性的解决思路

2.1 两阶段提交(2PC)

Two-phase Commit,简称2PC两阶段提交。从字面意思就能想到,提交事务时分两个阶段来完成最终事务的提交。

该方案通过引入一个第三方协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。

2.1.1 阶段一:准备阶段

协调者询问参与者事务是否执行成功,参与者发回事务执行结果。

2.1.2 阶段二:提交阶段

如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。

Tip:在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚

2.1.3 两阶段提交存在的问题

  • 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。
  • 单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。
  • 数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
  • 太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

2.2 三阶段提交(3PC,TCC,补偿事务)

Try-Confirm-Cancel,TCC,采用的是补偿机制。其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作

2.2.1 三个阶段

  • Try 主要是对业务系统做检测及资源预留
  • Confirm 主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功
  • Cancel 主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

2.2.2 优缺点

TCC是对2PC的一个改进,try阶段通过预留资源的方式避免了同步阻塞资源的情况

但是TCC编程需要业务自己实现try,confirm,cancel,对业务入侵太大,实现起来也比较复杂

0x03 基于RocketMQ的分布式事务

RocketMQ支持分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致。

3.1 实现方式

Half Message(半消息,预处理消息)

Broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中,它暂时不会被Consumer消费。

检查事务状态

Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向Producer确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

事务消息的三种状态

  • 提交状态:提交事务,它允许消费者消费此消息。
  • 回滚状态:回滚事务,它代表该消息将被删除,不允许被消费。
  • 未知状态:中间状态,它代表需要检查消息队列来确定状态。

消息回查

有一种场景,如果发送预备消息成功,执行本地事务成功,但发送确认消息失败;那么问题就来了,因为Producer的业务都已经处理完毕了,就剩下Consumer消费了,但是你commit失败了,Consumer消费不到,这里就出现了数据不一致。

RocketMQ采用消息状态回查来解决这种问题,RocketMQ会定时遍历commitlog中的预备消息。

因为预备消息最终肯定会变为Commit消息Rollback消息,所以遍历预备消息去回查本地业务的执行状态,如果发现本地业务没有执行成功就Rollback,如果执行成功就发送Commit消息。

超时

如果超过回查次数,默认回滚消息。

3.2 Show you the code

3.2.1 Producer发送事务性消息

RocketMQ的分布式事务,需要生产者发送事务性消息,使用TransactionMQProducer类创建生产者,并指定唯一的ProducerGroup,就可以设置自定义线程池来处理这些检查请求。

执行本地事务后,需要根据执行结果对消息队列进行回复。

生成TransactionMQProducer实例

代码语言:javascript
复制
TransactionMQProducer producer = new TransactionMQProducer("laopo");
producer.setNamesrvAddr("192.168.2.110:9876");

//处理检查请求的线程池
ExecutorService executorService = new ThreadPoolExecutor(2,
        5,
        100,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

producer.setExecutorService(executorService);

3.2.2 设置监听回查

设置监听事务的接口TransactionListener:当发送半消息成功时,使用executeLocalTransaction方法来执行本地事务,返回前文所述的三种状态之一:提交、回滚、未知。

checkLocalTransaction方法用于检查本地事务状态,并回应消息队列的检查请求,该方法也返回提交、回滚、未知三种状态之一。

代码语言:javascript
复制
//设置回查
producer.setTransactionListener(new TransactionListener() {

    private AtomicInteger transactionIndex = new AtomicInteger(0);
    //用来保存事务的状态
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    //半消息发送成功触发此方法来执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(message.getTransactionId(), status);

        return LocalTransactionState.UNKNOW;
    }

    //broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        Integer status = localTrans.get(messageExt.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});

3.2.3 发送消息

调用sendMessageInTransaction来发送消息:

代码语言:javascript
复制
//生产并发送消息
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
    Message msg =
            new Message("girl", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);
    Thread.sleep(10);
}
for (int i = 0; i < 100000; i++) {
    Thread.sleep(1000);
}
//关闭生产者实例
producer.shutdown();
System.out.printf("%s", "已关闭生产者实例");

运行结果:

3.2.4 消费事务消息

之前生产消息生产了TagA、B到TagE的消息,我们这里顺便再验证一下TAG过滤消费,就消费TagB的吧:

代码语言:javascript
复制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("laopo-consumer");
consumer.setNamesrvAddr("192.168.2.110:9876");

//订阅topic,消费TagB的消息
consumer.subscribe("girl", "TagB");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动Consumer实例
consumer.start();
System.out.println("consumer started.");

成功消费了TagB的消息。

完整代码位于 GitHub github.com/xblzer/JavaJourney

本次导航结束,以上。


首发公众号 行百里er ,欢迎老铁们关注阅读指正。代码仓库 GitHub github.com/xblzer/JavaJourney

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-11-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 行百里er 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0x01 为什么有分布式事务
  • 0x02 实现最终一致性的解决思路
    • 2.1 两阶段提交(2PC)
      • 2.1.1 阶段一:准备阶段
      • 2.1.2 阶段二:提交阶段
      • 2.1.3 两阶段提交存在的问题
    • 2.2 三阶段提交(3PC,TCC,补偿事务)
      • 2.2.1 三个阶段
      • 2.2.2 优缺点
  • 0x03 基于RocketMQ的分布式事务
    • 3.1 实现方式
      • Half Message(半消息,预处理消息)
      • 检查事务状态
      • 事务消息的三种状态
      • 消息回查
      • 超时
    • 3.2 Show you the code
      • 3.2.1 Producer发送事务性消息
      • 3.2.2 设置监听回查
      • 3.2.3 发送消息
      • 3.2.4 消费事务消息
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档