Using the Java SDK

Last updated: 2024-06-18 18:05:11

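Scenario

This document uses the Java SDK as an example to walk through sending and receiving messages with the open-source SDK, so that you can follow the complete message flow from producer to consumer.
Note
The Java client is used as the example here. For clients in other languages, see the SDK documentation.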

Prerequisites

You have completed resource creation and preparation.

Procedure

Step 1: Install the Java dependency

Add the client dependency to your Java project. For a Maven project, add the following to pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.6</version>
    </dependency>
</dependencies>
Note:
In a Spring Boot environment you may run into an annotations-api dependency conflict. If that happens, exclude the dependency as follows:
<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client-java</artifactId>
        <version>5.0.6</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.tomcat</groupId>
                <artifactId>annotations-api</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

Step 2: Produce messages

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalMessageSyncProducer {
    private static final Logger log = LoggerFactory.getLogger(NormalMessageSyncProducer.class);

    private NormalMessageSyncProducer() {
    }

    public static void main(String[] args) throws ClientException, IOException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Configure the AccessKey (ak) and SecretKey (sk).
        // Both are available on the "Permission Management" page in the console.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        // Enter the access address (endpoint) provided by Tencent Cloud.
        String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        String topic = "yourNormalTopic";
        // In most cases you don't need many producers; a singleton is recommended.
        final Producer producer = provider.newProducerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Setting the topic name(s) is optional but recommended: it lets the producer
            // prefetch the topic route before publishing messages.
            .setTopics(topic)
            // May throw {@link ClientException} if the producer is not initialized.
            .build();
        // Define the message body.
        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
        String tag = "yourMessageTagA";
        final Message message = provider.newMessageBuilder()
            // Set the topic for this message.
            .setTopic(topic)
            // Tag: a secondary classifier for the message, in addition to the topic.
            .setTag(tag)
            // Key(s): another way to identify the message, in addition to the message ID.
            .setKeys("yourMessageKey-1c151062f96e")
            .setBody(body)
            .build();
        try {
            // Send synchronously; the receipt carries the message ID assigned to the message.
            final SendReceipt sendReceipt = producer.send(message);
            log.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
        } catch (Throwable t) {
            log.error("Failed to send message", t);
        }
        // Close the producer when you no longer need it.
        producer.close();
    }
}
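
The sample above hard-codes accessKey and secretKey for brevity. One possible alternative is to read them from environment variables so that they stay out of source control. The following is a minimal sketch using only the JDK; the class name EnvCredentials and the variable names ROCKETMQ_ACCESS_KEY and ROCKETMQ_SECRET_KEY are assumptions made for this illustration and are not defined by the SDK or by Tencent Cloud.

```java
import java.util.Map;

public final class EnvCredentials {
    // Hypothetical variable names chosen for this sketch; the SDK does not define them.
    public static final String ACCESS_KEY_VAR = "ROCKETMQ_ACCESS_KEY";
    public static final String SECRET_KEY_VAR = "ROCKETMQ_SECRET_KEY";

    private EnvCredentials() {
    }

    /** Returns the trimmed value stored under name, or throws if it is missing or blank. */
    public static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required environment variable: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        try {
            Map<String, String> env = System.getenv();
            String accessKey = require(env, ACCESS_KEY_VAR);
            String secretKey = require(env, SECRET_KEY_VAR);
            // Both values would then be passed to
            // new StaticSessionCredentialsProvider(accessKey, secretKey).
            System.out.println("Credentials loaded, access key length=" + accessKey.length()
                + ", secret key length=" + secretKey.length());
        } catch (IllegalStateException e) {
            // Fail with a clear message instead of sending requests with empty credentials.
            System.err.println(e.getMessage());
        }
    }
}
```

Failing fast on a missing variable tends to be easier to diagnose than an authentication error returned later by the server.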

Step 3: Consume messages

Tencent Cloud TDMQ for RocketMQ 5.x supports two types of consumer clients: Push Consumer and Simple Consumer.
The following sample uses Push Consumer:
import java.io.IOException;
import java.util.Collections;

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.SessionCredentialsProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NormalPushConsumer {
    private static final Logger log = LoggerFactory.getLogger(NormalPushConsumer.class);

    private NormalPushConsumer() {
    }

    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
        final ClientServiceProvider provider = ClientServiceProvider.loadService();

        // Configure the AccessKey (ak) and SecretKey (sk).
        // Both are available on the "Permission Management" page in the console.
        String accessKey = "yourAccessKey";
        String secretKey = "yourSecretKey";
        SessionCredentialsProvider sessionCredentialsProvider =
            new StaticSessionCredentialsProvider(accessKey, secretKey);

        // Enter the access address (endpoint) provided by Tencent Cloud.
        String endpoints = "rmq-xxx.rocketmq.xxxtencenttdmq.com:8080";
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
            .setEndpoints(endpoints)
            .enableSsl(false)
            .setCredentialProvider(sessionCredentialsProvider)
            .build();
        // "*" subscribes to all tags under the topic.
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
        String consumerGroup = "yourConsumerGroup";
        String topic = "yourTopic";
        // In most cases you don't need many consumers; a singleton is recommended.
        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
            .setClientConfiguration(clientConfiguration)
            // Set the consumer group name.
            .setConsumerGroup(consumerGroup)
            // Set the subscription for the consumer.
            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
            .setMessageListener(messageView -> {
                // Handle the received message and return the consume result.
                log.info("Consume message={}", messageView);
                return ConsumeResult.SUCCESS;
            })
            .build();
        // Block the main thread so the demo keeps running; not needed in a production environment.
        Thread.sleep(Long.MAX_VALUE);
        // Close the push consumer when you no longer need it.
        pushConsumer.close();
    }
}
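
The sample above blocks the main thread with Thread.sleep(Long.MAX_VALUE), so pushConsumer.close() is effectively never reached. For a long-running service, one option is to close the consumer from a JVM shutdown hook instead. The following is a minimal sketch using only the JDK; GracefulShutdown and closeAction are hypothetical names introduced for this illustration, and a plain AutoCloseable lambda stands in for the PushConsumer.

```java
import java.util.concurrent.CountDownLatch;

public final class GracefulShutdown {
    private GracefulShutdown() {
    }

    /** Builds the action a shutdown hook should run: close the resource, then release the latch. */
    public static Runnable closeAction(AutoCloseable resource, CountDownLatch done) {
        return () -> {
            try {
                resource.close();
            } catch (Exception e) {
                System.err.println("Failed to close resource: " + e);
            } finally {
                // Always release waiters, even if close() failed.
                done.countDown();
            }
        };
    }

    public static void main(String[] args) {
        // Stand-in for pushConsumer; a real PushConsumer could be passed here instead.
        AutoCloseable consumer = () -> System.out.println("consumer closed");
        CountDownLatch done = new CountDownLatch(1);
        // The hook runs when the JVM exits normally or receives SIGTERM / Ctrl+C.
        Runtime.getRuntime().addShutdownHook(new Thread(closeAction(consumer, done)));
        // A real service would block here with done.await() in place of Thread.sleep(...).
        // This demo returns immediately, so the hook runs as the JVM exits.
    }
}
```

Closing inside the hook itself matters here: the JVM does not wait for the main thread once shutdown starts, so cleanup placed after a blocking call in main may never run.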

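Step 4: View message details

After a message is sent, you receive a message ID (messageID). You can use it to look up the message you just sent on the message query page, as shown in the following figure, and to view the details and trace of a specific message. For details, see Message Query.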