TDMQ RocketMQ 版兼容了社区版 HTTP SDK 的接入,如果您此前使用的客户端使用了社区版 HTTP SDK,您在切换到 TDMQ RocketMQ 版后,您无需在客户端进行任何代码改造。
操作场景
如果当前您已使用了 HTTP 协议进行消息的收发,在您的客户端引入开源 HTTP SDK 后,TDMQ RocketMQ 版支持用户通过内网或公网使用 HTTP 协议接入。
本文以调用 Java SDK 为例介绍通过 HTTP SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。
注意:
暂不支持通过使用 HTTP 协议实现事务消息。
如果您使用的是 4.x 集群,在创建 Group 消费组时需要设置协议类型(TCP 或者 HTTP,详情请参见 Group 管理 ),因此,同一个 Group(消费组)不支持 TCP 和 HTTP 客户端同时消费。
前提条件
通过 Maven 方式引入依赖,在 pom.xml 文件中添加对应语言的 SDK 依赖。
重试机制
HTTP 采用固定重试间隔的机制:
重试间隔 | 最大重试次数 |
5分钟 | 可通过修改消费组配置实现自定义最大重试次数,默认 16 次。 |
说明:
客户端在重试间隔内 ACK 这条消息,表示消费成功,不会重试。
重试间隔到期后客户端仍未 ACK,客户端会重新消费到这条消息。
每次消费的消息句柄只在重试间隔内有效,过期无效。
操作步骤
步骤1:安装 Java 依赖库
在 Java 项目中引入社区版 HTTP SDK 依赖。
步骤2:生产消息
创建消息生产者
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ProducerMQProducer producer = mqClient.getProducer(namespace, topicName);
说明:
参数 | 说明 |
endpoint | 集群接入地址,控制台集群基本信息页面的接入信息模块获取。 ![]() |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
namespace | 命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。 ![]() |
topicName | Topic 的名称,在控制台 topic 页面复制。 |
发送消息
try {for (int i = 0; i < 10; i++) {TopicMessage pubMsg;pubMsg = new TopicMessage(("Hello RocketMQ " + i).getBytes(),"TAG");TopicMessage pubResultMsg = producer.publishMessage(pubMsg);System.out.println("Send mq message success. MsgId is: " + pubResultMsg.getMessageId());}} catch (Throwable e) {System.out.println("Send mq message failed.");e.printStackTrace();}
参数 | 说明 |
TAG | 设置消息的 TAG。 |
步骤3:消费消息
创建消费者
// 获取ClientMQClient mqClient = new MQClient(endpoint, accessKey, secretKey);// 获取Topic的ConsumerMQConsumer consumer = mqClient.getConsumer(namespace, topicName, groupName, "TAG");
说明:
参数 | 说明 |
endpoint | 集群接入地址,控制台集群基本信息页面的接入信息模块获取。 ![]() |
accessKey | 角色密钥,在控制台的集群权限页面 AccessKey 列复制。 ![]() |
secretKey | 角色名称,在控制台的集群权限页面 SecretKey 列复制。 |
namespace | 命名空间的名称,在控制台命名空间页面复制。如果您使用的是4.x通用集群或者5.x集群,此处可填写集群的 ID。 ![]() |
topicName | Topic 的名称,在控制台 Topic 页面复制。 |
groupName | 消费组名称,在控制台 Group 管理页面复制。 ![]() |
订阅消息
do {List<Message> messages = null;try {// 长轮询消费消息// 长轮询表示如果 topic 没有消息则请求会在服务端等待,如果有消息可以消费则立即返回// 如果对消费延迟比较敏感,强烈建议使用多线程并发拉取消息messages = consumer.consumeMessage(Integer.parseInt(batchSize),Integer.parseInt(waitSeconds));} catch (Throwable e) {e.printStackTrace();}if (messages == null || messages.isEmpty()) {System.out.println(Thread.currentThread().getName() + ": no new message, continue!");continue;}for (Message message : messages) {System.out.println("Receive message: " + message);}{List<String> handles = new ArrayList<String>();for (Message message : messages) {handles.add(message.getReceiptHandle());}try {consumer.ackMessage(handles);} catch (Throwable e) {if (e instanceof AckMessageException) {AckMessageException errors = (AckMessageException) e;System.out.println("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");if (errors.getErrorMessages() != null) {for (String errorHandle :errors.getErrorMessages().keySet()) {System.out.println("Handle:" + errorHandle + ", ErrorCode:" + errors.getErrorMessages().get(errorHandle).getErrorCode()+ ", ErrorMsg:" + errors.getErrorMessages().get(errorHandle).getErrorMessage());}}continue;}e.printStackTrace();}}} while (true);
参数 | 说明 |
batchSize | 一次拉取的消息条数,支持最多16条。 |
waitSeconds | 一次拉取的轮询等待时间,支持最长30秒。 |