使用客户端订阅消息

最近更新时间:2025-09-22 10:39:43

我的收藏
MQTT (Message Queuing Telemetry Transport) 是一种轻量级的、基于发布/订阅(Pub / Sub)模式的物联网消息传输协议。在 MQTT 通信模型中,订阅 (Subscription) 是客户端(订阅者)向服务器(Broker)表达其希望接收特定主题或一类主题消息的核心机制。

发布订阅模型

发布者将消息发送到特定的 Topic,Broker 负责将消息转发给所有订阅了该 Topic 的订阅者。
例如下图场景中,温度传感器将温度读数发布到 “ temperature ” 主题,手机和后端业务系统订阅该主题后,都将获得温度传感器发布的全量数据。


订阅原理

1. 连接建立:客户端首先与 MQTT Broker 建立 TCP 连接,并发送 CONNECT 报文进行认证。
2. 发起订阅:连接成功后,客户端向 Broker 发送一个 SUBSCRIBE 报文。这个报文中包含了一个或多个它想要订阅的主题过滤器 (Topic Filter) 以及对应的服务质量等级 (QoS)。
3. 确认订阅:Broker 收到 SUBSCRIBE 报文后,会回复一个 SUBACK 订阅确认报文。该报文包含了为每个请求订阅的 Topic Filter 所授予的 QoS 等级(可能是客户端请求的,也可能是 Broker 根据自身策略授予的)。
4. 消息路由:此后,一旦有消息发布到匹配该 Topic Filter 的 Topic 上,Broker 就会立即将消息转发给该客户端。
5. 取消订阅:当客户端不再希望接收某些 Topic 的消息时,它会发送一个 UNSUBSCRIBE 报文,Broker 随后停止转发相关消息。

主题过滤器与通配符

