前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rocketmq消息中间件中通过message key找消息的问题

Rocketmq消息中间件中通过message key找消息的问题

作者头像
山行AI
发布2019-06-28 11:21:34
7.3K0
发布2019-06-28 11:21:34
举报
文章被收录于专栏:山行AI山行AI

1. Rocketmq的安装布署:

参考:http://rocketmq.apache.org/docs/quick-start/

2. Rocketmq的简单应用

参考:https://github.com/apache/rocketmq/tree/master/example

3. MessageQueueSelector

代码语言:javascript
复制
public interface MessageQueueSelector {    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}

RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上,RocketMQ默认提供了三种实现,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三个入参,分别为消息队列集合、消息和扩展参数。本示例通过使用扩展参数来实现消息通道的定向发送和接收。

RocketMQ在设计的时候就支持tag了,因为他的索引文件就包含了tag的。 后来为了更去的过滤功能,更是扩展格式里,能进一步根据SQL92或者创建时间来过滤了。可以自定义MessageSelector来获取需要的消息。

ConsumeQueue扩展格式:支持sql92标准来过滤 ConsumeQueue标准格式只能通过tags搜索,不能使用用filters和commitTime搜索,于是扩展格式增加了: 参考:http://rocketmq.apache.org/docs/filter-by-sql92-example/

  1. 生产者示例:
代码语言:javascript
复制
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();Message msg = new Message("TopicTest",    tag,    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties.msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);producer.shutdown();
  1. 消费者示例:
代码语言:javascript
复制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// only subsribe messages have property a, also a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");consumer.registerMessageListener(new MessageListenerConcurrently() {    @Override    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }});consumer.start();

IndexFile:支持查询消息,topic+key+最多条数+开始时间+结束时间 public QueryOffsetResult queryOffset(String topic,String key,int maxNum,long begin,long end){...}

4. 怎样设计IndexFile的物理存储内容才能满足上面的要求?

RocketMQ的物理存储总结:

  • 消息实际内容存储在CommitLog中(这点和Kafka大有不同,这也是RocketMQ没有kafka那么大的吞吐但是吞吐更稳定的原因);
  • 为了能有多个Consumer并行消费,设计了基于(topic,queued)区分的ConsumeQueue;
  • 为了在消费时在Broker上就过滤掉不感兴趣的内容,支持为Message打tag,订阅时只得到相关的tag的消息,将tagCode存储于其上。
  • 为了订阅时能做到除了tag外的更多过滤,设计ConsumeQueueExt格式,通过BloomFilter;
  • 为了满足根据key和时间段进行查询,设计了IndexFile
  • Kafka是不支持broker端过滤的,只能通过offset拿数据,拿到Consumer里,自己把Message解析出来,在Consumer里过滤。
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-06-06,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. Rocketmq的安装布署:
  • 2. Rocketmq的简单应用
  • 3. MessageQueueSelector
  • 4. 怎样设计IndexFile的物理存储内容才能满足上面的要求?
相关产品与服务
对象存储
对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档