A Look at RocketMQ's enableMsgTrace

This article looks at how RocketMQ's enableMsgTrace option works.

enableMsgTrace

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {
​
    private final InternalLogger log = ClientLogger.getLog();
​
    //......
​
    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook,
        boolean enableMsgTrace, final String customizedTraceTopic) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
        //if client open the message trace feature
        if (enableMsgTrace) {
            try {
                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
                dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                traceDispatcher = dispatcher;
                this.getDefaultMQProducerImpl().registerSendMessageHook(
                    new SendMessageTraceHookImpl(traceDispatcher));
            } catch (Throwable e) {
                log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
            }
        }
    }
​
    //......
}
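  • When enableMsgTrace is true, the DefaultMQProducer constructor creates an AsyncTraceDispatcher, wraps it in a SendMessageTraceHookImpl, and registers that hook via getDefaultMQProducerImpl().registerSendMessageHook. If this setup throws, the error is only logged, so the producer is still constructed, just without the trace hook.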
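The conditional registration described above can be sketched in isolation. This is a minimal, self-contained model and not RocketMQ code: SendHook and SketchProducer are hypothetical stand-ins for SendMessageHook and DefaultMQProducer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SendMessageHook, reduced to one method.
interface SendHook {
    String hookName();
}

// Hypothetical stand-in for DefaultMQProducer.
class SketchProducer {
    private final List<SendHook> hooks = new ArrayList<>();

    SketchProducer(boolean enableMsgTrace) {
        if (enableMsgTrace) {
            try {
                // Mirrors the constructor: build the trace hook, then register it.
                hooks.add(() -> "SendMessageTraceHook");
            } catch (Throwable e) {
                // As in the original, a tracing failure is logged and swallowed.
                System.err.println("trace hook init failed: " + e);
            }
        }
    }

    // Names of all registered hooks, in registration order.
    List<String> hookNames() {
        List<String> names = new ArrayList<>();
        for (SendHook hook : hooks) {
            names.add(hook.hookName());
        }
        return names;
    }
}
```

With the flag off, no hook is registered at all, so the send path pays no tracing cost.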

SendMessageHook

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/hook/SendMessageHook.java

public interface SendMessageHook {
    String hookName();
​
    void sendMessageBefore(final SendMessageContext context);
​
    void sendMessageAfter(final SendMessageContext context);
}
  • SendMessageHook defines three methods: hookName, sendMessageBefore, and sendMessageAfter

SendMessageTraceHookImpl

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/hook/SendMessageTraceHookImpl.java

public class SendMessageTraceHookImpl implements SendMessageHook {
​
    private TraceDispatcher localDispatcher;
​
    public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
        this.localDispatcher = localDispatcher;
    }
​
    @Override
    public String hookName() {
        return "SendMessageTraceHook";
    }
​
    @Override
    public void sendMessageBefore(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {
            return;
        }
        //build the context content of TuxeTraceContext
        TraceContext tuxeContext = new TraceContext();
        tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
        context.setMqTraceContext(tuxeContext);
        tuxeContext.setTraceType(TraceType.Pub);
        tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));
        //build the data bean object of message trace
        TraceBean traceBean = new TraceBean();
        traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));
        traceBean.setTags(context.getMessage().getTags());
        traceBean.setKeys(context.getMessage().getKeys());
        traceBean.setStoreHost(context.getBrokerAddr());
        traceBean.setBodyLength(context.getMessage().getBody().length);
        traceBean.setMsgType(context.getMsgType());
        tuxeContext.getTraceBeans().add(traceBean);
    }
​
    @Override
    public void sendMessageAfter(SendMessageContext context) {
        //if it is message trace data,then it doesn't recorded
        if (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
            || context.getMqTraceContext() == null) {
            return;
        }
        if (context.getSendResult() == null) {
            return;
        }
​
        if (context.getSendResult().getRegionId() == null
            || !context.getSendResult().isTraceOn()) {
            // if switch is false,skip it
            return;
        }
​
        TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
        TraceBean traceBean = tuxeContext.getTraceBeans().get(0);
        int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
        tuxeContext.setCostTime(costTime);
        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
            tuxeContext.setSuccess(true);
        } else {
            tuxeContext.setSuccess(false);
        }
        tuxeContext.setRegionId(context.getSendResult().getRegionId());
        traceBean.setMsgId(context.getSendResult().getMsgId());
        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
        traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);
        localDispatcher.append(tuxeContext);
    }
}
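  • SendMessageTraceHookImpl implements the SendMessageHook interface; its constructor takes a TraceDispatcher, and its hookName is SendMessageTraceHook
  • sendMessageBefore returns immediately if context is null or if the message topic starts with the AsyncTraceDispatcher's trace topic name (the message is itself trace data and must not be traced again); otherwise it builds a TraceContext of type Pub, attaches it to the SendMessageContext, and adds a TraceBean describing the message (topic, tags, keys, store host, body length, message type)
  • sendMessageAfter applies the same check and also returns if getMqTraceContext() is null, if context.getSendResult() is null, or if context.getSendResult().getRegionId() is null or context.getSendResult().isTraceOn() is false; otherwise it computes costTime, sets the success flag from the send status, fills in regionId, msgId, offsetMsgId, and an estimated storeTime, and appends the TraceContext to the TraceDispatcher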
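The topic filter and the timing arithmetic used by the hook can be illustrated with a small standalone sketch. TraceHookSketch and its method names are hypothetical. The TRACE_TOPIC value is assumed to match the default trace topic, and only the expressions are taken from the listing above.

```java
// Hypothetical, simplified model of the checks and arithmetic in the trace hook.
class TraceHookSketch {
    // Assumed value of the default trace topic; a customized topic would replace it.
    static final String TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";

    // True when the message should be traced, i.e. it is not trace data itself.
    static boolean shouldTrace(String topic) {
        return topic != null && !topic.startsWith(TRACE_TOPIC);
    }

    // Elapsed time divided by the number of trace beans, as in sendMessageAfter.
    static int costTime(long startMillis, long nowMillis, int beanCount) {
        return (int) ((nowMillis - startMillis) / beanCount);
    }

    // The store time is estimated as the midpoint of the send round trip.
    static long estimatedStoreTime(long startMillis, int costTime) {
        return startMillis + costTime / 2;
    }
}
```

For a send that starts at t = 1000 ms and completes at t = 1040 ms with one trace bean, costTime is 40 and the estimated store time is 1020. The store time is therefore an approximation made on the client, not a timestamp reported by the broker.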

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {
​
    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;
​
    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);
​
    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;
​
    /**
     * Close the trace Hook
     */
    void shutdown();
}
  • The TraceDispatcher interface defines the start, append, flush, and shutdown methods

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {
​
    private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;
​
    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
        // queueSize is greater than or equal to the n power of 2 of value
        this.queueSize = 2048;
        this.batchSize = 100;
        this.maxMsgSize = 128000;
        this.discardCount = new AtomicLong(0L);
        this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
        this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
        if (!UtilAll.isBlank(traceTopicName)) {
            this.traceTopicName = traceTopicName;
        } else {
            this.traceTopicName = MixAll.RMQ_SYS_TRACE_TOPIC;
        }
        this.traceExecutor = new ThreadPoolExecutor(//
            10, //
            20, //
            1000 * 60, //
            TimeUnit.MILLISECONDS, //
            this.appenderQueue, //
            new ThreadFactoryImpl("MQTraceSendThread_"));
        traceProducer = getAndCreateTraceProducer(rpcHook);
    }
​
    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }
​
    private DefaultMQProducer getAndCreateTraceProducer(RPCHook rpcHook) {
        DefaultMQProducer traceProducerInstance = this.traceProducer;
        if (traceProducerInstance == null) {
            traceProducerInstance = new DefaultMQProducer(rpcHook);
            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
            traceProducerInstance.setSendMsgTimeout(5000);
            traceProducerInstance.setVipChannelEnabled(false);
            // The max size of message is 128K
            traceProducerInstance.setMaxMessageSize(maxMsgSize - 10 * 1000);
        }
        return traceProducerInstance;
    }
​
    @Override
    public boolean append(final Object ctx) {
        boolean result = traceContextQueue.offer((TraceContext) ctx);
        if (!result) {
            log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
        }
        return result;
    }
​
    @Override
    public void flush() throws IOException {
        // The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
        long end = System.currentTimeMillis() + 500;
        while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                break;
            }
        }
        log.info("------end trace send " + traceContextQueue.size() + "   " + appenderQueue.size());
    }
​
    @Override
    public void shutdown() {
        this.stopped = true;
        this.traceExecutor.shutdown();
        if (isStarted.get()) {
            traceProducer.shutdown();
        }
        this.removeShutdownHook();
    }
​
    public void registerShutDownHook() {
        if (shutDownHook == null) {
            shutDownHook = new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
​
                @Override
                public void run() {
                    synchronized (this) {
                        if (!this.hasShutdown) {
                            try {
                                flush();
                            } catch (IOException e) {
                                log.error("system MQTrace hook shutdown failed ,maybe loss some trace data");
                            }
                        }
                    }
                }
            }, "ShutdownHookMQTrace");
            Runtime.getRuntime().addShutdownHook(shutDownHook);
        }
    }
​
    public void removeShutdownHook() {
        if (shutDownHook != null) {
            Runtime.getRuntime().removeShutdownHook(shutDownHook);
        }
    }
​
    //......
}
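  • The AsyncTraceDispatcher constructor creates traceContextQueue (capacity 1024), appenderQueue (capacity 2048), and traceExecutor; append offers the TraceContext to traceContextQueue, and if the queue is full it increments discardCount and logs at info level
  • start starts the trace producer (only once), then creates and starts a daemon thread running AsyncRunnable and calls registerShutDownHook; the registered hook calls flush when the JVM shuts down. flush sleeps in 1 ms steps while traceContextQueue or appenderQueue is non-empty, with an intended upper bound of 500 ms. Note that because && binds more tightly than ||, the deadline as written only guards the appenderQueue check, so the loop keeps waiting for as long as traceContextQueue is non-empty
  • shutdown sets stopped to true, calls traceExecutor.shutdown() and, if the dispatcher was started, traceProducer.shutdown(), and finally calls removeShutdownHook, which removes shutDownHook from Runtime.getRuntime()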
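Two details from the notes above are easy to reproduce in isolation: the non-blocking append that counts discards, and the way the flush loop condition is parsed. The sketch below is self-contained. BoundedAppender and FlushCondition are hypothetical names, and the queue holds strings rather than TraceContext objects.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical model of AsyncTraceDispatcher.append.
class BoundedAppender {
    private final ArrayBlockingQueue<String> queue;
    private final AtomicLong discardCount = new AtomicLong(0L);

    BoundedAppender(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // offer() never blocks: it returns false immediately when the queue is full.
    boolean append(String ctx) {
        boolean accepted = queue.offer(ctx);
        if (!accepted) {
            discardCount.incrementAndGet();
        }
        return accepted;
    }

    long discarded() {
        return discardCount.get();
    }

    int size() {
        return queue.size();
    }
}

// Hypothetical helper that evaluates the flush loop condition exactly as written.
class FlushCondition {
    static boolean keepsWaiting(int contextQueueSize, int appenderQueueSize, boolean beforeDeadline) {
        // Parsed as: contextQueueSize > 0 || (appenderQueueSize > 0 && beforeDeadline)
        return contextQueueSize > 0 || appenderQueueSize > 0 && beforeDeadline;
    }
}
```

Because append uses offer rather than put, a slow trace pipeline drops trace data instead of blocking the business thread that sends the message. FlushCondition.keepsWaiting(1, 0, false) evaluates to true, which shows that an expired deadline does not end the loop while the context queue still holds entries.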

AsyncRunnable

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

    class AsyncRunnable implements Runnable {
        private boolean stopped;
​
        @Override
        public void run() {
            while (!stopped) {
                List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
                for (int i = 0; i < batchSize; i++) {
                    TraceContext context = null;
                    try {
                        //get trace data element from blocking Queue — traceContextQueue
                        context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                    }
                    if (context != null) {
                        contexts.add(context);
                    } else {
                        break;
                    }
                }
                if (contexts.size() > 0) {
                    AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
                    traceExecutor.submit(request);
                } else if (AsyncTraceDispatcher.this.stopped) {
                    this.stopped = true;
                }
            }
​
        }
    }
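  • AsyncRunnable implements Runnable; its run method loops, polling up to batchSize (100) elements from traceContextQueue into contexts with a 5 ms timeout per poll and stopping the batch early when a poll returns nothing. A non-empty batch is wrapped in an AsyncAppenderRequest and submitted to traceExecutor; if the batch is empty and the dispatcher has been stopped, the loop exits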
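The batching loop can be sketched as a generic helper. BatchDrainer and nextBatch are hypothetical names. Unlike the original, which swallows InterruptedException, this sketch restores the interrupt flag.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper modelling one iteration of AsyncRunnable's outer loop.
class BatchDrainer {
    // Collects up to batchSize elements, stopping early once a poll times out.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int batchSize, long pollMillis) {
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            T item = null;
            try {
                item = queue.poll(pollMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Deviation from the original: keep the interrupt visible to callers.
                Thread.currentThread().interrupt();
            }
            if (item == null) {
                break;
            }
            batch.add(item);
        }
        return batch;
    }
}
```

The short poll timeout bounds how long a partially filled batch waits: with a 5 ms timeout, a batch is handed off at most about 5 ms after the queue runs dry, rather than waiting until batchSize elements have arrived.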

AsyncAppenderRequest

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

    class AsyncAppenderRequest implements Runnable {
        List<TraceContext> contextList;
​
        public AsyncAppenderRequest(final List<TraceContext> contextList) {
            if (contextList != null) {
                this.contextList = contextList;
            } else {
                this.contextList = new ArrayList<TraceContext>(1);
            }
        }
​
        @Override
        public void run() {
            sendTraceData(contextList);
        }
​
        public void sendTraceData(List<TraceContext> contextList) {
            Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();
            for (TraceContext context : contextList) {
                if (context.getTraceBeans().isEmpty()) {
                    continue;
                }
                // Topic value corresponding to original message entity content
                String topic = context.getTraceBeans().get(0).getTopic();
                String regionId = context.getRegionId();
                // Use  original message entity's topic as key
                String key = topic;
                if (!StringUtils.isBlank(regionId)) {
                    key = key + TraceConstants.CONTENT_SPLITOR + regionId;
                }
                List<TraceTransferBean> transBeanList = transBeanMap.get(key);
                if (transBeanList == null) {
                    transBeanList = new ArrayList<TraceTransferBean>();
                    transBeanMap.put(key, transBeanList);
                }
                TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
                transBeanList.add(traceData);
            }
            for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
                String[] key = entry.getKey().split(String.valueOf(TraceConstants.CONTENT_SPLITOR));
                String dataTopic = entry.getKey();
                String regionId = null;
                if (key.length > 1) {
                    dataTopic = key[0];
                    regionId = key[1];
                }
                flushData(entry.getValue(), dataTopic, regionId);
            }
        }
​
        /**
         * Batch sending data actually
         */
        private void flushData(List<TraceTransferBean> transBeanList, String dataTopic, String regionId) {
            if (transBeanList.size() == 0) {
                return;
            }
            // Temporary buffer
            StringBuilder buffer = new StringBuilder(1024);
            int count = 0;
            Set<String> keySet = new HashSet<String>();
​
            for (TraceTransferBean bean : transBeanList) {
                // Keyset of message trace includes msgId of or original message
                keySet.addAll(bean.getTransKey());
                buffer.append(bean.getTransData());
                count++;
                // Ensure that the size of the package should not exceed the upper limit.
                if (buffer.length() >= traceProducer.getMaxMessageSize()) {
                    sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
                    // Clear temporary buffer after finishing
                    buffer.delete(0, buffer.length());
                    keySet.clear();
                    count = 0;
                }
            }
            if (count > 0) {
                sendTraceDataByMQ(keySet, buffer.toString(), dataTopic, regionId);
            }
            transBeanList.clear();
        }
​
        /**
         * Send message trace data
         *
         * @param keySet the keyset in this batch(including msgId in original message not offsetMsgId)
         * @param data the message trace data in this batch
         */
        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
            String traceTopic = traceTopicName;
            if (AccessChannel.CLOUD == accessChannel) {
                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
            }
            final Message message = new Message(traceTopic, data.getBytes());
            // Keyset of message trace includes msgId of or original message
            message.setKeys(keySet);
            try {
                Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), traceTopic);
                SendCallback callback = new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
​
                    }
​
                    @Override
                    public void onException(Throwable e) {
                        log.info("send trace data ,the traceData is " + data);
                    }
                };
                if (traceBrokerSet.isEmpty()) {
                    // No cross set
                    traceProducer.send(message, callback, 5000);
                } else {
                    traceProducer.send(message, new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Set<String> brokerSet = (Set<String>) arg;
                            List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
                            for (MessageQueue queue : mqs) {
                                if (brokerSet.contains(queue.getBrokerName())) {
                                    filterMqs.add(queue);
                                }
                            }
                            int index = sendWhichQueue.getAndIncrement();
                            int pos = Math.abs(index) % filterMqs.size();
                            if (pos < 0) {
                                pos = 0;
                            }
                            return filterMqs.get(pos);
                        }
                    }, traceBrokerSet, callback);
                }
​
            } catch (Exception e) {
                log.info("send trace data,the traceData is" + data);
            }
        }
​
        private Set<String> tryGetMessageQueueBrokerSet(DefaultMQProducerImpl producer, String topic) {
            Set<String> brokerSet = new HashSet<String>();
            TopicPublishInfo topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
            if (null == topicPublishInfo || !topicPublishInfo.ok()) {
                producer.getTopicPublishInfoTable().putIfAbsent(topic, new TopicPublishInfo());
                producer.getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
                topicPublishInfo = producer.getTopicPublishInfoTable().get(topic);
            }
            if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
                for (MessageQueue queue : topicPublishInfo.getMessageQueueList()) {
                    brokerSet.add(queue.getBrokerName());
                }
            }
            return brokerSet;
        }
    }
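  • AsyncAppenderRequest implements Runnable; its run method calls sendTraceData, which iterates over contextList, encodes each TraceContext into a TraceTransferBean, and groups the beans in transBeanMap under a key made of the original message topic plus, when present, the regionId; it then iterates over transBeanMap and calls flushData for each group
  • flushData iterates over transBeanList, collecting the keys and appending transData to a StringBuilder; once the buffer length is greater than or equal to traceProducer.getMaxMessageSize(), it calls sendTraceDataByMQ and resets the buffer, keySet, and count. After the loop, if count is greater than 0, it calls sendTraceDataByMQ once more for the remainder
  • sendTraceDataByMQ picks the trace topic (traceTopicName, or TRACE_TOPIC_PREFIX plus regionId when accessChannel is CLOUD) and obtains traceBrokerSet via tryGetMessageQueueBrokerSet; if traceBrokerSet is empty it calls traceProducer.send(message, callback, 5000), otherwise it sends with a MessageQueueSelector that restricts the queues to those brokers and picks one round-robin via sendWhichQueue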
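The buffer splitting in flushData and the queue index computation in the MessageQueueSelector can be sketched as two pure functions. TraceBatcher, split, and pick are hypothetical names. Records are plain strings here, and the sketch returns the payloads where the original sends them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of flushData's buffering and the selector's index arithmetic.
class TraceBatcher {
    // Concatenates records and emits a payload whenever the buffer reaches maxSize.
    static List<String> split(List<String> records, int maxSize) {
        List<String> payloads = new ArrayList<>();
        StringBuilder buffer = new StringBuilder();
        int count = 0;
        for (String record : records) {
            buffer.append(record);
            count++;
            // The check runs after appending, so a payload can exceed maxSize.
            if (buffer.length() >= maxSize) {
                payloads.add(buffer.toString());
                buffer.setLength(0);
                count = 0;
            }
        }
        if (count > 0) {
            // Remainder that never reached the threshold.
            payloads.add(buffer.toString());
        }
        return payloads;
    }

    // Round-robin position among the candidate queues.
    static int pick(int index, int candidates) {
        int pos = Math.abs(index) % candidates;
        // Math.abs(Integer.MIN_VALUE) is still negative, hence the guard.
        return pos < 0 ? 0 : pos;
    }
}
```

Splitting the records "aaaa", "bbbb", "cc" with a maxSize of 6 yields the payloads "aaaabbbb" and "cc": the first payload is 8 characters long because the size check only happens after a record has been appended. The pos < 0 guard in pick covers the single input for which Math.abs returns a negative number, Integer.MIN_VALUE.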

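Summary

When enableMsgTrace is true, the DefaultMQProducer constructor creates an AsyncTraceDispatcher, wraps it in a SendMessageTraceHookImpl, and registers the hook via getDefaultMQProducerImpl().registerSendMessageHook. The hook records trace data before and after each send and appends it to the dispatcher, which batches it on a background thread and sends it to the trace topic through a dedicated trace producer.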

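Original-content notice: this article is published on the Cloud+ Community (云+社区) with the author's authorization and may not be reproduced without permission.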
