Dart SDK

最近更新时间:2025-11-27 16:21:22

我的收藏

功能概述

Dart/Flutter 平台上有两个主要的 MQTT 客户端库可供选择:

mqtt_client

用于 MQTT 3.1/3.1.1 协议的客户端库,可用于 Flutter、Dart VM 以及 Web 平台。提供了 MqttServerClient 和 MqttBrowserClient,分别用于服务器和浏览器环境。

mqtt5_client

专门支持 MQTT 5.0 协议的客户端库,提供了完整的 MQTT 5.0 特性支持,包括增强的认证、消息属性、主题别名等。同样支持服务器和浏览器环境。

云资源准备

请先参见 创建资源 中的操作步骤,完成云资源准备。

环境准备

选择合适的客户端库

根据您需要使用的 MQTT 协议版本选择对应的库:
使用 MQTT 3.1.1(推荐用于生产环境)
dependencies:
  mqtt_client: ^10.11.0

使用 MQTT 5.0
dependencies:
  mqtt5_client: ^4.15.2

然后运行:
flutter pub get
# 或
dart pub get

示例代码

MQTT 5.0
MQTT 5.0 TLS
MQTT 3
MQTT 3 TLS

import 'dart:async';
import 'dart:convert';
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';

/// Quick-start sample for MQTT 5.0 over plain TCP (port 1883).
///
/// Connects with the `mqtt5_client` package, subscribes to three topic
/// filters once connected, publishes 16 messages to `home/test` at
/// three-second intervals, and finally disconnects.
Future<void> main() async {
  // Access point copied from the MQTT console.
  const server = 'mqtt-xxx.mqtt.tencenttdmq.com';
  const port = 1883;

  // A valid Client Identifier consists of digits 0-9, lowercase letters a-z
  // and uppercase letters A-Z, with a total length of 1-23 characters.
  const clientId = 'QuickStartMqtt5';

  // Create an account on the console's authentication tab, then copy its
  // username and password here.
  const username = 'YOUR_USERNAME';
  const password = 'YOUR_PASSWORD';

  // Make sure the first-level topic "home" already exists in the MQTT
  // console.
  const pubTopic = 'home/test';
  const publishQos = MqttQos.atLeastOnce;
  const subscriptions = <String, MqttQos>{
    'home/test': MqttQos.atLeastOnce,
    'home/#': MqttQos.atLeastOnce,
    'home/+': MqttQos.atLeastOnce,
  };

  const messageCount = 16;
  const pause = Duration(seconds: 3);

  final client = MqttServerClient.withPort(server, clientId, port)
    ..logging(on: true)
    ..keepAlivePeriod = 60
    ..autoReconnect = true
    // CONNECT message: client identifier, credentials, clean start.
    ..connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .authenticateAs(username, password)
        .startClean();

  // Subscribe to every filter as soon as the connection is established.
  client.onConnected = () {
    print('Connected to $server');
    subscriptions.forEach((filter, level) {
      client.subscribe(filter, level);
      print('Subscribed to topic $filter with QoS=${level.index}');
    });
  };
  client.onDisconnected = () => print('Disconnected');
  client.onAutoReconnect = () => print('Auto reconnecting...');
  client.onAutoReconnected = () => print('Auto reconnected');

  print('Connecting to MQTT broker...');
  try {
    await client.connect();
  } catch (e) {
    print('Exception: $e');
    client.disconnect();
    return;
  }

  // Bail out early when the broker did not accept the connection.
  if (client.connectionStatus!.state != MqttConnectionState.connected) {
    print('Connection failed - status is ${client.connectionStatus}');
    client.disconnect();
    return;
  }
  print('MQTT client connected');

  // Prints the first message of each batch delivered by the update stream.
  void onMessages(List<MqttReceivedMessage<MqttMessage>> batch) {
    final received = batch[0];
    final publish = received.payload as MqttPublishMessage;
    final bytes = publish.payload.message;
    final content = bytes == null ? '' : utf8.decode(bytes.toList());
    print('Message arrived, topic=${received.topic}, '
        'QoS=${publish.payload.header!.qos.index} content=[$content]');
  }

  client.updates?.listen(onMessages);

  // Publish the messages one by one, pausing after each.
  for (var seq = 0; seq < messageCount; seq++) {
    final builder = MqttPayloadBuilder()..addString('Hello MQTT 5.0 - $seq');
    print('Prepare to publish message $seq');
    client.publishMessage(pubTopic, publishQos, builder.payload!);
    print('Published message $seq');
    await Future<void>.delayed(pause);
  }

  // Pause once more before closing the connection.
  await Future<void>.delayed(pause);
  client.disconnect();
}


import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';

/// Quick-start sample for MQTT 5.0 over TLS (port 8883).
///
/// Connects with the `mqtt5_client` package using the platform's default
/// trusted roots, subscribes to three topic filters once connected,
/// publishes 16 messages to `home/test` at three-second intervals, and
/// finally disconnects.
///
/// Server certificates that fail verification are rejected unless
/// `allowBadCertificate` below is explicitly switched on.
Future<void> main() async {
  // Access point copied from the MQTT console.
  const server = 'mqtt-xxx.mqtt.tencenttdmq.com';
  const port = 8883;

  // A valid Client Identifier consists of digits 0-9, lowercase letters a-z
  // and uppercase letters A-Z, with a total length of 1-23 characters.
  const clientId = 'QuickStartMqtt5Tls';

  // Create an account on the console's authentication tab, then copy its
  // username and password here.
  const username = 'YOUR_USERNAME';
  const password = 'YOUR_PASSWORD';

  // Make sure the first-level topic "home" already exists in the MQTT
  // console.
  const pubTopic = 'home/test';
  const publishQos = MqttQos.atLeastOnce;
  const subscriptions = <String, MqttQos>{
    'home/test': MqttQos.atLeastOnce,
    'home/#': MqttQos.atLeastOnce,
    'home/+': MqttQos.atLeastOnce,
  };

  const messageCount = 16;
  const pause = Duration(seconds: 3);

  // SECURITY: accepting a certificate that failed verification exposes the
  // connection to man-in-the-middle attacks. Set this to true only for local
  // development (for example against a self-signed certificate); never ship
  // it enabled.
  const allowBadCertificate = false;

  final client = MqttServerClient.withPort(server, clientId, port)
    ..logging(on: true)
    ..keepAlivePeriod = 60
    ..autoReconnect = true
    // TLS/SSL: encrypt the connection and trust the platform's default roots.
    ..secure = true
    ..securityContext = SecurityContext.defaultContext
    // CONNECT message: client identifier, credentials, clean start.
    ..connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .authenticateAs(username, password)
        .startClean();

  // Invoked when the server certificate cannot be verified; the returned
  // value decides whether the connection may proceed anyway.
  client.onBadCertificate = (dynamic certificate) {
    print('Server certificate failed verification');
    return allowBadCertificate;
  };

  // Subscribe to every filter as soon as the connection is established.
  client.onConnected = () {
    print('Connected to $server');
    subscriptions.forEach((filter, level) {
      client.subscribe(filter, level);
      print('Subscribed to topic $filter with QoS=${level.index}');
    });
  };
  client.onDisconnected = () => print('Disconnected');
  client.onAutoReconnect = () => print('Auto reconnecting...');
  client.onAutoReconnected = () => print('Auto reconnected');

  print('Connecting to MQTT broker...');
  try {
    await client.connect();
  } catch (e) {
    print('Exception: $e');
    client.disconnect();
    return;
  }

  // Bail out early when the broker did not accept the connection. A missing
  // status is treated the same as "not connected" instead of throwing.
  if (client.connectionStatus?.state != MqttConnectionState.connected) {
    print('Connection failed - status is ${client.connectionStatus}');
    client.disconnect();
    return;
  }
  print('MQTT client connected');

  // Prints the first message of each batch delivered by the update stream.
  void onMessages(List<MqttReceivedMessage<MqttMessage>> batch) {
    final received = batch[0];
    final publish = received.payload as MqttPublishMessage;
    final bytes = publish.payload.message;
    final content = bytes == null ? '' : utf8.decode(bytes.toList());
    print('Message arrived, topic=${received.topic}, '
        'QoS=${publish.payload.header!.qos.index} content=[$content]');
  }

  client.updates?.listen(onMessages);

  // Publish the messages one by one, pausing after each.
  for (var seq = 0; seq < messageCount; seq++) {
    final builder = MqttPayloadBuilder()
      ..addString('Hello MQTT 5.0 TLS - $seq');
    print('Prepare to publish message $seq');
    client.publishMessage(pubTopic, publishQos, builder.payload!);
    print('Published message $seq');
    await Future<void>.delayed(pause);
  }

  // Pause once more before closing the connection.
  await Future<void>.delayed(pause);
  client.disconnect();
}


import 'dart:async';
import 'dart:convert';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// Quick-start sample for MQTT 3.1.1 over plain TCP (port 1883).
///
/// Connects with the `mqtt_client` package, subscribes to three topic
/// filters once connected, publishes 16 messages to `home/test` at
/// three-second intervals, and finally disconnects.
Future<void> main() async {
  // Access point copied from the MQTT console:
  // - if your VPC is connected through Private Link, use the private-network
  //   access point;
  // - if you connect over the public internet, make sure the public-network
  //   security policy allows it and that the machine running this program
  //   has public internet access.
  const server = 'mqtt-xxx.mqtt.tencenttdmq.com';
  const port = 1883;

  // A valid Client Identifier consists of digits 0-9, lowercase letters a-z
  // and uppercase letters A-Z, with a total length of 1-23 characters.
  // See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
  const clientId = 'QuickStart';

  // Create an account on the console's authentication tab, then copy its
  // username and password here.
  const username = 'YOUR_USERNAME';
  const password = 'YOUR_PASSWORD';

  // Make sure the first-level topic "home" already exists in the MQTT
  // console.
  const pubTopic = 'home/test';
  const subscriptions = <String, MqttQos>{
    'home/test': MqttQos.atLeastOnce,
    'home/#': MqttQos.atLeastOnce,
    'home/+': MqttQos.atLeastOnce,
  };

  const messageCount = 16;
  const pause = Duration(seconds: 3);

  final client = MqttServerClient.withPort(server, clientId, port)
    ..logging(on: true)
    ..setProtocolV311() // Speak MQTT 3.1.1.
    ..keepAlivePeriod = 60
    ..autoReconnect = true
    // NOTE(review): the unit looks like milliseconds - confirm against the
    // mqtt_client API documentation.
    ..connectTimeoutPeriod = 3000
    // CONNECT message: client identifier, credentials, clean session.
    // No Will QoS is set: this sample configures no Will topic or message,
    // and MQTT 3.1.1 requires Will QoS to be 0 when the Will Flag is 0.
    ..connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .authenticateAs(username, password)
        .startClean();

  // Subscribe to every filter as soon as the connection is established.
  client.onConnected = () {
    print('Connected to $server');
    subscriptions.forEach((filter, level) {
      client.subscribe(filter, level);
      print('Subscribed to topic $filter with QoS=${level.index}');
    });
  };
  client.onDisconnected = () => print('Disconnected');
  client.onAutoReconnect = () => print('Auto reconnecting...');
  client.onAutoReconnected = () => print('Auto reconnected');

  print('Connecting to MQTT broker...');
  try {
    await client.connect();
  } catch (e) {
    print('Exception: $e');
    client.disconnect();
    return;
  }

  // Bail out early when the broker did not accept the connection. A missing
  // status is treated the same as "not connected" instead of throwing.
  if (client.connectionStatus?.state != MqttConnectionState.connected) {
    print('Connection failed - status is ${client.connectionStatus}');
    client.disconnect();
    return;
  }
  print('MQTT client connected');

  // Prints the first message of each batch delivered by the update stream.
  void onMessages(List<MqttReceivedMessage<MqttMessage>> batch) {
    final received = batch[0];
    final publish = received.payload as MqttPublishMessage;
    final bytes = publish.payload.message;
    final content = bytes != null ? utf8.decode(bytes.toList()) : '';
    print('Message arrived, topic=${received.topic}, '
        'QoS=${publish.payload.header!.qos.index} content=[$content]');
  }

  client.updates?.listen(onMessages);

  // Publish the messages one by one, pausing after each.
  for (var seq = 0; seq < messageCount; seq++) {
    final builder = MqttClientPayloadBuilder()..addString('Hello MQTT $seq');
    print('Prepare to publish message $seq');
    client.publishMessage(pubTopic, MqttQos.atLeastOnce, builder.payload!);
    print('Published message $seq');
    await Future<void>.delayed(pause);
  }

  // Pause once more before closing the connection.
  await Future<void>.delayed(pause);
  client.disconnect();
}




import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';

