前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >CousumeQueue中tag的作用

CousumeQueue中tag的作用

作者头像
CBeann
发布2023-12-25 19:19:09
1150
发布2023-12-25 19:19:09
举报
文章被收录于专栏:CBeann的博客CBeann的博客

问题的提出

存在就是有意义的,那么ConsumeQueue中存消息tag的hashcode是什么目的呢? 查到的资料是用于消息的过滤,因为Consumer可以根据主题和tag消费消息

代码语言:javascript
复制
 consumer.subscribe("TopicTest", "TagA");

那么在消息过滤是在broker还是Conumser呢?按照常理是在broker,因为在broker可以减少流量,实际情况是在broker过滤大部分,Consumer过滤一小部分

ConsumeQueue的结构

ConsumeQueue存的是主题的逻辑信息,如下图所示,代表一条记录。其中记录的信息存储在commitLog中,位置是CommitLog Offset。

在这里插入图片描述
在这里插入图片描述

Offset用于标记消息在CommitLog中的位置 Size用于标记消息的大小 HashCode用于过滤消息

源码跟踪

SubscriptionData的构建(Consumer启动)

Consumer一般会有订阅的主题和tag

代码语言:javascript
复制
consumer.subscribe("TopicTest", "TagA");

跟进去会跟到FilterAPI的buildSubscriptionData方法

代码语言:javascript
复制
public static SubscriptionData buildSubscriptionData(String topic, String subString) throws Exception {
        SubscriptionData subscriptionData = new SubscriptionData();
        subscriptionData.setTopic(topic);
        subscriptionData.setSubString(subString);

        if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {
            subscriptionData.setSubString(SubscriptionData.SUB_ALL);
        } else {
            String[] tags = subString.split("\\|\\|");
            if (tags.length > 0) {
                for (String tag : tags) {
                    if (tag.length() > 0) {
                        String trimString = tag.trim();
                        if (trimString.length() > 0) {
                            //添加tag的set
                            //添加tag的set
                            //添加tag的set
                            subscriptionData.getTagsSet().add(trimString);
                            
                            //添加tag的hashcode的set
                            //添加tag的hashcode的set
                            //添加tag的hashcode的set
                            subscriptionData.getCodeSet().add(trimString.hashCode());
                        }
                    }
                }
            } else {
                throw new Exception("subString split error");
            }
        }

        return subscriptionData;
    }

总结:SubscriptionData包含了tag列表和tag的hashcode列表

broker过滤消息

首先Consumer给broker发送消息,请求code是 RequestCode.PULL_MESSAGE ,因此我们可以跟borker里对这个请求码的处理的processor,最后定位到 PullMessageProcessor###processRequest方法,方法里有如下的代码

代码语言:javascript
复制
final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

跟DefaultMessageStore###getMessage方法

代码语言:javascript
复制
 public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,
        final int maxMsgNums,
        final MessageFilter messageFilter) {
        //省略        
        
                        for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                            //获取消息的偏移量
                            long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                             //获取消息的大小
                            int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
                            //获取消息的tag的hashcode
                            long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

                            maxPhyOffsetPulling = offsetPy;

                            if (nextPhyFileStartOffset != Long.MIN_VALUE) {
                                if (offsetPy < nextPhyFileStartOffset)
                                    continue;
                            }


                            //省略
                            //省略
                            //省略
                            

                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                             //查看消息tag是否匹配,此时在broker实现过滤
                            if (messageFilter != null
                                && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {
                                if (getResult.getBufferTotalSize() == 0) {
                                    status = GetMessageStatus.NO_MATCHED_MESSAGE;
                                }

                                continue;
                            }
                              

                          //省略
                          //省略
                          //省略
        return getResult;
    }

跟进匹配方法,此时能发现过滤方法是看subscriptionData里是否有包含tagsCode

代码语言:javascript
复制
//ExpressionMessageFilter###

```java
 @Override
    public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
        
            //省略
            //省略
            //省略
        
            //订阅主题里是否包含这个hashcode
            return subscriptionData.getCodeSet().contains(tagsCode.intValue());
        } else {
           //省略
    }

总结:broker是根据subscriptionData里的tag的hashcode列表去过滤消息,判断从ConsumeQueue中读取的tag的hashcode是否在subscriptionData里的tag的hashcode列表中。

Consumer过滤消息

Consumer开始跟的地方在DefaultMQPushConsumerImpl###pullMessage方法里有一个PullCallback,此方法是一个给broker发送拉取消息后的一个回调方法

代码语言:javascript
复制
PullCallback pullCallback = new PullCallback() {
@Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

         //省略
}

跟一下PullAPIWrapper###processPullResult方法

代码语言:javascript
复制
public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
       
              //省略


          
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                    //Consumer端过滤消息
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }

            
           
           //省略

        return pullResult;
    }

总结:broker端的消息过滤是通过看subscriptionData里的tag列表是否含有当前消息的tag

总结:broker和Consuemr都会过滤

(1)在Consumer启动的时候会传入topic和tag,然后把tag的string和hashcode封装到SubscriptionData中。 (2)当Conumser去broker拉消息的时候,查看SubscriptionData中的hashcode列表和Consumequeue中读取到的tag.hashcode是否一致,这个地方可以过滤大部分的消息。这是第一次过滤。 (3)当(2)通过过滤的消息会发送到Consumer,Consumer则会SubscriptionData中的tag列表中查看是否和当前tag匹配,这是第二次过滤。

参考

https://www.bilibili.com/video/BV1fE411V7Ho?p=8 (1小时5分开始)

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-03-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 问题的提出
  • ConsumeQueue的结构
  • 源码跟踪
    • SubscriptionData的构建(Consumer启动)
      • broker过滤消息
        • Consumer过滤消息
        • 总结:broker和Consuemr都会过滤
        • 参考
        相关产品与服务
        对象存储
        对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档