前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >消息过滤

消息过滤

作者头像
林一
发布2018-07-24 16:08:44
3K0
发布2018-07-24 16:08:44
举报
文章被收录于专栏:MessageQueue

在MQ模型中,一般都会有Topic模型,Topic表示一类消息的集合。

在实际应用中,往往对一个Topic下的消息还会有不同的细分,消费方会根据细分的类型消费Topic中特定的一部分消息,这就涉及到了消息过滤。

比如对于交易的Topic,内部可能有下单消息、支付消息。其中支付系统只希望消费到交易Topic下的支付消息,面对这个需求,我们应该如何在自己的MQ中去满足呢?

(图片引用自阿里云)

业界实现

RocketMQ

RocketMQ支持在发送消息的时候给消息增加Tag(Tag可以理解为sub-topic,即在Topic下再对消息类型进行区分)。

大多数场景下tag使用起来非常简单,如:

代码语言:javascript
复制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

Consumer将收到Topic下Tag包含TAGA或TAGB或TAGC的消息。

对于Tag过滤的限制是一条消息只能有一个Tag,这在一些复杂场景下可能没办法满足需求。

RocketMQ提供另一种过滤方式:SQL92

代码语言:javascript
复制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

仅消费消息属性包含a,且a的值在0-3之间的消息。

RocketMQ对消息过滤的支持比较完善了,通过SQL92这种方式可以满足各种复杂场景的需求了。

Kafka

Kafka目前并没有支持消息过滤,即没有在Topic下提供细分的类型来区分消息。

用户可以在Kafka Streams中实现过滤。

问题分析

大致了解消息过滤的定义和业界的支持情况之后,回头再思考一下,为什么MQ需要做消息过滤、MQ的过滤应该做到什么程度(用使者需要怎么样的过滤方式呢)?试着回答以下的一些问题来弄清楚需求,划清楚问题边界。

1. 为什么需要消息过滤?

业务方(MQ使用方)过滤数据的需求是天然存在的,比如Topic模型也是一种过滤,从众多的数据中订阅自己需要的一部分数据。

在上面这个前提下,逆向考虑这个问题:如果MQ不支持消息过滤(这里的过滤只Topic下的消息细分)但使用方又有过滤的需求,那么会出现什么情况?或者说业务方会怎么去解决这个问题?

可以猜想大致会出现以下两种情况:

  1. 细分Topic,即将Topic再拆分的细一些,把二级类型直接作为Topic
  2. 在Consumer的消费逻辑中根据消息的属性或者内容决定是否过滤消息

第一种情况在一些场景下实际上是无法做到的,比如本文开头的交易场景的例子。一旦对Topic进行了拆分,那么细分后的数据之间的消息顺序就无法保证了,但对于一个订单,它的下单、支付等操作显然是需要顺序被处理的。

对于第二种情况,这也是业务方唯一能做的事情了。当然,也是最灵活的过滤方式了,业务方可以根据自己的需求制定过滤策略。但是带来的问题是所有的消息都需要从服务端先取回到客户端,这里的带宽浪费是比较严重的(取了大量客户端不需要的数据)。

2. MQ对于过滤的需求需要支持到什么程度呢?

对于这个问题,我在思考的时候考虑的是以下几个点:

  1. 业务方的过滤需求有哪些类型,是否可以穷举
  2. MQ的过滤功能能否覆盖掉用户的所有需求
  3. 以及支持消息过滤的成本

显然,用户的过滤需求难以穷举,且业务在不断的变化。但是通过像SQL这样的方式,我们可以认为覆盖了用户所有的过滤需求(就像查MySQL数据,可以组合各种SQL来完成目标数据的获取)。然而还需要去考虑成本的问题,比如机器成本、过滤对消息RT的影响等等。

所以在MQ的消息过滤中,我们期望能在成本和过滤能力之间找到一个平衡点,既能较好的支撑业务的过滤需求同时付出的成本在可接受范围内。

上面这句话的具体含义可以这样理解:

  1. 对消息的写入和消费的RT影响可以忽略
  2. 没有额外的资源需求(业务量不变的情况下,过滤功能不需要额外的机器资源投入)
  3. 覆盖业务的日常过滤需求(满足业务方90%以上的过滤需求)

站在巨人的肩膀上

在理解完需求且清除的知道我们要做什么之后,再来看一下业界“大佬”是怎么解决这个问题的。

RocketMQ Tag过滤

Message包含一个Tag属性,String类型,发送方可以进行设置,通常我们称为打标。

服务端在进行消息存储时,会将消息的Tag属性添加到消息索引中。Rocket的索引结构如下图:

索引元素包含三项内容:

  • offset:消息在存储文件中的偏移量
  • size:消息在存储文件中的大小
  • tag hashcode:消息的Tag属性的HashCode值

为什么这里存的是Tag的哈希值而不是Tag本身的值呢?

索引本身是为了加快消息的查询速度,所以它的元素是定长的,这就决定了无法在索引中直接存储Tag的值。

因为索引中存储了Tag的哈希值,那么在进行消息读取时就可以根据用户的订阅请求进行消息匹配(可以在不读取存储文件的情况下完成消息的匹配,且开销可以不计)。

但是因为这里比较的是HashCode,所以在消息返回到Consumer之后需要再进行一次真实值得比较,以避免消费到非期望的数据。

那么增加了Tag之后,消息的读取流程如下:

  1. 获取用户读取消息的请求中期望的Tag的HashCode(可以是多个且进行||或者&&的运算)
  2. 读取索引元素,对比HashCode是否满足用户的过滤需求
  3. 从存储文件读取满足HashCode过滤条件的消息内容返回给Consumer
  4. Consumer反序列化消息,对比Tag值进一步确认消息是否期望数据
