有奖捉虫:行业应用 & 管理与支持文档专题 HOT
TDMQ RocketMQ 版兼容了社区版 HTTP SDK 的接入,如果您此前使用的客户端使用了社区版 HTTP SDK,您在切换到 TDMQ RocketMQ 版后,您无需在客户端进行任何代码改造。

操作场景

如果当前您已使用了 HTTP 协议进行消息的收发,在您的客户端引入开源 HTTP SDK 后,TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入。
本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。
在创建 Group(消费组)时需要制定类型(TCP 或者 HTTP,详情请参见 创建 Group 说明 ),因此,同一个 Group(消费组)不支持 TCP 和 HTTP 客户端同时消费。

前提条件

通过 Maven 方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖。

重试机制

HTTP 采用固定重试间隔的机制,暂不支持自定义。
消息类型
重试间隔
最大重试次数
普通消息
5分钟
288
顺序消息
1分钟
288
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。

操作步骤

步骤1:安装 Java 依赖库

在 Java 项目中引入社区版 HTTP SDK 依赖。

步骤2:获取参数

1. 登录 TDMQ 控制台,选择所在的集群,单击集群名进入集群详情页。
2. 如下图所示,选择顶部的命名空间页签,单击右侧的配置权限进入权限配置页面,如当前角色列表为空,可以单击新建,新建一个角色,详细描述请参见 完成资源创建与准备



3. 在页面上复制对应的 AK 和 SK,以备在接下来的步骤中使用。




步骤3:生产消息

创建消息生产者

// 获取Client
MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

// 获取Topic的Producer
MQProducer producer = mqClient.getProducer(namespace, topicName);
参数
说明
topicName
主题名称,在控制台集群管理中 Topic 页签中复制。
namespace
命名空间名称,在控制台集群管理中 Namespace 页签中复制。



endpoint
集群 HTTP 协议接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。
secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img



发送消息

try {
for (int i = 0; i < 10; i++) {
TopicMessage pubMsg;
pubMsg = new TopicMessage(
("Hello RocketMQ " + i).getBytes(),
"TAG"
);
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());
}
} catch (Throwable e) {
System.out.println("Send mq message failed.");
e.printStackTrace();
}
参数
说明
TAG
设置消息的 TAG。

步骤3:消费消息

创建消费者

// 获取Client
MQClient mqClient = new MQClient(endpoint, accessKey, secretKey);

// 获取Topic的Consumer
MQConsumer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
参数
说明
topicName
主题名称,在控制台集群管理中 Topic 页签中复制。
groupName
生产者组名称,在控制台集群管理中 Group 页签中复制。
namespace
命名空间名称,在控制台集群管理中 Namespace 页签中复制。



TAG
订阅的标签。
endpoint
集群 HTTP 协议接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。
secretKey
角色名称,在 角色管理 页面复制。
accessKey
角色密钥,在 角色管理 页面复制密钥列复制。
img



订阅消息

do {
List<Message> messages = null;

try {
// 长轮询消费消息
// 长轮询表示如果 topic 没有消息则请求会在服务端等待,如果有消息可以消费则立即返回
messages = consumer.consumeMessage(
Integer.parseInt(batchSize),
Integer.parseInt(waitSeconds)
);
} catch (Throwable e) {
e.printStackTrace();
}
if (messages == null || messages.isEmpty()) {
System.out.println(Thread.currentThread().getName() + ": no new message, continue!");
continue;
}

for (Message message : messages) {
System.out.println("Receive message: " + message);
}

{
List<String> handles = new ArrayList<String>();
for (Message message : messages) {
handles.add(message.getReceiptHandle());
}

try {
consumer.ackMessage(handles);
} catch (Throwable e) {
if (e instanceof AckMessageException) {
AckMessageException errors = (AckMessageException) e;
System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
if (errors.getErrorMessages() != null) {
for (String errorHandle :errors.getErrorMessages().keySet()) {
System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()
+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());
}
}
continue;
}
e.printStackTrace();
}
}
} while (true);
参数
说明
batchSize
一次拉取的消息条数,支持最多16条。
waitSeconds
一次拉取的轮询等待时间,支持最长30秒。