前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RocketMQ消息轨迹【源码笔记】

RocketMQ消息轨迹【源码笔记】

作者头像
瓜农老梁
发布2019-08-23 16:33:51
1.3K0
发布2019-08-23 16:33:51
举报
文章被收录于专栏:瓜农老梁瓜农老梁

1.Broker配置 首先看下broker.conf配置的两个属性

属性

默认值

traceTopicEnable

false

msgTraceTopicName

RMQ_SYS_TRACE_TOPIC

在一个集群中可以配置一台机器专门负责消息轨迹的收集工作,该台机器上配置traceTopicEnable = true,borker启动的时候自动创建默认轨迹topic

TopicConfigManager.java构造方法,BrokerController在启动的时候,会初始化TopicConfigManager实现trace topic的创建工作

代码语言:javascript
复制
{
      if (this.brokerController.getBrokerConfig().isTraceTopicEnable()) {
                String topic = this.brokerController.getBrokerConfig().getMsgTraceTopicName();
                TopicConfig topicConfig = new TopicConfig(topic);
                this.systemTopicList.add(topic);
                topicConfig.setReadQueueNums(1);
                topicConfig.setWriteQueueNums(1);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
       }
 }

2.客户端发送实现

客户端发送

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true);

代码语言:javascript
复制
public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        //if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                traceDispatcher = dispatcher;
                //为消息轨迹注册hook,在消息发送前执行
                this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
  }

SendMessageTraceHookImpl 实现了SendMessageHook接口,在消息发送前后会被调用 AsyncTraceDispatcher 主要负责消息的发送工作;内部队列,由线程池批量(100条)发送

Hook调用

发送前hook调用

代码语言:javascript
复制
//如果有hook在消息发送前执行,消息轨迹通过这种方式记录
if (this.hasSendMessageHook()) {
    context = new SendMessageContext();
    context.setProducer(this); //发送对象
    context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //生产组
    context.setCommunicationMode(communicationMode); //发送模式
    context.setBornHost(this.defaultMQProducer.getClientIP()); //客户端IP
    context.setBrokerAddr(brokerAddr); //发往broker的地址
    context.setMessage(msg); //消息内容
    context.setMq(mq); //消息 Queue
    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
    if (isTrans != null && isTrans.equals("true")) {
        context.setMsgType(MessageType.Trans_Msg_Half);
    }

    if (msg.getProperty("__STARTDELIVERTIME") != null ||                msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
        context.setMsgType(MessageType.Delay_Msg);
    }
    this.executeSendMessageHookBefore(context); //执行自定义个hook业务
}

发送后hook调用

代码语言:javascript
复制
//消息发送后执行的hook,消息轨迹会调用
if (this.hasSendMessageHook()) {
    context.setSendResult(sendResult);
    this.executeSendMessageHookAfter(context);
}

发送轨迹

Producer启动时注册钩子,该钩子持有负责消息发送的AsyncTraceDispatcher实例,消息发送后进而发送消息轨迹

发送轨迹的消息格式

3.客户端消费轨迹实现

消费轨迹:与消息发送的轨迹实现思路相同

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true);

注册消费钩子 ConsumeMessageTraceHookImpl实现了ConsumeMessageHook,在消费的前后会进行回调

代码语言:javascript
复制
public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
    AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic){
    this.consumerGroup = consumerGroup;
    this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
    defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
    if (enableMsgTrace) {
        try {
            AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
            dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
            traceDispatcher = dispatcher;
            //注册消费hook
            this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
                new ConsumeMessageTraceHookImpl(traceDispatcher));
        } catch (Throwable e) {
            log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
        }
    }
}

ConsumeMessageConcurrentlyService.ConsumeRequest.run消费前执行

代码语言:javascript
复制
//消费前执行hook 消费轨迹会执行
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext = new ConsumeMessageContext();
    consumeMessageContext.setConsumerGroup(defaultMQPushConsumer.getConsumerGroup());
    consumeMessageContext.setProps(new HashMap<String, String>());
    consumeMessageContext.setMq(messageQueue);
    consumeMessageContext.setMsgList(msgs);
    consumeMessageContext.setSuccess(false); //消费状态
    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

消费后执行

代码语言:javascript
复制
if (ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.hasHook()) {
    consumeMessageContext.setStatus(status.toString());
    consumeMessageContext.setSuccess(ConsumeConcurrentlyStatus.CONSUME_SUCCESS == status);
    ConsumeMessageConcurrentlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

消费轨迹格式

分为两部分,一部分为消费前,一部分为消费后

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 瓜农老梁 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档