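Overview
This document uses the Java SDK as an example to describe how to send and receive messages with the open-source SDK, so that you can better understand the complete message sending and receiving process.
Note:
The Java client is used as the example here. For clients in other languages, see the SDK documentation.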
Prerequisites
Complete resource creation and preparation
Download the demo
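Directions
Step 1: Install the Java dependency library
Add the required dependency to your Java project. For a Maven project, for example, add the following dependency to pom.xml: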
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.6</version>
    </dependency>
</dependencies>
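Note:
If you use the SDK in a Spring Boot environment, you may run into an annotations-api dependency conflict. In that case, exclude the conflicting dependency as follows: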
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.6</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.tomcat</groupId>
                <artifactId>annotations-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
Step 2: Produce messages
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalMessageSyncProducer {
    private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);

    private NormalMessageSyncProducer() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Configure the access key (AK) and secret key (SK).
        String accessKey = "yourAccessKey"; // AK, available on the "Permission Management" page in the console
        String secretKey = "yourSecretKey"; // SK, available on the "Permission Management" page in the console
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        // Enter the access address provided by Tencent Cloud.
        String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();

        String topic = "yourNormalTopic";
        // In most cases, you don't need to create many producers; the singleton pattern is recommended.
        final Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Setting the topic name(s) is optional but recommended.
            // It lets the producer prefetch the topic route before publishing messages.
            .setTopics(topic)
            // May throw {@link ClientException} if the producer fails to initialize.
            .build();

        // Define your message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
            // Set the topic for the current message.
            .setTopic(topic)
            // Tag: a secondary classifier for the message, in addition to the topic.
            .setTag(tag)
            // Key(s) of the message: another way to identify a message, in addition to the message ID.
            .setKeys("yourMessageKey-1c151062f96e")
            .setBody(body)
            .build();

        try {
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }

        // Close the producer when you don't need it anymore.
        producer.close();
    }
}
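The producer sample hardcodes the AK and SK as string literals, which is convenient for a demo but easy to leak through source control. The following is a minimal sketch of reading them from environment variables instead. The class name `EnvCredentials` and the variable names `ROCKETMQ_ACCESS_KEY` and `ROCKETMQ_SECRET_KEY` are assumptions made for this illustration; they are not defined by the SDK or by TDMQ.

```java
import java.util.Map;

// Hypothetical helper for this sketch; not part of the RocketMQ SDK.
final class EnvCredentials {
    // Assumed variable names; pick whatever fits your deployment.
    static final String ACCESS_KEY_VAR = "ROCKETMQ_ACCESS_KEY";
    static final String SECRET_KEY_VAR = "ROCKETMQ_SECRET_KEY";

    final String accessKey;
    final String secretKey;

    private EnvCredentials(String accessKey, String secretKey) {
        this.accessKey = accessKey;
        this.secretKey = secretKey;
    }

    // Reads both values from the process environment.
    static EnvCredentials fromEnvironment() {
        return from(System.getenv());
    }

    // Reads both values from any map, which keeps the lookup easy to test.
    static EnvCredentials from(Map<String, String> env) {
        return new EnvCredentials(require(env, ACCESS_KEY_VAR), require(env, SECRET_KEY_VAR));
    }

    // Returns the trimmed value, or fails fast when it is missing or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value.trim();
    }
}
```

With a helper like this, the `accessKey` and `secretKey` fields of `EnvCredentials.fromEnvironment()` could be passed to `StaticSessionCredentialsProvider` in place of the literals, so a missing credential fails at startup rather than on the first send.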
Step 3: Consume messages
Tencent Cloud TDMQ for RocketMQ 5.x supports two types of consumer clients: Push Consumer and Simple Consumer.
The following sample code uses Push Consumer as an example:
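import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalPushConsumer {
    private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);

    private NormalPushConsumer() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Configure the access key (AK) and secret key (SK).
        String accessKey = "yourAccessKey"; // AK, available on the "Permission Management" page in the console
        String secretKey = "yourSecretKey"; // SK, available on the "Permission Management" page in the console
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        // Enter the access address provided by Tencent Cloud.
        String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();

        // "*" subscribes to messages with any tag.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup";
        String topic = "yourTopic";

        // In most cases, you don't need to create many consumers; the singleton pattern is recommended.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(messageView -> {
                // Handle the received message and return the consume result.
                log.info("Consume message={}", messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();

        // Block the main thread to keep the demo running; this is not needed in a production environment.
        Thread.sleep(Long.MAX_VALUE);

        // Close the push consumer when you don't need it anymore.
        pushConsumer.close();
    }
}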
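The consumer sample keeps the process alive with `Thread.sleep(Long.MAX_VALUE)`, so in practice the `pushConsumer.close()` call after it is never reached. One possible alternative, sketched below, is to block on a latch and close the consumer from a JVM shutdown hook. `GracefulShutdown` is a hypothetical helper written for this illustration, not part of the SDK; it assumes only that the consumer can be closed through `java.io.Closeable`.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper for this sketch; not part of the RocketMQ SDK.
final class GracefulShutdown {
    private final CountDownLatch done = new CountDownLatch(1);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Closeable resource;

    GracefulShutdown(Closeable resource) {
        this.resource = resource;
    }

    // Registers a JVM shutdown hook so the resource is closed on SIGTERM or Ctrl+C.
    void installHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown, "graceful-shutdown"));
    }

    // Closes the resource at most once, then releases threads blocked in awaitShutdown().
    void shutdown() {
        if (closed.compareAndSet(false, true)) {
            try {
                resource.close();
            } catch (IOException e) {
                System.err.println("Failed to close resource: " + e.getMessage());
            } finally {
                done.countDown();
            }
        }
    }

    // Blocks until shutdown() has finished; returns false if the wait was interrupted.
    boolean awaitShutdown() {
        try {
            done.await();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isClosed() {
        return closed.get();
    }
}
```

In the consumer sample, this could replace the sleep: construct `new GracefulShutdown(pushConsumer)`, call `installHook()`, then call `awaitShutdown()` at the end of `main`. Whether a shutdown hook fits depends on how your application is hosted; frameworks such as Spring usually provide their own lifecycle callbacks for this.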
Step 4: View message details

