功能概述
MQTTnet 是一个高性能的 .NET MQTT 客户端库,支持 MQTT 协议 3.1.1 和 5.0 版本。该库完全使用 C# 编写,提供了功能齐全的 MQTT 客户端和服务器实现。
主要特性
支持 .NET Framework 4.5.2+、.NET Core 以及 .NET 5 及更高版本
客户端 API 基于 async/await 异步模型(本文示例均使用异步调用)
完整支持 MQTT v3.1.1 和 v5.0 协议
支持 TCP、WebSocket 和 TLS/SSL 加密连接
支持共享订阅、保留消息、遗嘱消息等高级特性
云资源准备
请您先参见相关文档完成云资源准备,获取以下信息:
MQTT Broker 地址
端口号(通常为 1883 或 8883)
用户名和密码(如需要认证)
TLS/SSL 证书(如使用加密连接)
环境准备
系统要求
.NET 6.0 或更高版本(推荐)
.NET Framework 4.5.2+ 或 .NET Core 2.0+
安装 MQTTnet SDK
方法一:使用 .NET CLI
dotnet add package MQTTnet --version 4.3.7.1207
方法二:通过 Visual Studio
1.1 右键单击项目 > "管理 NuGet 程序包"
1.2 搜索 "MQTTnet"
1.3 安装 4.x 稳定版本(例如 4.3.7.1207;本文示例代码基于 4.x 的 API,如 MqttFactory 与 MQTTnet.Client 命名空间)
方法三:编辑 .csproj 文件
<!-- Console application project that references the MQTTnet package. -->
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <!-- Build an executable that targets .NET 6. -->
    <OutputType>Exe</OutputType>
    <TargetFramework>net6.0</TargetFramework>
  </PropertyGroup>

  <ItemGroup>
    <!-- The samples in this document use MqttFactory and the MQTTnet.Client namespace;
         keep the package version in step with that API. -->
    <PackageReference Include="MQTTnet" Version="4.3.7.1207" />
  </ItemGroup>

</Project>
示例代码
using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;

/// <summary>
/// Quick-start sample: connects to an MQTT broker over plain TCP with MQTT 5.0,
/// subscribes to a few topic filters, publishes 16 messages and disconnects.
/// </summary>
class MQTT5_TCP_Sample
{
    /// <summary>
    /// Runs the connect / subscribe / publish / disconnect sequence.
    /// Any failure is reported on the console instead of being rethrown.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        // QoS requested for the filter at the same index in topicFilters.
        int[] qos = { 1, 1, 1 };

        // The URI is only used to split the address into host and port.
        var uri = new Uri(serverUri);

