说明:
背景
MQTT 标准规范定义了 Topic Filter 的概念,让订阅者可以根据 MQTT Topic Name 层级结构选择需要订阅的消息。层级结构 和 Wildcard 表达式,提供了比较灵活的消息过滤表达能力。但在一些场景中,例如灰度发布,A/B测试, 系统升级等场景,Topic Filter 无法满足更灵活的需求。
实现原理

当用户 Subscribe User Property 包含 key 为 "$where",Value 为 WHERE子句时,MQTT Server 在推送消息时, 会根据 WHERE 子句对消息进行过滤,仅投递符合过滤条件的消息到订阅者。
使用限制
1. Subscribe User Properties 中只能有一个过滤语句;当有多个 $where --> WHERE-CLAUSE 属性时,仅第一个生效;
2. 消息用户属性中,按照协议允许多个同名的 Key-Value对,同名 Key 的多个键值对情况下,仅最后一个参与过滤表达式计算;
3. 如果过滤表达式包含某个字段在消息属性中未出现, 则该字段的值为 NULL;
4. WHERE 子句支持的操作符包括: AND、OR、NOT、=、!=、>、>=、<、<=、IN、IS NULL、LIKE、CASE WHEN...THEN...ELSE...END;
5. WHERE 子句支持的函数包括: UPPER, LOWER, LENGTH, ABS, COALESCE;
6. 字符串字面量仅支持单引号, 例如 where type = 'string-literal'。
示例
package com.tencent.tdmq.mqtt.quickstart.paho.v5.async;import java.nio.ByteBuffer;import java.nio.charset.StandardCharsets;import java.util.ArrayList;import java.util.List;import java.util.concurrent.TimeUnit;import org.eclipse.paho.mqttv5.client.IMqttToken;import org.eclipse.paho.mqttv5.client.MqttAsyncClient;import org.eclipse.paho.mqttv5.client.MqttCallback;import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;import org.eclipse.paho.mqttv5.common.MqttException;import org.eclipse.paho.mqttv5.common.MqttMessage;import org.eclipse.paho.mqttv5.common.MqttSubscription;import org.eclipse.paho.mqttv5.common.packet.MqttProperties;import org.eclipse.paho.mqttv5.common.packet.UserProperty;public class BasicQuickStart {public static void main(String[] args) throws MqttException, InterruptedException {String serverUri = "tcp://mqtt-xxx.mqtt.tdmqcloud.com:1883";String clientId = "deviceBasic";String topic = "home/room/1";String[] topicFilters = new String[] {"home/#"};int[] qos = new int[] {1};MqttAsyncClient client = new MqttAsyncClient(serverUri, clientId, new MemoryPersistence());MqttConnectionOptions options = new MqttConnectionOptions();options.setUserName("YOUR-USERNAME");options.setPassword("YOUR-PASSWORD".getBytes(StandardCharsets.UTF_8));options.setCleanStart(true);options.setSessionExpiryInterval(TimeUnit.DAYS.toSeconds(1));client.setCallback(new MqttCallback() {@Overridepublic void disconnected(MqttDisconnectResponse response) {System.out.println("Disconnected: " + response.getReasonString());}@Overridepublic void mqttErrorOccurred(MqttException e) {e.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage message) throws Exception {byte[] payload = message.getPayload();String content;if (4 == payload.length) {ByteBuffer buf = ByteBuffer.wrap(payload);content = String.valueOf(buf.getInt());} else {content = new String(payload, StandardCharsets.UTF_8);}System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s], properties=%s%n",topic, message.getQos(), content, message.getProperties());}@Overridepublic void deliveryComplete(IMqttToken token) {}@Overridepublic void connectComplete(boolean reconnect, String serverURI) {System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);}@Overridepublic void authPacketArrived(int i, MqttProperties properties) {}});client.connect(options).waitForCompletion();try {// SubscribeMqttSubscription[] subscriptions = new MqttSubscription[topicFilters.length];for (int i = 0; i < topicFilters.length; i++) {subscriptions[i] = new MqttSubscription(topicFilters[i], qos[i]);}MqttProperties subscribeProperties = new MqttProperties();List<UserProperty> userProperties = new ArrayList<>();UserProperty userProperty = new UserProperty("$where", "where $QoS = 1 AND k1 = 'v1'");userProperties.add(userProperty);subscribeProperties.setUserProperties(userProperties);client.subscribe(subscriptions, null, null, subscribeProperties).waitForCompletion();} catch (MqttException e) {e.printStackTrace();}int total = 128;for (int i = 0; i < total; i++) {byte[] payload = new byte[4];ByteBuffer buffer = ByteBuffer.wrap(payload);buffer.putInt(i);MqttMessage message = new MqttMessage(payload);message.setQos(1);MqttProperties properties = new MqttProperties();properties.setContentType("application/json");properties.setResponseTopic("response/topic");message.setProperties(properties);System.out.printf("Prepare to publish message %d%n", i);// P2P topic format: {first-topic}/p2p/{target-client-id}client.publish(topic, message);System.out.printf("Published message %d%n", i);TimeUnit.MILLISECONDS.sleep(100);}TimeUnit.MINUTES.sleep(3);client.disconnect();}}