功能概述
Dart/Flutter 平台提供了两个主要的 MQTT 客户端库:
mqtt_client
用于 MQTT 3.1/3.1.1 协议的客户端库,可用于 Flutter、Dart VM 以及 Web 平台。提供了 MqttServerClient 和 MqttBrowserClient 用于服务端和浏览器环境。
mqtt5_client
专门支持 MQTT 5.0 协议的客户端库,提供了完整的 MQTT 5.0 特性支持,包括增强的认证、消息属性、主题别名等。同样支持服务器和浏览器环境。
云资源准备
环境准备
选择合适的客户端库
根据您需要使用的 MQTT 协议版本选择对应的库:
使用 MQTT 3.1.1(推荐用于生产环境)
dependencies:mqtt_client: ^10.11.0
使用 MQTT 5.0
dependencies:mqtt5_client: ^4.15.2
然后运行:
flutter pub get# 或dart pub get
示例代码
import 'dart:async';import 'dart:convert';import 'package:mqtt5_client/mqtt5_client.dart';import 'package:mqtt5_client/mqtt5_server_client.dart';Future<void> main() async {// 从MQTT控制台获取接入点final server = 'mqtt-xxx.mqtt.tencenttdmq.com';final port = 1883;// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符final clientId = 'QuickStartMqtt5';// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码final username = 'YOUR_USERNAME';final password = 'YOUR_PASSWORD';// 确认一级主题 "home" 在MQTT控制台已经创建final pubTopic = 'home/test';final topicFilters = ['home/test', 'home/#', 'home/+'];final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];final total = 16;final client = MqttServerClient.withPort(server, clientId, port);client.logging(on: true);client.keepAlivePeriod = 60;client.autoReconnect = true;// 设置连接消息final connMessage = MqttConnectMessage().withClientIdentifier(clientId).authenticateAs(username, password).startClean();client.connectionMessage = connMessage;// 连接回调client.onConnected = () {print('Connected to $server');// Subscribefor (var i = 0; i < topicFilters.length; i++) {client.subscribe(topicFilters[i], qos[i]);print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');}};client.onDisconnected = () {print('Disconnected');};client.onAutoReconnect = () {print('Auto reconnecting...');};client.onAutoReconnected = () {print('Auto reconnected');};try {print('Connecting to MQTT broker...');await client.connect();} catch (e) {print('Exception: $e');client.disconnect();return;}if (client.connectionStatus!.state == MqttConnectionState.connected) {print('MQTT client connected');// 订阅消息回调client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {final recMessage = c[0].payload as MqttPublishMessage;final topic = c[0].topic;final payload = recMessage.payload.message;final content = payload != null ? utf8.decode(payload.toList()) : '';print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');});// 发布消息for (var i = 0; i < total; i++) {final builder = MqttPayloadBuilder();builder.addString('Hello MQTT 5.0 - $i');print('Prepare to publish message $i');client.publishMessage(pubTopic, qos[0], builder.payload!);print('Published message $i');await Future.delayed(Duration(seconds: 3));}await Future.delayed(Duration(seconds: 3));client.disconnect();} else {print('Connection failed - status is ${client.connectionStatus}');client.disconnect();}}
import 'dart:async';import 'dart:convert';import 'dart:io';import 'package:mqtt5_client/mqtt5_client.dart';import 'package:mqtt5_client/mqtt5_server_client.dart';Future<void> main() async {// 从MQTT控制台获取接入点final server = 'mqtt-xxx.mqtt.tencenttdmq.com';final port = 8883;// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符final clientId = 'QuickStartMqtt5Tls';// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码final username = 'YOUR_USERNAME';final password = 'YOUR_PASSWORD';// 确认一级主题 "home" 在MQTT控制台已经创建final pubTopic = 'home/test';final topicFilters = ['home/test', 'home/#', 'home/+'];final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];final total = 16;final client = MqttServerClient.withPort(server, clientId, port);client.logging(on: true);client.keepAlivePeriod = 60;client.autoReconnect = true;client.secure = true;// 配置 TLS/SSLSecurityContext context = SecurityContext.defaultContext;client.securityContext = context;client.onBadCertificate = (dynamic certificate) => true; // 开发环境使用,生产环境应验证证书// 设置连接消息final connMessage = MqttConnectMessage().withClientIdentifier(clientId).authenticateAs(username, password).startClean();client.connectionMessage = connMessage;// 连接回调client.onConnected = () {print('Connected to $server');// Subscribefor (var i = 0; i < topicFilters.length; i++) {client.subscribe(topicFilters[i], qos[i]);print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');}};client.onDisconnected = () {print('Disconnected');};client.onAutoReconnect = () {print('Auto reconnecting...');};client.onAutoReconnected = () {print('Auto reconnected');};try {print('Connecting to MQTT broker...');await client.connect();} catch (e) {print('Exception: $e');client.disconnect();return;}if (client.connectionStatus!.state == MqttConnectionState.connected) {print('MQTT client connected');// 订阅消息回调client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {final recMessage = c[0].payload as MqttPublishMessage;final topic = c[0].topic;final payload = recMessage.payload.message;final content = payload != null ? utf8.decode(payload.toList()) : '';print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');});// 发布消息for (var i = 0; i < total; i++) {final builder = MqttPayloadBuilder();builder.addString('Hello MQTT 5.0 TLS - $i');print('Prepare to publish message $i');client.publishMessage(pubTopic, qos[0], builder.payload!);print('Published message $i');await Future.delayed(Duration(seconds: 3));}await Future.delayed(Duration(seconds: 3));client.disconnect();} else {print('Connection failed - status is ${client.connectionStatus}');client.disconnect();}}
import 'dart:async';import 'dart:convert';import 'package:mqtt_client/mqtt_client.dart';import 'package:mqtt_client/mqtt_server_client.dart';Future<void> main() async {// 从MQTT控制台获取接入点:// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;final server = 'mqtt-xxx.mqtt.tencenttdmq.com';final port = 1883;// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059final clientId = 'QuickStart';// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码final username = 'YOUR_USERNAME';final password = 'YOUR_PASSWORD';// 确认一级主题 "home" 在MQTT控制台已经创建final pubTopic = 'home/test';final topicFilters = ['home/test', 'home/#', 'home/+'];final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];final total = 16;final client = MqttServerClient.withPort(server, clientId, port);client.logging(on: true);client.setProtocolV311(); // 使用 MQTT 3.1.1 协议client.keepAlivePeriod = 60;client.autoReconnect = true;client.connectTimeoutPeriod = 3000;// 设置连接消息final connMessage = MqttConnectMessage().withClientIdentifier(clientId).authenticateAs(username, password).startClean().withWillQos(MqttQos.atLeastOnce);client.connectionMessage = connMessage;// 连接回调client.onConnected = () {print('Connected to $server');// Subscribefor (var i = 0; i < topicFilters.length; i++) {client.subscribe(topicFilters[i], qos[i]);print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');}};client.onDisconnected = () {print('Disconnected');};client.onAutoReconnect = () {print('Auto reconnecting...');};client.onAutoReconnected = () {print('Auto reconnected');};try {print('Connecting to MQTT broker...');await client.connect();} catch (e) {print('Exception: $e');client.disconnect();return;}if (client.connectionStatus!.state == MqttConnectionState.connected) {print('MQTT client connected');// 订阅消息回调client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {final recMessage = c[0].payload as MqttPublishMessage;final topic = c[0].topic;final payload = recMessage.payload.message;final content = payload != null ? utf8.decode(payload.toList()) : '';print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');});// 发布消息for (var i = 0; i < total; i++) {final builder = MqttClientPayloadBuilder();builder.addString('Hello MQTT $i');print('Prepare to publish message $i');client.publishMessage(pubTopic, MqttQos.atLeastOnce, builder.payload!);print('Published message $i');await Future.delayed(Duration(seconds: 3));}await Future.delayed(Duration(seconds: 3));client.disconnect();} else {print('Connection failed - status is ${client.connectionStatus}');client.disconnect();}}
import 'dart:async';import 'dart:convert';import 'dart:io';import 'package:mqtt_client/mqtt_client.dart';import 'package:mqtt_client/mqtt_server_client.dart';Future<void> main() async {// 从MQTT控制台获取接入点:// 通过Private Link实现VPC网络打通的用户, 使用内网接入点;// 通过公网访问的用户, 确保公网安全策略允许, 程序运行机器有公网接入;final server = 'mqtt-xxx.mqtt.tencenttdmq.com';final port = 8883;// 合法的Client Identifier包含 数字0-9, 小写英文字母a-z, 以及大写英文字母A-Z, 总长度为1-23个字符// 参考 https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059final clientId = 'ClientQuickStartTls';// 在控制台 --> 认证 Tab页创建账户, 复制用户名和密码final username = 'YOUR_USERNAME';final password = 'YOUR_PASSWORD';// 确认一级主题 "home" 在MQTT控制台已经创建final pubTopic = 'home/test';final topicFilters = ['home/test', 'home/#', 'home/+'];final qos = [MqttQos.atLeastOnce, MqttQos.atLeastOnce, MqttQos.atLeastOnce];final total = 16;final client = MqttServerClient.withPort(server, clientId, port);client.logging(on: true);client.setProtocolV311(); // 使用 MQTT 3.1.1 协议client.keepAlivePeriod = 60;client.autoReconnect = true;client.connectTimeoutPeriod = 3000;client.secure = true;// 配置 TLS/SSLSecurityContext context = SecurityContext.defaultContext;client.securityContext = context;client.onBadCertificate = (dynamic certificate) => true; // 开发环境使用,生产环境应验证证书// 设置连接消息final connMessage = MqttConnectMessage().withClientIdentifier(clientId).authenticateAs(username, password).startClean().withWillQos(MqttQos.atLeastOnce);client.connectionMessage = connMessage;// 连接回调client.onConnected = () {print('Connected to $server');// Subscribefor (var i = 0; i < topicFilters.length; i++) {client.subscribe(topicFilters[i], qos[i]);print('Subscribed to topic ${topicFilters[i]} with QoS=${qos[i].index}');}};client.onDisconnected = () {print('Disconnected');};client.onAutoReconnect = () {print('Auto reconnecting...');};client.onAutoReconnected = () {print('Auto reconnected');};try {print('Connecting to MQTT broker...');await client.connect();} catch (e) {print('Exception: $e');client.disconnect();return;}if (client.connectionStatus!.state == MqttConnectionState.connected) {print('MQTT client connected');// 订阅消息回调client.updates?.listen((List<MqttReceivedMessage<MqttMessage>> c) {final recMessage = c[0].payload as MqttPublishMessage;final topic = c[0].topic;final payload = recMessage.payload.message;final content = payload != null ? utf8.decode(payload.toList()) : '';print('Message arrived, topic=$topic, QoS=${recMessage.payload.header!.qos.index} content=[$content]');});// 发布消息for (var i = 0; i < total; i++) {final builder = MqttClientPayloadBuilder();builder.addString('Hello MQTT $i');print('Prepare to publish message $i');client.publishMessage(pubTopic, MqttQos.atLeastOnce, builder.payload!);print('Published message $i');await Future.delayed(Duration(seconds: 3));}await Future.delayed(Duration(seconds: 3));client.disconnect();} else {print('Connection failed - status is ${client.connectionStatus}');client.disconnect();}}