操作场景
本文以调用 Java SDK 为例介绍通过开源 SDK 实现事务消息收发的操作过程。
前提条件
完成资源创建与准备(如果是全局顺序消息,需要创建单队列topic)
操作步骤
步骤1:安装 Java 依赖库
在 Java 项目中引入相关依赖,以 maven 工程为例,在 pom.xml 添加以下依赖:
说明:
依赖版本要求 ≥ 4.9.3, 当前建议为4.9.4。
<!-- in your <dependencies> block --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.4</version></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.4</version></dependency>
步骤2:生产消息
实现 TransactionListener
public class TransactionListenerImpl implements TransactionListener {//半消息发送成功后,回调该方法执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {//这里执行数据库事务,如果成功,就返回成功,否则返回未知,或者回滚,等待回查return LocalTransactionState.UNKNOW;}//回查本地事务@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {//这里查询本地db的数据状态,然后决定是否是否提交return LocalTransactionState.COMMIT_MESSAGE;}}
创建消息生产者
//需要用户实现一个TransactionListener 实例,TransactionListener transactionListener = new TransactionListenerImpl();// 实例化事务消息生产者ProducerTransactionMQProducer producer = new TransactionMQProducer("transaction_group",// ACL权限new AclClientRPCHook(new SessionCredentials(ClientCreater.ACCESS_KEY, ClientCreater.SECRET_KEY)));// 设置NameServer的地址producer.setNamesrvAddr(ClientCreater.NAMESERVER);producer.setTransactionListener(transactionListener);producer.start();
发送消息
for (int i = 0; i < 3; i++) {// 构造消息示例Message msg = new Message(TOPIC_NAME, "your tag", "KEY" + i,("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));SendResult sendResult = producer.sendMessageInTransaction(msg,null);System.out.printf("%s%n", sendResult);}
步骤3:消费消息
创建消费者
TDMQ RocketMQ 版支持 push 和 pull 两种消费模式。推荐Push消费模式。
// 实例化消费者DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限// 设置NameServer的地址pushConsumer.setNamesrvAddr(nameserver);pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive transaction messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});
订阅消息
根据消费模式不同,订阅方式也有所区别。
// 订阅topicpushConsumer.subscribe(topic_name, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();
步骤4:查看消费详情

说明: