前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >重学SpringBoot3-集成RocketMQ(二)

重学SpringBoot3-集成RocketMQ(二)

作者头像
CoderJia
发布2024-10-18 09:19:48
870
发布2024-10-18 09:19:48
举报
文章被收录于专栏:CoderJia的工作笔记

今天介绍下如何在 Spring Boot 3 中与 RocketMQ 整合实现分布式事务。RocketMQ 提供了类似 X/Open XA 的分布式事务功能,通过事务消息能达到分布式事务的最终一致。XA 是一种分布式事务解决方案,一种分布式事务处理模式。下面详细介绍下 RocketMQ 如何实现事务消息。

1. 基础概念

在 RocketMQ 中,半消息(Half Message)主要用于实现事务消息。它是指生产者在发送事务消息时,RocketMQ 会先将消息保存为 半消息,等待事务状态的最终确认,确保消息的可靠性和一致性。

图片来源:https://www.cnblogs.com/dennyzhangdd/p/14572024.html
图片来源:https://www.cnblogs.com/dennyzhangdd/p/14572024.html

半消息的工作流程:

  1. 发送半消息:生产者首先将消息发送到 RocketMQ,RocketMQ 将其标记为半消息,并暂时存储到消息队列中,但这时消费者不会收到该消息。
  2. 执行本地事务:生产者在发送半消息后,开始执行自己的本地事务操作。
  3. 提交或回滚消息
    • 如果本地事务成功,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费者。
    • 如果本地事务失败,生产者会通知 RocketMQ回滚消息,RocketMQ 会删除该半消息。
  4. 事务状态回查:如果 RocketMQ 没有收到生产者的事务状态确认,RocketMQ 会通过回查机制询问生产者事务的最终状态,确保消息的一致性。

现有一个案例,后端文件上传接口,同时上传 OSS 和 MySQL,数据库负责文件元信息的增删改查, OSS负责存储文件对象,如何保证最终一致性?下面用RocketMQ 的事务消息来实现最终一致性。

2. 准备工作

请参考《重学SpringBoot3-集成RocketMQ(一)》进行环境搭建和配置工作。配置文件新增如下配置:

代码语言:javascript
复制
  consumer2:
    group: springboot-consumer-group2  # 新的消费者组名称
    topic: transaction-topic  # 订阅新的主题
    access-key: RocketMQ    # 若启用了 ACL 功能
    secret-key: 12345678    # 若启用了 ACL 功能

3. 实现事务消息的生产者

创建一个事务消息的生产者类,通过事务生产者发送消息,并处理本地事务逻辑。

代码语言:javascript
复制
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class TransactionalMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 发送事务消息
     */
    public void sendTransactionMessage(String topic, String message) {
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction(topic, MessageBuilder.withPayload(message).build(), null);
        System.out.println("Transaction message sent: " + sendResult.getLocalTransactionState());
    }
}

4. 事务监听器实现

通过实现 RocketMQLocalTransactionListener 接口,定义事务的提交或回滚逻辑。

代码语言:javascript
复制
package com.example.boot308rocketmq;

import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Component
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务逻辑,根据业务情况返回事务的提交或回滚状态
        try {
            // 模拟本地事务处理逻辑
            System.out.println("Executing local transaction...");
            boolean success = performLocalTransaction();
            if (success) {
                return RocketMQLocalTransactionState.COMMIT;
            } else {
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        // 事务回查逻辑,确认本地事务的最终状态
        System.out.println("Checking local transaction...");

        // 根据本地事务的处理结果返回 COMMIT_MESSAGE 或 ROLLBACK_MESSAGE
        System.out.println("local transaction check success");
        return RocketMQLocalTransactionState.COMMIT;
    }

    private boolean performLocalTransaction() {
        // TODO 模拟本地事务处理文件上传OSS
        try {
            System.out.println("Upload files to OSS...");
            Thread.sleep(3000);
            System.out.println("File upload to OSS completed");
            return true;
        } catch (InterruptedException e) {
            System.out.println("Failed to upload file to OSS");
            return false;
        }
    }

}

5. 消费者示例

创建一个消费者,订阅并消费事务消息。

RocketMQListener<String> 是一个接口类型,用于定义一个 RocketMQ 消息监听器,它指定接收的消息类型为 String。在 RocketMQ 中,消费者可以通过实现 RocketMQListener 接口来自动处理接收到的消息。

