本文主要介绍消息队列 TDMQ RocketMQ 版中事务消息的概念、技术原理、应用场景和使用方式。
相关概念
事务消息是 RocketMQ 提供的一种高级特性消息,通过将二阶段提交的动作和本地事务绑定,来保障分布式场景下消息生产和本地事务的最终一致性,相比普通消息,主要是扩展了二次确认和本地事务状态回查补偿的机制。
1. 半消息 half message:
生产者发送事务消息到 RocketMQ 服务端后,消息会被持久化并标记为“暂不可投递”的状态,直到本地事务执行完成并确认后,消息才会决定是否对消费者可见,此状态下的消息,称为半消息(half message)。
2. 二阶段提交
实现事务最终一致性的关键机制,一阶段为发送 half message,二阶段为生产者执行本地事务,并根据执行结果向 RocketMQ 服务端提交 commit(允许投递)或 rollback(丢弃消息)的确认结果,以此来决定 half message 的去留。
3. OP 消息:
用于给消息状态打标记,没有对应 OP 消息的 half message,就说明二阶段确认状态未知,需要 RocketMQ 服务端进行本地事务状态主动回查,OP 消息的内容为对应的 half message 的存储的Offset。
4. 相关 topic:
real topic:业务真实 topic,生产者发消息时指定的 topic 值。
half topic:系统 topic,topic名称为 RMQ_SYS_TRANS_HALF_TOPIC,用于存储 half message。
op topic:系统 topic,topic名称为 MQ_SYS_TRANS_OP_HALF_TOPIC,用于存储 OP 消息, half message 二次状态确认后,不管是 commit 还是 rollback,都会写入一条对应的 OP 消息到这个 topic。
应用场景
例如在跨行转账这一典型的分布式场景中,必须严格保证转出方账户扣款与转入方账户入账的最终一致性,我们可以采用 TDMQ RocketMQ 版的事务消息来实现这一功能,具体分为以下三个阶段:

1. 阶段一:发送事务消息(准备转账)
用户 1 发起转账后,其所在的银行系统 A,向 RocketMQ 服务端对应的业务 topic 发送一条事务消息,内容包含 “用户 1 转出 1000 元至用户 2 账户”。此时,该消息对转入方的系统 B 不可见,避免在转出方本地事务确认前,转入方提前执行入账操作,确保资金流转安全性。
2. 阶段二:执行本地事务(转出账户扣减)
事务消息发送成功后,系统 A 继续执行本地事务,扣减用户1 的账户余额,若扣减成功,则提交二次确认 commit 到 RocketMQ 服务端,消息被继续投递到下游,反之,提交 rollback,事务结束,双方账户余额保持不变。
3. 阶段三:下游服务消费(转入银行余额增加)
转入方银行系统 B 预先订阅转账 topic,接收消息后执行用户 2 账户余额增加操作。若消费过程中因网络异常、账户状态问题导致失败,RocketMQ 将自动触发重试机制,若多次重试仍未成功,消息将转入死信队列,后续由人工介入核对,通过补偿流程保障资金最终准确入账。
通过以上三个阶段,RocketMQ 事务消息机制在跨行转账场景中,成功实现了分布式事务的最终一致性。类似的,在电商订单支付与库存扣减、金融交易对账、企业多系统数据同步等场景中,RocketMQ 事务消息都能凭借其可靠的机制,保障跨服务操作的最终一致性。
实现流程

1. 生产者发送事务消息到 RocketMQ 服务端。
2. 服务端存储这条消息后返回发送成功的响应,此时消息对下游消费者不可见,处于 half message 状态。
3. 生产者收到半消息成功的响应后,继续往下执行本地事务(如更新业务数据库)。
4. 根据本地事务的执行结果,生产者会向 RocketMQ 服务端提交最终状态,也就是二次确认。
5. 确认结果为 commit 时,服务端会将事务消息继续向下投递给消费者,确认结果为 rollback 时,服务端将会丢弃该消息,不再向下投递。
6. 确认结果是 unknown 或一直没有收到确认结果时,一定时间后,将会触发事务状态主动回查。
7. 当生产者未提交最终状态或者二次确认的结果为 unknown 时,RocketMQ 服务端将会主动发起事务结果查询请求到生产者服务。
8. 生产者收到请求后提交二次确认结果,逻辑再次回到第5步,此时如果生产者服务暂时不可用,则 RocketMQ 服务端会在指定时间间隔后,继续主动发起回查请求,直到超过最大回查次数后,回滚消息。
如此,不管本地事务是否执行成功,都能实现事务状态的最终一致性。以上步骤,可用时序图直观体现为:

