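Overview
This document uses the Java SDK as an example to show how to send and receive messages with the open-source SDK, so that you can follow the complete message flow from producer to consumer.
Note
Prerequisites
Download the demo
Procedure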
Step 1: Install the Java dependencies
Add the required dependencies to your Java project. For a Maven project, add the following to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.6</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.7</version>
    </dependency>
</dependencies>
Step 2: Produce messages
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalMessageSyncProducer {
    private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);

    private NormalMessageSyncProducer() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Configure the access key (ak) and secret key (sk).
        String accessKey = "yourAccessKey"; // ak
        String secretKey = "yourSecretKey"; // sk
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        // Enter the access address provided by Tencent Cloud.
        String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String topic = "yourNormalTopic";

        // There is usually no need to create many producers in one client.
        final Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Setting the topic is optional but recommended: it lets the producer
            // prefetch the message route before the first send.
            .setTopics(topic)
            // May throw {@link ClientException} if the producer fails to initialize.
            .build();

        // Define the message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
            // Set topic for the current message.
            .setTopic(topic)
            // The tag is a secondary classification under the topic that
            // distinguishes different kinds of messages within the same topic.
            .setTag(tag)
            // The message key is another way, besides the message ID, to identify a message.
            .setKeys("yourMessageKey-1c151062f96e")
            .setBody(body)
            .build();

        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }

        // Close the producer once sending is complete and it is no longer needed.
        producer.close();
    }
}
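The producer example above hardcodes accessKey, secretKey, and endpoints for brevity. Outside of a demo you may prefer to keep these values out of source code. The following is a minimal sketch of one way to do that using only the Java standard library. The class name ConnectionSettings and the variable names ROCKETMQ_ACCESS_KEY, ROCKETMQ_SECRET_KEY, and ROCKETMQ_ENDPOINTS are assumptions made for this illustration; they are not defined by the SDK or by TDMQ.

```java
import java.util.Map;

/** Connection settings read from environment-style key/value pairs (illustrative sketch). */
final class ConnectionSettings {
    // Hypothetical variable names chosen for this sketch; they are not part of the SDK.
    static final String ACCESS_KEY_VAR = "ROCKETMQ_ACCESS_KEY";
    static final String SECRET_KEY_VAR = "ROCKETMQ_SECRET_KEY";
    static final String ENDPOINTS_VAR = "ROCKETMQ_ENDPOINTS";

    final String accessKey;
    final String secretKey;
    final String endpoints;

    private ConnectionSettings(String accessKey, String secretKey, String endpoints) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
        this.endpoints = endpoints;
    }

    /** Reads the settings from the process environment. */
    static ConnectionSettings fromEnvironment() {
        return fromMap(System.getenv());
    }

    /** Reads the settings from any map, which keeps the lookup easy to test. */
    static ConnectionSettings fromMap(Map<String, String> env) {
        return new ConnectionSettings(
            require(env, ACCESS_KEY_VAR),
            require(env, SECRET_KEY_VAR),
            require(env, ENDPOINTS_VAR));
    }

    // Fails fast with a clear message instead of passing an empty value to the client.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }
}
```

With this in place, the example could call ConnectionSettings.fromEnvironment() and pass its accessKey and secretKey fields to StaticSessionCredentialsProvider and its endpoints field to setEndpoints, in place of the string literals.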
Step 3: Consume messages
Tencent Cloud TDMQ for RocketMQ 5.x supports two consumption modes: Push Consumer and Simple Consumer. The following code sample uses Push Consumer.
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalPushConsumer {
    private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);

    private NormalPushConsumer() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Configure the access key (ak) and secret key (sk).
        String accessKey = "yourAccessKey"; // ak
        String secretKey = "yourSecretKey"; // sk
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        // Enter the access address provided by Tencent Cloud.
        String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8081";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();

        // "*" subscribes to messages with any tag.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup";
        String topic = "yourTopic";

        // There is usually no need to create many consumers in one client.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // Set the subscription: the topic and the filter expression applied to it.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(messageView -> {
                // Process the message and return the consumption result.
                log.info("Consume message={}", messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();

        // Block the main thread so the demo keeps running; this is not needed in production.
        Thread.sleep(Long.MAX_VALUE);

        // Close the consumer once consumption is complete and it is no longer needed.
        pushConsumer.close();
    }
}
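The consumer example subscribes with the tag expression "*", which matches every tag in the topic. RocketMQ tag filtering also accepts several tags joined with "||", for example "TagA||TagB". The helper below is a small sketch for building such an expression from a list of tags; the class name TagExpressions and the method anyOf are hypothetical names introduced here, and falling back to "*" for an empty list is a design choice of this sketch rather than SDK behavior.

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Builds tag filter expression strings such as "TagA||TagB" (illustrative sketch). */
final class TagExpressions {
    static final String MATCH_ALL = "*";
    private static final String SEPARATOR = "||";

    private TagExpressions() {
    }

    /** Returns an expression matching any of the given tags, or "*" if none remain. */
    static String anyOf(List<String> tags) {
        // Drop nulls and blanks, trim whitespace, and remove duplicates while keeping order.
        List<String> cleaned = tags.stream()
            .filter(Objects::nonNull)
            .map(String::trim)
            .filter(t -> !t.isEmpty())
            .distinct()
            .collect(Collectors.toList());
        if (cleaned.isEmpty()) {
            // Assumption of this sketch: no tags means "subscribe to everything".
            return MATCH_ALL;
        }
        for (String t : cleaned) {
            // A tag containing '|' would be ambiguous once joined with "||".
            if (t.contains("|")) {
                throw new IllegalArgumentException("Tag must not contain '|': " + t);
            }
        }
        return String.join(SEPARATOR, cleaned);
    }
}
```

The resulting string can be passed as the first argument of new FilterExpression(..., FilterExpressionType.TAG) in place of the "*" literal used in the example.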
Step 4: View message details