客户端在订阅时使用的不是完全确定的 Topic 名,而是可以使用通配符的 Topic Filter,这使得它可以灵活地订阅多个 Topic。
通配符
说明
示例
+
单层通配符,代表匹配一个且仅一个任意主题层级
订阅 sensor/+/temperature
匹配 sensor/bedroom/temperature
匹配 sensor/living_room/temperature
不匹配 sensor/bedroom/upstairs/temperature (因为+只能匹配一层)
不匹配 sensor/temperature (因为 sensor 和 temperature 之间缺少一层)
#
多层通配符,代表匹配零个或多个任意主题层级,必须是主题过滤器的最后一个字符
订阅 sensor/bedroom/#
匹配 sensor/bedroom/temperature
匹配 sensor/bedroom/humidity
匹配 sensor/bedroom/light/intensity
匹配 sensor/bedroom (# 可以匹配零层)
不匹配 sensor/living_room/temperature (因为 bedroom 是固定的)

服务质量 (QoS)

QoS(Quality of Service)指代消息传输的服务质量,每条消息都可以在发送时单独设置 QoS。它包括以下级别:
QoS 等级
工作流程
优点
缺点
适用场景
QoS = 0
(最多分发一次)
发送者将消息发送出去,不要求接收方进行确认
传输速度最快,开销最小
消息可能会丢失,例如网络故障、接收方不在线情况
非关键性的数据,允许偶尔丢失,例如周期性的传感器数据(温度、湿度),丢失下一个数据点很快会补上。
QoS = 1
(至少达到一次)
1. 发送者发送消息并保留一个副本。
2. 接收者收到消息后,必须回复一个 PUBACK (发布确认) 报文。
3. 发送者收到 PUBACK 后,才会丢弃消息副本。
4. 如果发送者在合理时间内未收到 PUBACK,它会重新发送该消息。
保证了消息不会丢失
可能导致消息重复
需要保证消息必达,但可以接受偶尔重复的场景,例如控制指令(“开关灯”),重复执行一次可能也没问题。
QoS = 2
(仅到达一次)
1. PUBLISH:发送者发送消息,并保留副本。
2. PUBREC:接收者收到后回复一个“已收到”确认。如果发送者没收到 PUBREC,会重发 PUBLISH。
3. PUBREL:发送者收到 PUBREC 后,发送一个“发布释放”报文,并可以丢弃消息副本。它现在只需要等待最终确认。
4. PUBCOMP:接收者收到 PUBREL 后,回复一个“发布完成”确认。之后才将消息交付给应用。发送者收到 PUBCOMP,流程结束。如果任何一方没收到应答,都会重发上一条报文。
既保证了消息不丢失,又确保了不会重复
速度最慢,开销最大
对可靠性和准确性要求极高的关键业务,例如计费系统、金融交易、关键状态同步,任何重复或丢失都会造成严重后果。

保留消息 (Retained Messages)

发布一条消息时,可以设置retained参数等于true,Broker 会为这个 Topic 保存最新的这条保留消息。
当一个新的客户端订阅匹配的 Topic 时,Broker 会立即将这条最新的保留消息发送给它,而无需等待其下一次发布,这对于获取设备的最新状态非常有用。
例如,一个温度传感器每隔小时发布一次当前温度并设置为保留消息。任何新上线的客户端在订阅 sensor/temperature 后,能立刻收到当前的最新温度,而不必等待下一个小时。

使用示例

package com.tencent.tdmq.mqtt.example.paho.v5;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.eclipse.paho.mqttv5.client.IMqttToken;
import org.eclipse.paho.mqttv5.client.MqttCallback;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttDisconnectResponse;
import org.eclipse.paho.mqttv5.client.persist.MemoryPersistence;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;

public class SubscriberQuickStart {
public static void main(String[] args) throws MqttException, InterruptedException {
// 从MQTT控制台获取接入点:
// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;
// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;
String serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";

// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符
// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
String clientId = "SubscriberQuickStart";

// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码
String username = "user0";
String password = "secret0";

// MQTT topic filters
String[] topicFilters = new String[]{"home/test", "home/#", "home/+"};
int[] qos = new int[]{1, 1, 1};

MqttClient client = new MqttClient(serverUri, clientId, new MemoryPersistence());
client.setTimeToWait(3000);
MqttConnectionOptions options = new MqttConnectionOptions();
options.setUserName(username);
options.setPassword(password.getBytes(StandardCharsets.UTF_8));
options.setCleanStart(true);
options.setAutomaticReconnect(true);

client.setCallback(new MqttCallback() {
@Override
public void disconnected(MqttDisconnectResponse response) {
System.out.println("Disconnected: " + response.getReasonString());
}

@Override
public void mqttErrorOccurred(MqttException e) {
e.printStackTrace();
}

@Override
public void messageArrived(String topic, MqttMessage message) {
byte[] payload = message.getPayload();
String content;
if (4 == payload.length) {
ByteBuffer buf = ByteBuffer.wrap(payload);
content = String.valueOf(buf.getInt());
} else {
content = new String(payload, StandardCharsets.UTF_8);
}

System.out.printf("Message arrived, topic=%s, QoS=%d content=[%s]%n",
topic, message.getQos(), content);
List<UserProperty> userProperties = message.getProperties().getUserProperties();
printUserProperties(userProperties);
}

@Override
public void deliveryComplete(IMqttToken token) {
System.out.println("Delivery complete for packet-id: " + token.getMessageId());
}

@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println(reconnect ? "Reconnected" : "Connected" + " to " + serverURI);
try {
// Subscribe
IMqttToken token = client.subscribe(topicFilters, qos);
int[] reasonCodes = token.getReasonCodes();
for (int i = 0; i < reasonCodes.length; i++) {
System.out.printf("Subscribed to topic %s with QoS=%d, Granted-QoS: %d%n",
topicFilters[i], qos[i], reasonCodes[i]);
}

if (token.isComplete()) {
List<UserProperty> userProperties = token.getResponseProperties().getUserProperties();
printUserProperties(userProperties);
}
} catch (MqttException e) {
e.printStackTrace();
}
}

@Override
public void authPacketArrived(int i, MqttProperties properties) {
System.out.println("Received auth packet with id: " + i);
}
});

client.connect(options);

TimeUnit.MINUTES.sleep(5);

client.disconnect();

client.close();

}

static void printUserProperties(List<UserProperty> userProperties) {
if (null != userProperties) {
for (UserProperty userProperty : userProperties) {
System.out.printf("User property: %s = %s%n", userProperty.getKey(), userProperty.getValue());
}
}
}
}