查询消息

最近更新时间:2023-12-08 18:07:11

我的收藏
当一条消息从生产者发送到 TDMQ RocketMQ 版服务端,再由消费者进行消费,TDMQ RocketMQ 版会完整记录这条消息中间的流转过程,并以消息轨迹的形式呈现在控制台。 消息轨迹记录了消息从生产端到 TDMQ RocketMQ 版服务端,最后到消费端的整个过程,包括各阶段的时间(精确到微秒)、执行结果、生产者 IP、消费者 IP 等。



操作场景

当您需要排查以下问题时,就可以使用 TDMQ RocketMQ 版控制台的消息查询功能,按照时间维度或者根据日志中查到的消息 ID/消息 Key,来查看具体某条消息的消息内容、消息参数和消息轨迹。
查看某条消息的具体内容,具体参数。
查看消息由哪个生产 IP 发送,是否发送成功,消息到服务端的具体时间。
查看消息是否已持久化。
查看消费有哪些消费者消费了,是否消费成功,消息确认消费的具体时间。
需要做分布式系统的性能分析,查看 MQ 对相关消息处理的时延。

查询限制

消息查询最多可以查询近3天的消息。

前提条件

已经参考 SDK 文档 部署好生产端和消费端服务,并在3天内有消息生产和消费。
2022年8月8日部分地区上线新的虚拟集群 RocketMQ 服务,由于新版的集群与旧版集群的消息轨迹实现不同,所以在使用上有些许差异。旧版集群消息轨迹由服务端实现,客户端无需关注消息轨迹相关设置,新版集群需要在客户端设置开启消息轨迹功能,具体设置示例如下:
生产者设置
push 消费者设置
pull 消费者设置
DefaultMQProducer producer = new DefaultMQProducer(namespace, groupName,
// ACL权限
new AclClientRPCHook(new SessionCredentials(AK, SK)), true, null);
// 实例化消费者
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(NAMESPACE,groupName,
new AclClientRPCHook(new SessionCredentials(AK, SK)),
new AllocateMessageQueueAveragely(), true, null);
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(NAMESPACE,groupName,
new AclClientRPCHook(new SessionCredentials(AK, SK)));
// 设置NameServer的地址
pullConsumer.setNamesrvAddr(NAMESERVER);
pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
pullConsumer.setAutoCommit(false);
pullConsumer.setEnableMsgTrace(true);
pullConsumer.setCustomizedTraceTopic(null);
说明:
如果生产者和消费者都开启了轨迹后,依然无法查询到消息轨迹和消息的消费状态,可能是由于客户端 SDK 的版本过低,推荐使用 4.9.x 及以上版本。
如果是使用 Spring Boot Starter 接入(2.2.2版本及以上),具体的代码参考:
package com.lazycece.sbac.rocketmq.messagemodel;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
* @author lazycece
* @date 2019/8/21
*/
@Slf4j
@Component
public class MessageModelConsumer {

@Component
@RocketMQMessageListener(
topic = "topic-message-model",
consumerGroup = "message-model-consumer-group",
enableMsgTrace = true,
messageModel = MessageModel.CLUSTERING)
public class ConsumerOne implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("ConsumerOne: {}", message);
}
}

}

操作步骤

1. 登录 TDMQ RocketMQ 控制台,在左侧导航栏单击消息查询
2. 在消息查询页面,选择好地域后根据页面提示输入查询条件。
时间范围:选择需要查询的时间范围,支持近100条(默认按时间顺序展示最近的 100 条消息),近30分钟,近1小时,近6小时,近24小时,近3天和自定义时间范围。
当前集群:选择需要查询的 Topic 所在的集群。
命名空间:选择需要查询的 Topic 所在的命名空间。
Topic:选择需要查询的 Topic。
查询方式:消息查询功能支持两种查询方式。
按消息 ID 查询:该方式属于精确查询、速度快、精确匹配。
按消息 Key 查询:该方式属于模糊查询,适用于您没有记录消息 ID 但是设置了消息 Key 的场景。
按消息 Tag 查询:该方式属于模糊查询,适用于您没有记录消息 ID 但是设置了消息 Tag 的场景。单次查询展示前 64 条消息,如果希望查询后续消息,请指定更精确的时间范围重新查询。
3. 单击查询,下方列表会展示所有查询到的结果并分页展示。



4. 找到您希望查看内容或参数的消息,单击操作列的查看详情,即可查看消息的基本信息、内容(消息体)、详情参数和消费状态。
在消费状态模块,您可以查看到消费该消息的 Group 以及消费状态,同时还可以在操作栏进行如下操作:
重新发送:将消息重新发送到指定的客户端,如消息已消费成功,重新发送该消息可能导致消费重复。
异常诊断:若消费异常,可以查看异常诊断信息。



说明:
消费模式为 “广播消费” 的 Group 下如果没有在线的客户端,则不会在上图的详情页展示消费信息和异常诊断信息;如果您需要查看消费状态,可以前往 “消息轨迹” 页进行查看。
5. 单击操作列的查看消息轨迹,或者在详情页单击 Tab 栏的消息轨迹,即可查看该消息的消息轨迹(详细说明请参见 消息轨迹查询结果说明)。



消费验证

在查询到某条消息后,您可以单击操作列的消费验证将该条消息发送到指定的客户端来验证该消息。该功能可能会导致消息重复。
说明
当前仅专享集群支持消费验证功能。消费验证功能仅用于验证客户端的消费逻辑是否正常,并不会影响正常的收消息流程,因此消息的消费状态等信息在消费验证后并不会改变。
消费验证功能目前暂不支持发送到 Python 和 C++ 客户端。




导出消息

在查询到某条消息后,您可以单击操作列的 导出消息 将该条消息的消息体,消息Tag,消息Key,消息生产时间 和消费属性等。



同时,您可以批量选择当前页面内的消息,批量导出多条消息。将消息导出到本地后,用户可以根据需要对消息进行一些处理,如复制消息体或者进行时间排序等操作。