代码语言:javascript
复制
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-consumer-group")
public class TransactionalMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        // 处理接收到的消息
        System.out.printf("Received message: %s%n", message);
    }
}

6. 发送事务消息

在服务中调用事务消息生产者:

代码语言:javascript
复制
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class MessageController {

    @Resource
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/sendTransactionMessage")
    public ResponseEntity<String> sendTransactionMessage(@RequestParam String message) {
        rocketMQProducer.sendTransactionMessage("transaction-topic", message);
        return ResponseEntity.ok("Transaction message sent: " + message);
    }
}

7. 测试

7.1 模拟本地事务正常提交

如下图观察到,当本地事务即文件上传完成之后,生产者会通知 RocketMQ 提交消息,RocketMQ 将半消息转换为正常消息,并发送给消费进行消费。

本地事务正常提交
本地事务正常提交

7.2 模拟本地事务提交失败,未回查

在 RocketMQ 中,如果消息生产者没有在规定的时间内向消息队列确认事务状态,RocketMQ 会通过回查机制(即“回滚检查”或“事务回查”)来询问生产者事务的最终状态,从而确保消息的一致性。broker.conf 中配置 transactionCheckMax=10000 表示 RocketMQ 最长等待 10 秒后进行事务状态回查。

本地事务提交失败,未回查
本地事务提交失败,未回查

7.3 模拟本地事务提交失败,回查成功

本人搭建 RocketMQ 设置的回查时间为15s,所以将本地事务执行时间修改为 16s,这样会触发 RocketMQ 进行事务状态回查。

transactionCheckMax
transactionCheckMax
事务状态回查成功
事务状态回查成功

7.4 模拟本地事务提交失败,回查失败

修改回查方法的返回值,让RocketMQ 回查本地状态值将消息进行回滚,消费者同样不会消费消息。

回查失败
回查失败

7.5 模拟本地事务提交成功,消费失败

例如生产者本地事务执行成功,但是消费者消费失败的情况,RocketMQ 会进行消息重试。 修改一下消费者处理逻辑:

代码语言:javascript
复制
package com.example.boot308rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * @author CoderJia
 * @create 2024/09/12 15:06
 * @Description
 **/
@Slf4j
@Service
@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "springboot-consumer-group2")
public class TransactionalMessageConsumer02 implements RocketMQListener<MessageExt> {


    @Override
    public void onMessage(MessageExt message) {
        try {
            // 处理消息的业务逻辑
            String msgBody = new String(message.getBody(), "UTF-8");
            log.info("Received message:{}", msgBody);

            // 模拟业务处理
            boolean success = processBusinessLogic(msgBody);

            if (!success) {
                throw new RuntimeException("Business processing failure");
            }

            log.info("Business processing successful");

        } catch (Exception e) {
            log.error("MsgID:{},reconsumeTimes:{},e:{}", message.getMsgId(), message.getReconsumeTimes(), e.getMessage());

            // 重新抛出异常,让 RocketMQ 进行重试
            throw new RuntimeException("Message consumption failed, retrying");
        }
    }

    private boolean processBusinessLogic(String message) {
        // 这里是业务逻辑,返回 true 表示成功,false 表示失败
        // 例如:数据库操作或其他远程调用
        return Math.random() > 0.5; // 模拟随机成功或失败
    }
}
消费失败
消费失败

关键点总结

  1. 事务消息发送:通过 RocketMQTemplate.sendMessageInTransaction() 方法发送事务消息。
  2. 本地事务处理:实现 TransactionListener 接口的 executeLocalTransaction() 方法处理本地事务逻辑。
  3. 事务回查:在 checkLocalTransaction() 方法中定义如何检查事务消息的最终状态。

这个示例展示了如何在 Spring Boot 3 中整合 RocketMQ,并实现事务消息的生产和消费。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-09-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 基础概念
  • 2. 准备工作
  • 3. 实现事务消息的生产者
  • 4. 事务监听器实现
  • 5. 消费者示例
  • 6. 发送事务消息
  • 7. 测试
    • 7.1 模拟本地事务正常提交
      • 7.2 模拟本地事务提交失败,未回查
        • 7.3 模拟本地事务提交失败,回查成功
          • 7.4 模拟本地事务提交失败,回查失败
            • 7.5 模拟本地事务提交成功,消费失败
            • 关键点总结
            相关产品与服务
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档