配置 SQL 过滤

最近更新时间:2025-08-06 19:50:21

我的收藏
说明:
当前功能灰度中,如控制台提示当前集群暂未开启该功能, 请提交工单联系我们处理。

背景

MQTT 标准规范定义了 Topic Filter 的概念,让订阅者可以根据 MQTT Topic Name 的层级结构选择需要订阅的消息。层级结构和 Wildcard 表达式提供了比较灵活的消息过滤表达能力。但在灰度发布、A/B 测试、系统升级等场景中,Topic Filter 无法满足更灵活的过滤需求。

实现原理

MQTT 5.0 引入了 Subscribe User Property,MQTT 产品扩展支持了 Subscribe User Property 的过滤语义。

当订阅请求的 Subscribe User Property 中包含 Key 为 "$where"、Value 为 WHERE 子句的键值对时,MQTT Server 在推送消息时会根据该 WHERE 子句对消息进行过滤,仅将符合过滤条件的消息投递给订阅者。

使用限制

1. Subscribe User Properties 中只能有一个过滤语句;当有多个 $where --> WHERE-CLAUSE 属性时,仅第一个生效;
2. 按照协议,消息的用户属性中允许存在多个同名 Key 的键值对;存在多个同名 Key 时,仅最后一个键值对参与过滤表达式计算;
3. 如果过滤表达式中引用的某个字段在消息属性中未出现,则该字段的值为 NULL;
4. WHERE 子句支持的操作符包括: AND、OR、NOT、=、!=、>、>=、<、<=、IN、IS NULL、LIKE、CASE WHEN...THEN...ELSE...END;
5. WHERE 子句支持的函数包括: UPPER, LOWER, LENGTH, ABS, COALESCE;
6. 字符串字面量仅支持单引号, 例如 where type = 'string-literal'。

示例

package com.tencent.tdmq.mqtt.quickstart.paho.v5.async;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttAsyncClient;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

/**
 * Quick-start example for SQL-style subscription filtering with the Paho MQTT v5 async client.
 *
 * <p>The program connects to the broker, subscribes to {@code home/#} with a {@code $where}
 * subscribe user property, and then publishes 128 QoS-1 messages to {@code home/room/1}. Each
 * message carries a user property {@code k1} so that the subscription filter has something to
 * evaluate.
 */
public class BasicQuickStart {
    /**
     * Runs the demo: connect, subscribe with a filter, publish, wait, then disconnect.
     *
     * @param args unused
     * @throws MqttException if connecting, publishing, disconnecting or closing the client fails
     * @throws InterruptedException if the thread is interrupted while sleeping between publishes
     */
    public static void main(String[] args) throws MqttException, InterruptedException {
        String serverUri = "tcp://mqtt-xxx.mqtt.tdmqcloud.com:1883";
        String clientId = "deviceBasic";

        String topic = "home/room/1";
        String[] topicFilters = new String[] {"home/#"};
        int[] qos = new int[] {1};

        MqttAsyncClient client = new MqttAsyncClient(serverUri, clientId, new MemoryPersistence());
        MqttConnectionOptions options = new MqttConnectionOptions();
        options.setUserName("YOUR-USERNAME");
        options.setPassword("YOUR-PASSWORD".getBytes(StandardCharsets.UTF_8));
        options.setCleanStart(true);
        options.setSessionExpiryInterval(TimeUnit.DAYS.toSeconds(1));

        client.setCallback(new MqttCallback() {
            @Override
            public void disconnected(MqttDisconnectResponse response) {
                System.out.println("Disconnected: " + response.getReasonString());
            }

            @Override
            public void mqttErrorOccurred(MqttException e) {
                e.printStackTrace();
            }

            @Override
            public void messageArrived(String arrivedTopic, MqttMessage message) throws Exception {
                byte[] payload = message.getPayload();
                String content;
                // The publisher below sends a 4-byte int; anything else is printed as UTF-8 text.
                if (4 == payload.length) {
                    ByteBuffer buf = ByteBuffer.wrap(payload);
                    content = String.valueOf(buf.getInt());
                } else {
                    content = new String(payload, StandardCharsets.UTF_8);
                }
                // Report the topic the message actually arrived on, not the publish topic
                // captured from the enclosing method.
                System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s], properties=%s%n",
                    arrivedTopic, message.getQos(), content, message.getProperties());
            }

            @Override
            public void deliveryComplete(IMqttToken token) {
                // Nothing to do: publish completion is awaited on the token in main().
            }

            @Override
            public void connectComplete(boolean reconnect, String serverURI) {
                // Parentheses are required: without them the string concatenation binds only to
                // the "Connected" branch and the URI is dropped on reconnect.
                System.out.println((reconnect ? "Reconnected" : "Connected") + " to " + serverURI);
            }

            @Override
            public void authPacketArrived(int i, MqttProperties properties) {
                // Enhanced authentication is not used in this example.
            }
        });

        try {
            client.connect(options).waitForCompletion();

            try {
                // Subscribe with a "$where" user property carrying the filter expression.
                MqttSubscription[] subscriptions = new MqttSubscription[topicFilters.length];
                for (int i = 0; i < topicFilters.length; i++) {
                    subscriptions[i] = new MqttSubscription(topicFilters[i], qos[i]);
                }
                MqttProperties subscribeProperties = new MqttProperties();
                List<UserProperty> userProperties = new ArrayList<>();
                UserProperty userProperty = new UserProperty("$where", "where $QoS = 1 AND k1 = 'v1'");
                userProperties.add(userProperty);
                subscribeProperties.setUserProperties(userProperties);
                client.subscribe(subscriptions, null, null, subscribeProperties).waitForCompletion();
            } catch (MqttException e) {
                e.printStackTrace();
            }

            int total = 128;
            for (int i = 0; i < total; i++) {
                byte[] payload = new byte[4];
                ByteBuffer buffer = ByteBuffer.wrap(payload);
                buffer.putInt(i);
                MqttMessage message = new MqttMessage(payload);
                message.setQos(1);
                MqttProperties properties = new MqttProperties();
                properties.setContentType("application/json");
                properties.setResponseTopic("response/topic");
                // The subscription filter references k1. A field absent from the message
                // properties is treated as NULL by the filter, so a message without k1 could
                // never match. Alternate the value so that, with the filter above, only the
                // even-numbered messages are expected to be delivered.
                List<UserProperty> messageUserProperties = new ArrayList<>();
                messageUserProperties.add(new UserProperty("k1", (i % 2 == 0) ? "v1" : "v2"));
                properties.setUserProperties(messageUserProperties);
                message.setProperties(properties);
                System.out.printf("Prepare to publish message %d%n", i);
                // Wait for the publish token so "Published" is printed only after completion.
                client.publish(topic, message).waitForCompletion();
                System.out.printf("Published message %d%n", i);
                TimeUnit.MILLISECONDS.sleep(100);
            }
            // Keep the connection open for a while so filtered messages can be received.
            TimeUnit.MINUTES.sleep(3);
        } finally {
            // Always release the client, even if connecting or publishing failed.
            try {
                if (client.isConnected()) {
                    client.disconnect().waitForCompletion();
                }
            } finally {
                client.close();
            }
        }
    }
}