使用示例
这里以 TDMQ 版 RocketMQ 5.x 版本集群为例,演示事务消息的使用方式和效果。
1. 首先登录 RocketMQ 控制台,新建一个消息类型为事务消息的 topic。

2. 以 Java 语言为例,引入 5.x 对应版本的依赖。
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.6</version></dependency>
3. 启动生产者。
public class ProducerTransactionMessageDemo {private static final Logger log = LoggerFactory.getLogger(ProducerTransactionMessageDemo.class);private static boolean executeLocalTransaction() {// 模拟本地事务(如数据库插入操作),这里假设执行成功return true;}private static boolean checkTransactionStatus(String orderId) {// 模拟查询本地事务执行结果,如查询订单ID是否已入库,查到则return truereturn true;}public static void main(String[] args) throws ClientException {final ClientServiceProvider provider = ClientServiceProvider.loadService();// 在控制台权限管理页面获取ak和skString accessKey = "your-ak";String secretKey = "your-sk";SessionCredentialsProvider sessionCredentialsProvider =new StaticSessionCredentialsProvider(accessKey, secretKey);// 在控制台获取并填写腾讯云提供的接入地址String endpoints = "https://your-endpoints";ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).enableSsl(false).setCredentialProvider(sessionCredentialsProvider).build();String topic = "tran_topic";TransactionChecker checker = messageView -> {log.info("Receive transactional result check request, message={}", messageView);// 服务端主动回查本地事务状态String orderId = messageView.getProperties().get("orderId");boolean isSuccess = checkTransactionStatus(orderId);return isSuccess ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;};// 创建生产着并设置回查的checker对象Producer producer = provider.newProducerBuilder().setClientConfiguration(clientConfiguration).setTopics(topic).setTransactionChecker(checker).build();// 开启事务final Transaction transaction = producer.beginTransaction();byte[] body = "This is a transaction message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);String tag = "tagA";final Message message = provider.newMessageBuilder().setTopic(topic).setTag(tag).setKeys("your-key-565ef26f5727")//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验.addProperty("orderId", "0001").setBody(body).build();// 发送半消息try {final SendReceipt sendReceipt = producer.send(message, transaction);log.info("Send transaction message successfully, messageId={}", sendReceipt.getMessageId());} catch (Throwable t) {log.error("Failed to send message", t);return;}// 执行本地事务boolean localTxSuccess = executeLocalTransaction();if (localTxSuccess) {// 本地事务执行成功,二次确认为Committransaction.commit();} else {// 本地事务执行失败,二次确认为Rollbacktransaction.rollback();}// producer.close();}}
4. 运行代码后,在控制台的消息查询页面,可以看到已经有一条投递完成等待消费的消息。

5. 启动消费者,订阅这个topic,成功消费消息后,在腾讯云控制台查看消息轨迹:

6. 修改代码,假设本地事务执行失败,使处于 half message 状态的事务消息回滚。
private static boolean executeLocalTransaction() {// 本地事务执行失败return false;}private static boolean checkTransactionStatus(String orderId) {// 回查结果自然也是rollback,返回falsereturn false;}
7. 此时,可以发现消息发送成功了,但在控制台的消息消息查询页面是不可见的,启动消费者也不能消费到这条消息。


注意事项
使用事务消息过程中,需注意以下几点:
1. topic 类型必须为事务 TRANSACTION,否则生产消息会报错,关键错误信息:current message type not match with topic accept message types。
2. 事务消息不支持延迟,若设置了延迟属性,在发送消息前会被清除延迟属性。
3. 如果本地事务执行较慢,此时服务端进行事务回查时,应返回 unknown,且如果确认本地事务执行耗时会很长,应修改第一次事务回查的时间,以避免产生大量结果未知的事务。