        // Create the client; it is disposable, so release it when Main exits.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Connection options.
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V500)
            .WithCleanStart(true)
            .Build();

        // Print every message received on a subscribed topic.
        client.ApplicationMessageReceivedAsync += e =>
        {
            string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {payload}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接");

            try
            {
                // 2. Subscribe, one request per filter
                for (int i = 0; i < topicFilters.Length; i++)
                {
                    string filter = topicFilters[i];
                    var level = (MqttQualityOfServiceLevel)qos[i];

                    await client.SubscribeAsync(
                        new MqttClientSubscribeOptionsBuilder()
                            .WithTopicFilter(f => f.WithTopic(filter).WithQualityOfServiceLevel(level))
                            .Build());
                }
                Console.WriteLine("已订阅");

                // 3. Publish 16 messages, 500 ms apart
                for (int i = 1; i <= 16; i++)
                {
                    await client.PublishAsync(
                        new MqttApplicationMessageBuilder()
                            .WithTopic(pubTopic)
                            .WithPayload($"消息 #{i}")
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .Build());
                    await Task.Delay(500);
                }
                Console.WriteLine("发布完成");

                // 4. Give in-flight messages time to arrive
                await Task.Delay(2000);
            }
            finally
            {
                // 5. Disconnect even when subscribing or publishing failed,
                //    but only if the connection is still up.
                if (client.IsConnected)
                {
                    await client.DisconnectAsync();
                    Console.WriteLine("已断开");
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }
}
using System;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;

/// <summary>
/// Quick-start sample: connects to an MQTT broker over TLS with MQTT 5.0,
/// subscribes to a few topic filters, publishes 16 messages and disconnects.
/// </summary>
class MQTT5_TLS_Sample
{
    /// <summary>
    /// Runs the connect / subscribe / publish / disconnect sequence over TLS.
    /// Failures after the options are built are reported on the console.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        // QoS requested for the filter at the same index in topicFilters.
        int[] qos = { 1, 1, 1 };

        // Optional CA certificate used to verify the server certificate.
        string caCertPath = null; // e.g. "/path/to/ca.crt"

        // The URI is only used to split the address into host and port.
        var uri = new Uri(serverUri);

        // The CA certificate is a trust anchor for the SERVER certificate.
        // It must not be registered as a client certificate.
        using X509Certificate2 caCert =
            string.IsNullOrEmpty(caCertPath) ? null : new X509Certificate2(caCertPath);

        // Create the client; it is disposable, so release it when Main exits.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Connection options (including TLS).
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V500)
            .WithCleanStart(true)
            .WithTlsOptions(tls =>
            {
                tls.UseTls();
                tls.WithSslProtocols(SslProtocols.Tls12 | SslProtocols.Tls13);

                // Strict validation. Returning true unconditionally here would
                // accept any certificate and is only tolerable in throwaway tests.
                tls.WithCertificateValidationHandler(context =>
                    ValidateServerCertificate(
                        context.Certificate as X509Certificate2,
                        context.Chain,
                        context.SslPolicyErrors,
                        caCert));
            })
            .Build();

        // Print every message received on a subscribed topic.
        client.ApplicationMessageReceivedAsync += e =>
        {
            string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {payload}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接(TLS加密)");

            try
            {
                // 2. Subscribe, one request per filter
                for (int i = 0; i < topicFilters.Length; i++)
                {
                    string filter = topicFilters[i];
                    var level = (MqttQualityOfServiceLevel)qos[i];

                    await client.SubscribeAsync(
                        new MqttClientSubscribeOptionsBuilder()
                            .WithTopicFilter(f => f.WithTopic(filter).WithQualityOfServiceLevel(level))
                            .Build());
                }
                Console.WriteLine("已订阅");

                // 3. Publish 16 messages, 500 ms apart
                for (int i = 1; i <= 16; i++)
                {
                    await client.PublishAsync(
                        new MqttApplicationMessageBuilder()
                            .WithTopic(pubTopic)
                            .WithPayload($"消息 #{i}")
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .Build());
                    await Task.Delay(500);
                }
                Console.WriteLine("发布完成");

                // 4. Give in-flight messages time to arrive
                await Task.Delay(2000);
            }
            finally
            {
                // 5. Disconnect even when subscribing or publishing failed,
                //    but only if the connection is still up.
                if (client.IsConnected)
                {
                    await client.DisconnectAsync();
                    Console.WriteLine("已断开");
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }

    /// <summary>
    /// Decides whether the server certificate presented during the TLS handshake is acceptable.
    /// </summary>
    /// <param name="certificate">Server certificate, or null when none is available.</param>
    /// <param name="chain">Chain built by the platform during the handshake; may be null.</param>
    /// <param name="sslPolicyErrors">Errors reported by the platform's own validation.</param>
    /// <param name="caCertificate">
    /// Optional private CA. When supplied, a certificate the platform does not trust
    /// is re-validated against this CA as the only trusted root.
    /// </param>
    /// <returns>true to continue the handshake; false to abort it.</returns>
    static bool ValidateServerCertificate(
        X509Certificate2 certificate,
        X509Chain chain,
        SslPolicyErrors sslPolicyErrors,
        X509Certificate2 caCertificate = null)
    {
        // The platform found nothing wrong: accept.
        if (sslPolicyErrors == SslPolicyErrors.None)
        {
            return true;
        }

        if (certificate == null)
        {
            Console.WriteLine("证书验证失败: 证书为空");
            return false;
        }

        // A hostname mismatch can never be repaired by chain checks. Accepting it
        // would let any valid certificate impersonate the broker.
        if ((sslPolicyErrors & SslPolicyErrors.RemoteCertificateNameMismatch) != 0)
        {
            Console.WriteLine("证书验证失败: 证书与服务器主机名不匹配");
            return false;
        }

        // 1. Validity period (NotBefore / NotAfter are expressed in local time).
        DateTime now = DateTime.Now;
        if (now < certificate.NotBefore || now > certificate.NotAfter)
        {
            Console.WriteLine($"证书验证失败: 证书已过期或尚未生效 (有效期: {certificate.NotBefore} - {certificate.NotAfter})");
            return false;
        }

        // 2. Certificate chain.
        if (caCertificate != null)
        {
            if (!ChainsToCustomCa(certificate, chain, caCertificate))
            {
                return false;
            }
        }
        else if (chain != null)
        {
            foreach (var status in chain.ChainStatus)
            {
                // Tolerate "could not reach the revocation server" (optional).
                if (status.Status == X509ChainStatusFlags.RevocationStatusUnknown ||
                    status.Status == X509ChainStatusFlags.OfflineRevocation)
                {
                    continue;
                }

                if (status.Status != X509ChainStatusFlags.NoError)
                {
                    Console.WriteLine($"证书验证失败: {status.StatusInformation}");
                    return false;
                }
            }
        }

        Console.WriteLine("证书验证通过");
        return true;
    }

    /// <summary>
    /// Rebuilds the chain for <paramref name="certificate"/> with
    /// <paramref name="caCertificate"/> as the only trusted root.
    /// </summary>
    static bool ChainsToCustomCa(
        X509Certificate2 certificate,
        X509Chain handshakeChain,
        X509Certificate2 caCertificate)
    {
        using var customChain = new X509Chain();
        customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
        customChain.ChainPolicy.CustomTrustStore.Add(caCertificate);
        // Revocation is not checked here; switch to Online if the CA publishes CRL/OCSP.
        customChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;

        // Offer the certificates seen during the handshake as untrusted intermediates.
        if (handshakeChain != null)
        {
            foreach (X509ChainElement element in handshakeChain.ChainElements)
            {
                customChain.ChainPolicy.ExtraStore.Add(element.Certificate);
            }
        }

        if (customChain.Build(certificate))
        {
            return true;
        }

        foreach (var status in customChain.ChainStatus)
        {
            Console.WriteLine($"证书验证失败: {status.StatusInformation}");
        }
        return false;
    }
}
using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;

/// <summary>
/// Quick-start sample: connects to an MQTT broker over plain TCP with MQTT 3.1.1,
/// subscribes to a few topic filters, publishes 16 messages and disconnects.
/// </summary>
class MQTT311_TCP_Sample
{
    /// <summary>
    /// Runs the connect / subscribe / publish / disconnect sequence.
    /// Any failure is reported on the console instead of being rethrown.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        // QoS requested for the filter at the same index in topicFilters.
        int[] qos = { 1, 1, 1 };

        // The URI is only used to split the address into host and port.
        var uri = new Uri(serverUri);

        // Create the client; it is disposable, so release it when Main exits.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Connection options.
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V311)
            .WithCleanSession(true)
            .Build();

        // Print every message received on a subscribed topic.
        client.ApplicationMessageReceivedAsync += e =>
        {
            string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {payload}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接");

            try
            {
                // 2. Subscribe, one request per filter
                for (int i = 0; i < topicFilters.Length; i++)
                {
                    string filter = topicFilters[i];
                    var level = (MqttQualityOfServiceLevel)qos[i];

                    await client.SubscribeAsync(
                        new MqttClientSubscribeOptionsBuilder()
                            .WithTopicFilter(f => f.WithTopic(filter).WithQualityOfServiceLevel(level))
                            .Build());
                }
                Console.WriteLine("已订阅");

                // 3. Publish 16 messages, 500 ms apart
                for (int i = 1; i <= 16; i++)
                {
                    await client.PublishAsync(
                        new MqttApplicationMessageBuilder()
                            .WithTopic(pubTopic)
                            .WithPayload($"消息 #{i}")
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .Build());
                    await Task.Delay(500);
                }
                Console.WriteLine("发布完成");

                // 4. Give in-flight messages time to arrive
                await Task.Delay(2000);
            }
            finally
            {
                // 5. Disconnect even when subscribing or publishing failed,
                //    but only if the connection is still up.
                if (client.IsConnected)
                {
                    await client.DisconnectAsync();
                    Console.WriteLine("已断开");
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }
}
using System;
using System.Net.Security;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Formatter;
using MQTTnet.Protocol;

/// <summary>
/// Quick-start sample: connects to an MQTT broker over TLS with MQTT 3.1.1,
/// subscribes to a few topic filters, publishes 16 messages and disconnects.
/// </summary>
class MQTT311_TLS_Sample
{
    /// <summary>
    /// Runs the connect / subscribe / publish / disconnect sequence over TLS.
    /// Failures after the options are built are reported on the console.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        // QoS requested for the filter at the same index in topicFilters.
        int[] qos = { 1, 1, 1 };

        // Optional CA certificate used to verify the server certificate.
        string caCertPath = null; // e.g. "/path/to/ca.crt"

        // The URI is only used to split the address into host and port.
        var uri = new Uri(serverUri);

        // The CA certificate is a trust anchor for the SERVER certificate.
        // It must not be registered as a client certificate.
        using X509Certificate2 caCert =
            string.IsNullOrEmpty(caCertPath) ? null : new X509Certificate2(caCertPath);

        // Create the client; it is disposable, so release it when Main exits.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Connection options (including TLS).
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V311)
            .WithCleanSession(true)
            .WithTlsOptions(tls =>
            {
                tls.UseTls();
                tls.WithSslProtocols(SslProtocols.Tls12 | SslProtocols.Tls13);

                // Strict validation. Returning true unconditionally here would
                // accept any certificate and is only tolerable in throwaway tests.
                tls.WithCertificateValidationHandler(context =>
                    ValidateServerCertificate(
                        context.Certificate as X509Certificate2,
                        context.Chain,
                        context.SslPolicyErrors,
                        caCert));
            })
            .Build();

        // Print every message received on a subscribed topic.
        client.ApplicationMessageReceivedAsync += e =>
        {
            string payload = Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment);
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {payload}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接(TLS加密)");

            try
            {
                // 2. Subscribe, one request per filter
                for (int i = 0; i < topicFilters.Length; i++)
                {
                    string filter = topicFilters[i];
                    var level = (MqttQualityOfServiceLevel)qos[i];

                    await client.SubscribeAsync(
                        new MqttClientSubscribeOptionsBuilder()
                            .WithTopicFilter(f => f.WithTopic(filter).WithQualityOfServiceLevel(level))
                            .Build());
                }
                Console.WriteLine("已订阅");

                // 3. Publish 16 messages, 500 ms apart
                for (int i = 1; i <= 16; i++)
                {
                    await client.PublishAsync(
                        new MqttApplicationMessageBuilder()
                            .WithTopic(pubTopic)
                            .WithPayload($"消息 #{i}")
                            .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                            .Build());
                    await Task.Delay(500);
                }
                Console.WriteLine("发布完成");

                // 4. Give in-flight messages time to arrive
                await Task.Delay(2000);
            }
            finally
            {
                // 5. Disconnect even when subscribing or publishing failed,
                //    but only if the connection is still up.
                if (client.IsConnected)
                {
                    await client.DisconnectAsync();
                    Console.WriteLine("已断开");
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }

    /// <summary>
    /// Decides whether the server certificate presented during the TLS handshake is acceptable.
    /// </summary>
    /// <param name="certificate">Server certificate, or null when none is available.</param>
    /// <param name="chain">Chain built by the platform during the handshake; may be null.</param>
    /// <param name="sslPolicyErrors">Errors reported by the platform's own validation.</param>
    /// <param name="caCertificate">
    /// Optional private CA. When supplied, a certificate the platform does not trust
    /// is re-validated against this CA as the only trusted root.
    /// </param>
    /// <returns>true to continue the handshake; false to abort it.</returns>
    static bool ValidateServerCertificate(
        X509Certificate2 certificate,
        X509Chain chain,
        SslPolicyErrors sslPolicyErrors,
        X509Certificate2 caCertificate = null)
    {
        // The platform found nothing wrong: accept.
        if (sslPolicyErrors == SslPolicyErrors.None)
        {
            return true;
        }

        if (certificate == null)
        {
            Console.WriteLine("证书验证失败: 证书为空");
            return false;
        }

        // A hostname mismatch can never be repaired by chain checks. Accepting it
        // would let any valid certificate impersonate the broker.
        if ((sslPolicyErrors & SslPolicyErrors.RemoteCertificateNameMismatch) != 0)
        {
            Console.WriteLine("证书验证失败: 证书与服务器主机名不匹配");
            return false;
        }

        // 1. Validity period (NotBefore / NotAfter are expressed in local time).
        DateTime now = DateTime.Now;
        if (now < certificate.NotBefore || now > certificate.NotAfter)
        {
            Console.WriteLine($"证书验证失败: 证书已过期或尚未生效 (有效期: {certificate.NotBefore} - {certificate.NotAfter})");
            return false;
        }

        // 2. Certificate chain.
        if (caCertificate != null)
        {
            if (!ChainsToCustomCa(certificate, chain, caCertificate))
            {
                return false;
            }
        }
        else if (chain != null)
        {
            foreach (var status in chain.ChainStatus)
            {
                // Tolerate "could not reach the revocation server" (optional).
                if (status.Status == X509ChainStatusFlags.RevocationStatusUnknown ||
                    status.Status == X509ChainStatusFlags.OfflineRevocation)
                {
                    continue;
                }

                if (status.Status != X509ChainStatusFlags.NoError)
                {
                    Console.WriteLine($"证书验证失败: {status.StatusInformation}");
                    return false;
                }
            }
        }

        Console.WriteLine("证书验证通过");
        return true;
    }

    /// <summary>
    /// Rebuilds the chain for <paramref name="certificate"/> with
    /// <paramref name="caCertificate"/> as the only trusted root.
    /// </summary>
    static bool ChainsToCustomCa(
        X509Certificate2 certificate,
        X509Chain handshakeChain,
        X509Certificate2 caCertificate)
    {
        using var customChain = new X509Chain();
        customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
        customChain.ChainPolicy.CustomTrustStore.Add(caCertificate);
        // Revocation is not checked here; switch to Online if the CA publishes CRL/OCSP.
        customChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;

        // Offer the certificates seen during the handshake as untrusted intermediates.
        if (handshakeChain != null)
        {
            foreach (X509ChainElement element in handshakeChain.ChainElements)
            {
                customChain.ChainPolicy.ExtraStore.Add(element.Certificate);
            }
        }

        if (customChain.Build(certificate))
        {
            return true;
        }

        foreach (var status in customChain.ChainStatus)
        {
            Console.WriteLine($"证书验证失败: {status.StatusInformation}");
        }
        return false;
    }
}
参数说明
参数 | 说明 |
--- | --- |
ADDRESS | broker 连接地址,对应示例代码中的 serverUri,在控制台目标集群基本信息 > 接入信息模块中复制,位置如下图所示。控制台显示的格式为 mqtt-xxx-gz.mqtt.qcloud.tencenttdmq.com:1883;示例代码通过 Uri 解析主机名与端口,填写时需加上协议前缀(TCP 连接为 tcp://,TLS 连接为 ssl://)。 ![]() |
CLIENTID | 客户端 ID,在控制台集群详情页客户端管理页面获取。 ![]() |
USERNAME | 用户名,对应示例代码中的 username,在控制台认证管理页面获取。 |
PASSWORD | 密码,对应示例代码中的 password,在控制台认证管理页面获取。 |

