参考:http://rocketmq.apache.org/docs/quick-start/
参考:https://github.com/apache/rocketmq/tree/master/example
public interface MessageQueueSelector { MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);}
RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上,RocketMQ默认提供了三种实现,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。MessageQueueSelector的select方法提供了三个入参,分别为消息队列集合、消息和扩展参数。本示例通过使用扩展参数来实现消息通道的定向发送和接收。
RocketMQ在设计的时候就支持tag了,因为他的索引文件就包含了tag的。 后来为了更去的过滤功能,更是扩展格式里,能进一步根据SQL92或者创建时间来过滤了。可以自定义MessageSelector来获取需要的消息。
ConsumeQueue扩展格式:支持sql92标准来过滤 ConsumeQueue标准格式只能通过tags搜索,不能使用用filters和commitTime搜索,于是扩展格式增加了: 参考:http://rocketmq.apache.org/docs/filter-by-sql92-example/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// Set some properties.msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);producer.shutdown();
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");// only subsribe messages have property a, also a >=0 and a <= 3consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }});consumer.start();
IndexFile:支持查询消息,topic+key+最多条数+开始时间+结束时间 public QueryOffsetResult queryOffset(String topic,String key,int maxNum,long begin,long end){...}
RocketMQ的物理存储总结: