TDMQ RocketMQ 版兼容了社区版 HTTP SDK 的接入,如果您此前使用的客户端使用了社区版 HTTP SDK,您在切换到 TDMQ RocketMQ 版后,您无需在客户端进行任何代码改造。
操作场景
如果当前您已使用了 HTTP 协议进行消息的收发,在您的客户端引入开源 HTTP SDK 后,TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入。
本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。
在创建 Group(消费组)时需要制定类型(TCP 或者 HTTP,详情请参见 创建 Group 说明 ),因此,同一个 Group(消费组)不支持 TCP 和 HTTP 客户端同时消费。
前提条件
通过 Maven 方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖。
重试机制
HTTP 采用固定重试间隔的机制,暂不支持自定义。
消息类型 | 重试间隔 | 最大重试次数 |
普通消息 | 5分钟 | 288 |
顺序消息 | 1分钟 | 288 |
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。
操作步骤
步骤1:安装 Java 依赖库
在 Java 项目中引入社区版 HTTP SDK 依赖。
步骤2:获取参数
1. 进入 消息队列 RocketMQ 控制台,在左侧导航栏选择集群管理,单击集群 ID 进入集群详情页。
2. 如下图所示,选择命名空间页签,单击操作列的配置权限进入权限配置页面。如果当前命名空间列表为空,可以单击新建,新建一个命名空间,详细描述请参见 完成资源创建与准备。

3. 在权限配置页复制对应的密钥(AccessKey)和角色(SecretKey),其中 AccessKey 简称 AK,SecretKey 简称 SK,以备在接下来的步骤中使用。

步骤3:生产消息
创建消息生产者
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ProducerMQProducer producer = mqClient.getProducer(namespace, topicName);
参数 | 说明 |
topicName | 主题名称。 |
namespace | 命名空间名称。 ![]() 注意: 如果您使用的是 4.x 通用集群,此处填写集群 ID 即可。 |
endpoint | 集群 HTTP 协议接入地址。 ![]() |
secretKey | 角色名称。 |
accessKey | 角色密钥。 ![]() |
发送顺序消息
try {for (int i = 0; i < 10; i++) {TopicMessage pubMsg;pubMsg = new TopicMessage(("Hello RocketMQ " + i).getBytes(),"TAG");// 设置分区顺序消息的 ShardingKeypubMsg.setShardingKey(i % 3);TopicMessage pubResultMsg = producer.publishMessage(pubMsg);System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());}} catch (Throwable e) {System.out.println("Send mq message failed.");e.printStackTrace();}
参数 | 说明 |
TAG | 设置消息的 TAG。 |
ShardingKey | 顺序消息的分区字段,相同 ShardingKey 的消息会发送到同一个分区。 |
步骤4:消费消息
创建消费者
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ConsumerMQProducer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
参数 | 说明 |
topicName | 主题名称。 |
groupName | 生产者组名称。 |
namespace | 命名空间名称。 ![]() 注意: 如果您使用的是 4.x 通用集群,此处填写集群 ID 即可。 |
TAG | 订阅的标签。 |
endpoint | 集群 HTTP 协议接入地址,在控制台集群管理页面的集群列表操作栏的接入地址处获取。 |
secretKey | 角色名称。 |
accessKey | ![]() |
订阅消息
do {List<Message> messages = null;try {// 长轮询顺序消费消息, 拿到的消息可能是多个分区的, 一个分区的内的消息一定是顺序的// 对于顺序消费,如果一个分区内的消息只要有没有被确认消费成功的,则对于这个分区下次还会消费到相同的消息// 对于一个分区,只有所有消息确认消费成功才能消费下一批消息messages = consumer.consumeMessageOrderly(Integer.parseInt(batchSize),Integer.parseInt(waitSeconds));} catch (Throwable e) {e.printStackTrace();}if (messages == null || messages.isEmpty()) {System.out.println(Thread.currentThread().getName() + ": no new message, continue!");continue;}for (Message message : messages) {System.out.println("Receive message: " + message);}{List<String> handles = new ArrayList<String>();for (Message message : messages) {handles.add(message.getReceiptHandle());}try {consumer.ackMessage(handles);} catch (Throwable e) {if (e instanceof AckMessageException) {AckMessageException errors = (AckMessageException) e;System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");if (errors.getErrorMessages() != null) {for (String errorHandle :errors.getErrorMessages().keySet()) {System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());}}continue;}e.printStackTrace();}}} while (true);
参数 | 说明 |
batchSize | 一次拉取的消息条数,支持最多16条。 |
waitSeconds | 一次拉取的轮询等待时间,支持最长30秒。 |