消息过滤

最近更新时间:2023-08-04 14:57:51

我的收藏
本文主要介绍 TDMQ RocketMQ 版中消息过滤的功能、应用场景和使用方式。

功能介绍

消息过滤功能指消息生产者向 Topic 中发送消息时,设置消息属性对消息进行分类,消费者订阅 Topic 时,根据消息属性设置过滤条件对消息进行过滤,只有符合过滤条件的消息才会被投递到消费端进行消费。
消费者订阅 Topic 时若未设置过滤条件,无论消息发送时是否有设置过滤属性,Topic 中的所有消息都将被投递到消费端进行消费。

应用场景

通常,一个 Topic 中存放的是相同业务属性的消息,例如交易流水 Topic 包含了下单流水、支付流水、发货流水等,业务若只想消费者其中一种类别的流水,可在客户端进行过滤,但这种过滤方式会带来带宽的资源浪费。
针对上述场景,TDMQ 提供 Broker 端过滤的方式,用户可在生产消息时设置一个或者多个 Tag 标签,消费时指定 Tag 订阅。




使用方式

TAG 过滤

发送消息

发送消息时,每条消息必须指明 Tag。
Message msg = new Message("TOPIC","TagA","Hello world".getBytes());

订阅消息

订阅所有 Tag:消费者如需订阅某 Topic 下所有类型的消息,Tag 用星号(*)表示。
consumer.subscribe("TOPIC", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
订阅单个 Tag:消费者如需订阅某 Topic 下某一种类型的消息,请明确标明 Tag。
consumer.subscribe("TOPIC", "TagA", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});
订阅多个 Tag:消费者如需订阅某 Topic 下多种类型的消息,请在多个 Tag 之间用两个竖线(||)分隔。
consumer.subscribe("TOPIC", "TagA||TagB", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
System.out.println(message.getMsgID());
return Action.CommitMessage;
}
});

SQL 过滤

发送消息

发送代码和简单的消息没有区别 主要是在构造消息体的时候,带上自定义属性,允许多个。
int totalMessagesToSend = 5;
for (int i = 0; i < totalMessagesToSend; i++) {
Message msg = new Message(TOPIC_NAME,"Hello RocketMQ.".getBytes(StandardCharsets.UTF_8));
msg.putUserProperty("key1","value1");
// 发送消息
SendResult sendResult = producer.send(message);
System.out.println("sendResult = " + sendResult);
}

订阅消息

对于消费消息,主要是订阅的时候,带上对应的SQL表达式,其他的和普通的消费消息流程没有区别。
//订阅所有消息
pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("True"));

// 订阅topic 订阅单个key的sql,最常用
//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key1='value1'"));

//订阅多个属性
//pushConsumer.subscribe(TOPIC_NAME, MessageSelector.bySql("key1 IS NOT NULL AND key2 IS NOT NULL AND key1='value1' AND key2='value2'"));
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
// 消息处理逻辑
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费, 根据消费情况,返回处理状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();
说明
上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 GitHub DemoRocketMQ 官方文档