共享订阅

最近更新时间:2025-06-17 14:05:22

我的收藏

发布订阅模型

MQTT 协议定义了基于发布订阅(Pub / Sub)的消息模型。例如下图场景中,温度传感器将温度读数发布到 “ temperature ” 主题,手机和后端业务系统订阅该主题后,都将获得温度传感器发布的全量数据。


什么是共享订阅?

共享订阅是一种负载均衡的消费方式,属于同一个消费组的多个客户端轮转分配订阅的消息。因此,通常也称为消费者负载均衡。
例如下图中,发布者发布了4条消息:M1、M2、M3、M4。客户端 Client-0 和 Client-1 以共享订阅的方式负载均衡消费订阅的主题。这与默认的 MQTT Pub/Sub 不同,Client-0 仅消费到了 M1, M3; Client-1 仅消费到了 M2, M4


MQTT 3.1、3.1.1

MQTT 3.1及3.1.1协议并未定义共享订阅相关内容,腾讯云通过扩展的方式提供了共享订阅的支持。

使用方式

当期望以负载均衡的方式订阅消息时,订阅 topic-filter 以如下方式配置:
$share/{ShareName}/{TopicFilter}
参数
说明
$share
字面常量,固定字符串。
{ShareName}
是负载均衡组名称,不能包含“ / ”,“ + ” , “ # ”。
{TopicFilter}
与 MQTT TopicFilter 要求 和语义相同。

示例代码

package org.apache.rocketmq.mqtt.example.quickstart;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class SharedSubscriptionQuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
String serverUri = "tcp://127.0.0.1:1883";
String clientId = "shared-sub-0";
try (MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence())) {
client.setTimeToWait(3000);
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("YOUR-USERNAME");
options.setPassword("YOUR-PASSWORD".toCharArray());
options.setCleanSession(true);
options.setAutomaticReconnect(true);
client.connect(options);
int total = 1;
CountDownLatch latch = new CountDownLatch(total);

client.setCallback(new MqttCallback() {
public void messageArrived(String topic, MqttMessage message) {
System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n", topic, message.getQos(),
new String(message.getPayload()));
latch.countDown();
}

public void connectionLost(Throwable cause) {
System.out.println("connectionLost: " + cause.getMessage());
}

public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete: " + token.isComplete());
}
});
// 共享订阅表达式
// {ShareName} 这里为 Group0
// {topic-filter} 这里为 home/#
String topic = "$share/Group0/home/#";
// Subscribe client.subscribe(topic, 1); TimeUnit.HOURS.sleep(1); client.disconnect(); } } }

MQTT 5 使用说明

MQTT 5 协议层面明确了 共享订阅的定义

共享订阅离线消息保留策略

${ShareName}下如果存在有效的Session订阅${TopicFilter}, 符合${TopicFilter}的离线消息就会保留,订阅者再次上线时,会从上次进度恢复消费。


Session 有效期

MQTT 3.1、3.1.1 通过clean-session定义Session的生命周期。 当 clean-session = true, Session 的生命周期与传输层生命周期一致。 当 clean-session = false, Session 与传输层生命周期无关。为避免资源浪费,产品定义传输层断开后,Session 最大存续 3 天。


按照 MQTT 5.0 协议,有以下等价语义:
MQTT 3.1、3.1.1
MQTT 5
clean-session = true
clean-start = true
session-expiry-interval = 0
clean-session = false
clean-start = false
session-expiry-interval = 259200