发送与接收普通消息

最近更新时间:2024-05-28 15:05:51

我的收藏
TDMQ RocketMQ 5.x 版兼容了社区版 HTTP SDK 的接入,如果您此前使用的客户端使用了社区版 HTTP SDK,您在切换到 TDMQ RocketMQ 5.x 版后,您无需在客户端进行任何代码改造。

操作场景

如果当前您已使用了 HTTP 协议进行消息的收发,在您的客户端引入开源 HTTP SDK 后,TDMQ RocketMQ 5.x 版支持用户通过内网或公网使用 HTTP 协议接入。
本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。

前提条件

通过 Maven 方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖。

重试机制

HTTP 采用固定重试间隔的机制:
重试间隔
最大重试次数
5分钟
可通过修改消费组配置实现自定义最大重试次数,默认 16 次。
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入社区版 HTTP SDK 依赖。

步骤2:获取参数

1. 登录消息队列 RocketMQ 控制台,选择集群。
2. 复制集群 ID、接入地址等参数。




步骤3:生产消息

创建消息生产者

// 获取Client
MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

// 获取Topic的Producer
MQProducer producer = mqClient.getProducer(instanceId, topicName);
参数
说明
endpoint
接入地址,在基本信息页面获取。
accessKey
角色 AccessKey,在集群权限页面获取。
secretKey
角色 SecretKey,在集群权限页面获取。
instanceId
集群 ID。
topicName
主题名称,在 Topic 页面获取。

发送消息

try {
for (int i = 0; i < 10; i++) {
TopicMessage pubMsg;
pubMsg = new TopicMessage(
("Hello RocketMQ " + i).getBytes(),
"TAG"
);
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());
}
} catch (Throwable e) {
System.out.println("Send mq message failed.");
e.printStackTrace();
}
参数
说明
TAG
设置消息的 TAG。

步骤3:消费消息

创建消费者

// 获取Client
MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

// 获取Topic的Consumer
MQConsumer consumer = mqClient.getConsumer(instanceId, topicName, groupName, "TAG");
参数
说明
endpoint
接入地址,在基本信息页面获取。
accessKey
角色 AccessKey,在集群权限页面获取。
secretKey
角色 SecretKey,在集群权限页面获取。
instanceId
集群 ID。
topicName
主题名称,在 Topic 页面获取。
groupName
消费组名称,在 Group 页面获取。

订阅消息

do {
List<Message> messages = null;

try {
// 长轮询消费消息
// 长轮询表示如果 topic 没有消息则请求会在服务端等待,如果有消息可以消费则立即返回
// 如果对消费延迟比较敏感,强烈建议使用多线程并发拉取消息
messages = consumer.consumeMessage(
Integer.parseInt(batchSize),
Integer.parseInt(waitSeconds)
);
} catch (Throwable e) {
e.printStackTrace();
}
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}

for (Message message : messages) {
System.out.println("Receive message: " + message);
}

{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}

try {
consumer.ackMessage(handles);
} catch (Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
参数
说明
batchSize
一次拉取的消息条数,支持最多16条。
waitSeconds
一次拉取的轮询等待时间,支持最长30秒。