/// Quick-start sample for MQTT 3.1.1 over TLS (port 8883).
///
/// Connects with the `mqtt_client` package using the platform's default
/// trusted roots, subscribes to three topic filters once connected,
/// publishes 16 messages to `home/test` at three-second intervals, and
/// finally disconnects.
///
/// Server certificates that fail verification are rejected unless
/// `allowBadCertificate` below is explicitly switched on.
Future<void> main() async {
  // Access point copied from the MQTT console:
  // - if your VPC is connected through Private Link, use the private-network
  //   access point;
  // - if you connect over the public internet, make sure the public-network
  //   security policy allows it and that the machine running this program
  //   has public internet access.
  const server = 'mqtt-xxx.mqtt.tencenttdmq.com';
  const port = 8883;

  // A valid Client Identifier consists of digits 0-9, lowercase letters a-z
  // and uppercase letters A-Z, with a total length of 1-23 characters.
  // See https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901059
  const clientId = 'ClientQuickStartTls';

  // Create an account on the console's authentication tab, then copy its
  // username and password here.
  const username = 'YOUR_USERNAME';
  const password = 'YOUR_PASSWORD';

  // Make sure the first-level topic "home" already exists in the MQTT
  // console.
  const pubTopic = 'home/test';
  const subscriptions = <String, MqttQos>{
    'home/test': MqttQos.atLeastOnce,
    'home/#': MqttQos.atLeastOnce,
    'home/+': MqttQos.atLeastOnce,
  };

  const messageCount = 16;
  const pause = Duration(seconds: 3);

  // SECURITY: accepting a certificate that failed verification exposes the
  // connection to man-in-the-middle attacks. Set this to true only for local
  // development (for example against a self-signed certificate); never ship
  // it enabled.
  const allowBadCertificate = false;

  final client = MqttServerClient.withPort(server, clientId, port)
    ..logging(on: true)
    ..setProtocolV311() // Speak MQTT 3.1.1.
    ..keepAlivePeriod = 60
    ..autoReconnect = true
    // NOTE(review): the unit looks like milliseconds - confirm against the
    // mqtt_client API documentation.
    ..connectTimeoutPeriod = 3000
    // TLS/SSL: encrypt the connection and trust the platform's default roots.
    ..secure = true
    ..securityContext = SecurityContext.defaultContext
    // CONNECT message: client identifier, credentials, clean session.
    // No Will QoS is set: this sample configures no Will topic or message,
    // and MQTT 3.1.1 requires Will QoS to be 0 when the Will Flag is 0.
    ..connectionMessage = MqttConnectMessage()
        .withClientIdentifier(clientId)
        .authenticateAs(username, password)
        .startClean();

  // Invoked when the server certificate cannot be verified; the returned
  // value decides whether the connection may proceed anyway.
  client.onBadCertificate = (dynamic certificate) {
    print('Server certificate failed verification');
    return allowBadCertificate;
  };

  // Subscribe to every filter as soon as the connection is established.
  client.onConnected = () {
    print('Connected to $server');
    subscriptions.forEach((filter, level) {
      client.subscribe(filter, level);
      print('Subscribed to topic $filter with QoS=${level.index}');
    });
  };
  client.onDisconnected = () => print('Disconnected');
  client.onAutoReconnect = () => print('Auto reconnecting...');
  client.onAutoReconnected = () => print('Auto reconnected');

  print('Connecting to MQTT broker...');
  try {
    await client.connect();
  } catch (e) {
    print('Exception: $e');
    client.disconnect();
    return;
  }

  // Bail out early when the broker did not accept the connection. A missing
  // status is treated the same as "not connected" instead of throwing.
  if (client.connectionStatus?.state != MqttConnectionState.connected) {
    print('Connection failed - status is ${client.connectionStatus}');
    client.disconnect();
    return;
  }
  print('MQTT client connected');

  // Prints the first message of each batch delivered by the update stream.
  void onMessages(List<MqttReceivedMessage<MqttMessage>> batch) {
    final received = batch[0];
    final publish = received.payload as MqttPublishMessage;
    final bytes = publish.payload.message;
    final content = bytes != null ? utf8.decode(bytes.toList()) : '';
    print('Message arrived, topic=${received.topic}, '
        'QoS=${publish.payload.header!.qos.index} content=[$content]');
  }

  client.updates?.listen(onMessages);

  // Publish the messages one by one, pausing after each.
  for (var seq = 0; seq < messageCount; seq++) {
    final builder = MqttClientPayloadBuilder()..addString('Hello MQTT $seq');
    print('Prepare to publish message $seq');
    client.publishMessage(pubTopic, MqttQos.atLeastOnce, builder.payload!);
    print('Published message $seq');
    await Future<void>.delayed(pause);
  }

  // Pause once more before closing the connection.
  await Future<void>.delayed(pause);
  client.disconnect();
}