RocketMQ SQL92过滤
  1. Broker通过Consumer的心跳,在ConsumerFilterManager组件中保存Consumer的过滤信息(Expression)
  2. 当Consumer尝试读取消息时,Broker构造MessageFilter来过滤需要的数据

RocketMQ SQL92过滤文档

Tag VS SQL92

Tag过滤

SQL过滤

覆盖场景

支持简单过滤(消息单Tag,可以订阅多Tag或按逻辑运算订阅Tag)

支持复杂过滤

实现成本

实现简单

实现复杂,涉及到SQL解析等

对服务端的影响

服务端只进行简单的long值比较,代价低

服务端需要复杂的计算,代价高

用户的使用成本

简单的Tag运算逻辑,对用户要求低

用户需要掌握一些SQL语法,相对来说复杂一些

结论:

  • SQL覆盖场景更多,满足用户所有需求
  • Tag覆盖场景少,但是无论从实现成本或者使用成本上都要小一些

所以在开发资源有限的情况下(比如没有足够的人手)要实现MQ中的过滤功能的话,Tag方式是一个更好的选择,SQL则作为不断完善的一个补充(没有也可以接受,有就最好了)。

“万恶”的业务方

“消息能不能支持多个Tag,这样发送的时候一条消息从不同的维度打上Tag来实现灵活的过滤需求”——from业务方

比如一条订单消息可以按照支付方式打标,也可以按照商品品类打表,这样订阅时可以灵活的过滤出目标数据。

代码语言:javascript
复制
message0.setTags(TagA);
message1.setTags(TagB);
message2.setTags(TagA,TagB,TagC);

consumer0.subscribe(topic, TagA&&TagB);==>message2
consumer1.subscribe(topic, TagA||TagC);==>message0, message2

一是用户提出了这样的需求,二是在支持Tag之后我们也会去考虑Tag的方式还有没有优化空间。

能不能支持一条消息有多个Tag?

消息多Tag的问题其实和索引中无法存储Tag原始值的问题是一致的,都是导致索引结构的变化:索引存Tag值或者存多个Tag的HashCode都会导致索引元素的长度不固定,进而无法快速定位消息。

此时最容易想到的方案就是扩展索引。

扩展实现多Tag

扩展索引的方式能保持消息索引依旧是定长的,把Tag相关的数据单独存储,只在有必要的时候读取Tag信息(用户有过滤需求时),如下图所示:

  • 索引定长的,那么读消息时依旧可以快速定位到消息
  • 只要过滤的情况下读取Tag信息,对于读流程多了一次读Tag的操作
  • 对于写流程,除了原本的写存储文件和写索引文件外,另外需要写一份Tag文件

这种方式实现多Tag需求是最直观的,缺陷也是最明显的:读写操作都多了一次Tag的操作。

更进一步,有没有办法在多Tag的情况下避免掉这一次Tag的读写操作呢?

不定长索引实现多Tag

既然不能独立出Tag的存储文件,那么只能直接扩展原来的索引文件了,直接将多个Tag的HashCode存到索引中。

面临的问题也非常清晰:不定长索引如何解决读取消息时索引的定位问题?

因为每个索引元素的长度是不确定的,当用户需要读Msg2时,就无法通过2*element size来计算索引位置。只能遍历索引了吗?但是遍历显然又是无法接受的!

思考一下写消息的过程,我们是怎么确定消息在文件中的写入位置的呢?——追加到末尾。追加的过程其实是记录上一条消息写入后的位置,那么当前的消息就从之前的位置继续写。

其实消息读取的过程也是一样的,虽然索引是不定长的,但是只要知道了上一条索引的位置和大小,就能定位到下一条消息索引的位置了。

  • 读第0条消息时,直接从索引的开头文件读取即可
  • 读第1条消息时,只需要知道第0条消息的大小即可:0 + msg0 size
  • 读第2条消息时,只需要知道第1条消息末尾的位置即可:msg1 offset + msg1 size
  • 读第N条消息时,只需知道第N-1条消息的索引的位置和大小即可

那么,在读取第N条消息时其实只要知道第N-1条消息的索引位置就能快速定位出第N条消息的索引。

而在消息的场景中,99.999%的情况下读完第N条消息时,下一次都会读取第N+1条。

只有在少数异常的情况下需要修改offset的信息来读取之前或者之后的消息(而这种异常的场景下,可以通过一些优化的手段减少扫描的索引未见的数量来查找速度)。

不定长索引的寻址过程如下:

此方案虽然解决了上一种扩展索引的方案带来的问题,但是并不能做到和RocketMQ通过SQL的方式支持灵活的过滤需求。

总结

本文从消息过滤的问题出发,介绍了RocketMQ的过滤功能实现,分析了消息过滤的需求,然后总结了不同的多Tag功能的实现方案。

对于消息过滤的实现,没有哪一种方案是完美的,我们应当从自身的场景出发,考虑现实面对的成本等问题,综合考虑,选择一种最适用于自身业务场景的方案。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-03-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 MessageQueue 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 业界实现
    • RocketMQ
      • Kafka
      • 问题分析
      • 站在巨人的肩膀上
        • RocketMQ Tag过滤
          • RocketMQ SQL92过滤
            • Tag VS SQL92
            • “万恶”的业务方
              • 能不能支持一条消息有多个Tag?
                • 扩展实现多Tag
                  • 不定长索引实现多Tag
                  • 总结
                  相关产品与服务
                  云数据库 MySQL
                  腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
                  领券
                  问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档