发布订阅模型
MQTT 协议定义了基于发布订阅(Pub / Sub)的消息模型。例如下图场景中,温度传感器将温度读数发布到 “ temperature ” 主题,手机和后端业务系统订阅该主题后,都将获得温度传感器发布的全量数据。

什么是共享订阅?
共享订阅是一种负载均衡的消费方式,属于同一个消费组的多个客户端轮转分配订阅的消息。因此,通常也称为消费者负载均衡。
例如下图中,发布者发布了4条消息:M1、M2、M3、M4。客户端 Client-0 和 Client-1 以共享订阅的方式负载均衡消费订阅的主题。这与默认的 MQTT Pub/Sub 不同,Client-0 仅消费到了 M1, M3; Client-1 仅消费到了 M2, M4。

MQTT 3.1、3.1.1
MQTT 3.1及3.1.1协议并未定义共享订阅相关内容,腾讯云通过扩展的方式提供了共享订阅的支持。
使用方式
当期望以负载均衡的方式订阅消息时,订阅 topic-filter 以如下方式配置:
$share/{ShareName}/{TopicFilter}
参数 | 说明 |
$share | 字面常量,固定字符串。 |
{ShareName} | 是负载均衡组名称,不能包含“ / ”,“ + ” , “ # ”。 |
{TopicFilter} |
示例代码
package org.apache.rocketmq.mqtt.example.quickstart;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class SharedSubscriptionQuickStart {public static void main(String[] args) throws MqttException, InterruptedException {String serverUri = "tcp://127.0.0.1:1883";String clientId = "shared-sub-0";try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {client.setTimeToWait(3000);MqttConnectOptions options = new MqttConnectOptions();options.setUserName("YOUR-USERNAME");options.setPassword("YOUR-PASSWORD".toCharArray());options.setCleanSession(true);options.setAutomaticReconnect(true);client.connect(options);int total = 1;CountDownLatch latch = new CountDownLatch(total);client.setCallback(new MqttCallback() {public void messageArrived(String topic, MqttMessage message) {System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),new String(message.getPayload()));latch.countDown();}public void connectionLost(Throwable cause) {System.out.println("connectionLost: " + cause.getMessage());}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete: " + token.isComplete());}});// 共享订阅表达式// {ShareName} 这里为 Group0// {topic-filter} 这里为 home/#String topic = "$share/Group0/home/#";// Subscribe client.subscribe(topic, 1); TimeUnit.HOURS.sleep(1); client.disconnect(); } } }
MQTT 5 使用说明
共享订阅离线消息保留策略
${ShareName}下如果存在有效的Session订阅${TopicFilter}, 符合${TopicFilter}的离线消息就会保留,订阅者再次上线时,会从上次进度恢复消费。
Session 有效期
MQTT 3.1、3.1.1 通过clean-session定义Session的生命周期。 当 clean-session = true, Session 的生命周期与传输层生命周期一致。 当 clean-session = false, Session 与传输层生命周期无关。为避免资源浪费,产品定义传输层断开后,Session 最大存续 3 天。

按照 MQTT 5.0 协议,有以下等价语义:
MQTT 3.1、3.1.1 | MQTT 5 | |
clean-session = true | clean-start = true | session-expiry-interval = 0 |
clean-session = false | clean-start = false | session-expiry-interval = 259200 |