.NET

最近更新时间:2025-11-27 16:21:22

我的收藏

功能概述

MQTTnet 是一个高性能的 .NET MQTT 客户端库,支持 MQTT 协议 3.1.1 和 5.0 版本。该库完全使用 C# 编写,提供了功能齐全的 MQTT 客户端和服务器实现。

主要特性

支持 .NET Framework 4.5.2+ 和 .NET Core/.NET 5+
同时支持同步和异步操作
完整支持 MQTT v3.1.1 和 v5.0 协议
支持 TCP、WebSocket 和 TLS/SSL 加密连接
支持共享订阅、保留消息、遗嘱消息等高级特性

云资源准备

请您先参见相关文档完成云资源准备,获取以下信息:
MQTT Broker 地址
端口号(通常为 1883 或 8883)
用户名和密码(如需要认证)
TLS/SSL 证书(如使用加密连接)

环境准备

系统要求

.NET 6.0 或更高版本(推荐)
.NET Framework 4.5.2+ 或 .NET Core 2.0+

安装 MQTTnet SDK

方法一:使用 NuGet 包管理器
dotnet add package MQTTnet --version 4.3.7.1207
方法二:通过 Visual Studio
1.1 右键单击项目 > "管理 NuGet 程序包"
1.2 搜索 "MQTTnet"
1.3 安装 4.x 的最新稳定版本(例如 4.3.7.1207)。本文示例代码使用 MQTTnet 4.x 的 API(MqttFactory、MQTTnet.Client 命名空间),与 5.x 版本不兼容
方法三:编辑 .csproj 文件

<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<!-- Build a console executable that targets .NET 6 -->
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<!-- NuGet reference to the MQTTnet library, requesting version 4.3.7.1207 -->
<PackageReference Include="MQTTnet" Version="4.3.7.1207" />
</ItemGroup>
</Project>

示例代码

MQTT5
MQTT5 TLS
MQTT3.1.1
MQTT3.1.1 TLS
using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

/// <summary>
/// Quick-start sample for MQTT 5.0 over plain TCP: connects to the broker, subscribes to
/// three topic filters, publishes 16 messages with QoS AtLeastOnce, then disconnects.
/// </summary>
class MQTT5_TCP_Sample
{
    /// <summary>
    /// Runs the sample end to end and prints progress (or the error message) to the console.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        int[] qos = { 1, 1, 1 }; // requested QoS for the topic filter at the same index

        // Parse the URI so that host and port can be passed separately
        var uri = new Uri(serverUri);

        // Create the client. It is disposable, so the using declaration disposes it
        // on every exit path, including after an exception has been caught below.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Build the connection options
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V500)
            .WithCleanStart(true)
            .Build();

        // Print every received message as "topic -> UTF-8 payload"
        client.ApplicationMessageReceivedAsync += e =>
        {
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接");

            // 2. Subscribe: one SUBSCRIBE call per topic filter
            for (int i = 0; i < topicFilters.Length; i++)
            {
                string topicFilter = topicFilters[i];
                var level = (MqttQualityOfServiceLevel)qos[i];

                await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
                    .WithTopicFilter(f => f.WithTopic(topicFilter).WithQualityOfServiceLevel(level))
                    .Build());
            }
            Console.WriteLine("已订阅");

            // 3. Publish 16 messages, pausing 500 ms after each one
            for (int i = 1; i <= 16; i++)
            {
                await client.PublishAsync(new MqttApplicationMessageBuilder()
                    .WithTopic(pubTopic)
                    .WithPayload($"消息 #{i}")
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .Build());
                await Task.Delay(500);
            }
            Console.WriteLine("发布完成");

            // 4. Leave time for the remaining messages to arrive
            await Task.Delay(2000);

            // 5. Disconnect
            await client.DisconnectAsync();
            Console.WriteLine("已断开");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }
}

using System;
using System.Text;
using System.Threading.Tasks;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

/// <summary>
/// Quick-start sample for MQTT 5.0 over TLS: connects to the broker with a custom
/// server-certificate validation callback, subscribes to three topic filters,
/// publishes 16 messages with QoS AtLeastOnce, then disconnects.
/// </summary>
class MQTT5_TLS_Sample
{
    /// <summary>
    /// Runs the sample end to end and prints progress (or the error message) to the console.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        int[] qos = { 1, 1, 1 }; // requested QoS for the topic filter at the same index

        // Optional CA certificate path, used to validate the server certificate
        string caCertPath = null; // e.g. "/path/to/ca.crt"

        // Parse the URI so that host and port can be passed separately
        var uri = new Uri(serverUri);

        // Load the optional CA certificate once. It is a trust anchor for the SERVER
        // certificate, so it is handed to the validation callback; it must not be
        // registered as a client certificate (that would present it as our identity).
        using X509Certificate2 caCert = string.IsNullOrEmpty(caCertPath)
            ? null
            : new X509Certificate2(caCertPath);

        // Create the client. It is disposable, so the using declaration disposes it
        // on every exit path, including after an exception has been caught below.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Build the connection options, including TLS
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V500)
            .WithCleanStart(true)
            .WithTlsOptions(tls =>
            {
                tls.UseTls();
                tls.WithSslProtocols(SslProtocols.Tls12 | SslProtocols.Tls13);
                // Certificate validation callback
                tls.WithCertificateValidationHandler(context =>
                {
                    var certificate = context.Certificate as X509Certificate2;
                    // Test environments only: accept every certificate
                    // return true;
                    // Production: strict validation
                    return ValidateServerCertificate(certificate, context.Chain, context.SslPolicyErrors, caCert);
                });
            })
            .Build();

        // Print every received message as "topic -> UTF-8 payload"
        client.ApplicationMessageReceivedAsync += e =>
        {
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接(TLS加密)");

            // 2. Subscribe: one SUBSCRIBE call per topic filter
            for (int i = 0; i < topicFilters.Length; i++)
            {
                string topicFilter = topicFilters[i];
                var level = (MqttQualityOfServiceLevel)qos[i];

                await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
                    .WithTopicFilter(f => f.WithTopic(topicFilter).WithQualityOfServiceLevel(level))
                    .Build());
            }
            Console.WriteLine("已订阅");

            // 3. Publish 16 messages, pausing 500 ms after each one
            for (int i = 1; i <= 16; i++)
            {
                await client.PublishAsync(new MqttApplicationMessageBuilder()
                    .WithTopic(pubTopic)
                    .WithPayload($"消息 #{i}")
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .Build());
                await Task.Delay(500);
            }
            Console.WriteLine("发布完成");

            // 4. Leave time for the remaining messages to arrive
            await Task.Delay(2000);

            // 5. Disconnect
            await client.DisconnectAsync();
            Console.WriteLine("已断开");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }

    /// <summary>
    /// Validates the server certificate presented during the TLS handshake.
    /// </summary>
    /// <param name="certificate">The server certificate; may be null.</param>
    /// <param name="chain">The chain built by the platform for <paramref name="certificate"/>; may be null.</param>
    /// <param name="sslPolicyErrors">The errors the platform found while validating the certificate.</param>
    /// <param name="caCertificate">
    /// Optional CA certificate. When supplied and the platform reported chain errors, the server
    /// certificate is accepted only if it chains to this certificate as a custom trust root.
    /// </param>
    /// <returns>true to accept the certificate and continue the handshake; false to reject it.</returns>
    static bool ValidateServerCertificate(X509Certificate2 certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors, X509Certificate2 caCertificate = null)
    {
        // The platform found nothing wrong: accept
        if (sslPolicyErrors == SslPolicyErrors.None)
        {
            return true;
        }

        // No certificate to inspect: reject
        if (certificate == null)
        {
            Console.WriteLine("证书验证失败: 证书为空");
            return false;
        }

        // A hostname mismatch or a missing certificate cannot be compensated by chain
        // checks; accepting it would allow any validly issued certificate to impersonate
        // the broker.
        const SslPolicyErrors fatalErrors =
            SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateNotAvailable;
        if ((sslPolicyErrors & fatalErrors) != SslPolicyErrors.None)
        {
            Console.WriteLine($"证书验证失败: 证书与服务器主机名不匹配或服务器未提供证书 ({sslPolicyErrors})");
            return false;
        }

        // 1. Check the validity period
        DateTime now = DateTime.Now;
        if (now < certificate.NotBefore || now > certificate.NotAfter)
        {
            Console.WriteLine($"证书验证失败: 证书已过期或尚未生效 (有效期: {certificate.NotBefore} - {certificate.NotAfter})");
            return false;
        }

        // 2. Check the certificate chain (only chain errors remain at this point)
        if (caCertificate != null)
        {
            // A custom CA was supplied: the certificate must chain to it
            if (!ChainsToCustomCa(certificate, chain, caCertificate))
            {
                return false;
            }
        }
        else
        {
            if (chain == null)
            {
                // Chain errors were reported but there is no chain to inspect
                Console.WriteLine($"证书验证失败: {sslPolicyErrors}");
                return false;
            }

            // Revocation-unknown / offline-revocation results are tolerated (optional policy)
            const X509ChainStatusFlags tolerated =
                X509ChainStatusFlags.RevocationStatusUnknown | X509ChainStatusFlags.OfflineRevocation;
            foreach (X509ChainStatus status in chain.ChainStatus)
            {
                // Mask out the tolerated flags so a combined status value is handled as well
                if ((status.Status & ~tolerated) != X509ChainStatusFlags.NoError)
                {
                    Console.WriteLine($"证书验证失败: {status.StatusInformation}");
                    return false;
                }
            }
        }

        // 3. Optional: check the certificate subject, depending on your requirements
        // string expectedSubject = "CN=*.mqtt.tencenttdmq.com";
        // if (!certificate.Subject.Contains(expectedSubject))
        // {
        //     Console.WriteLine($"证书验证失败: 主题不匹配 (期望: {expectedSubject}, 实际: {certificate.Subject})");
        //     return false;
        // }

        Console.WriteLine("证书验证通过");
        return true;
    }

    /// <summary>
    /// Rebuilds the chain for <paramref name="certificate"/> with <paramref name="caCertificate"/>
    /// as the only trusted root and reports whether the build succeeds.
    /// </summary>
    static bool ChainsToCustomCa(X509Certificate2 certificate, X509Chain platformChain, X509Certificate2 caCertificate)
    {
        using var customChain = new X509Chain();
        X509ChainPolicy policy = customChain.ChainPolicy;
        policy.TrustMode = X509ChainTrustMode.CustomRootTrust;
        policy.CustomTrustStore.Add(caCertificate);
        // Revocation is not checked here; NOTE(review): switch to Online if the CA publishes CRL/OCSP
        policy.RevocationMode = X509RevocationMode.NoCheck;

        // Make the certificates from the platform-built chain available as intermediates
        if (platformChain != null)
        {
            foreach (X509ChainElement element in platformChain.ChainElements)
            {
                policy.ExtraStore.Add(element.Certificate);
            }
        }

        if (customChain.Build(certificate))
        {
            return true;
        }

        foreach (X509ChainStatus status in customChain.ChainStatus)
        {
            Console.WriteLine($"证书验证失败: {status.StatusInformation}");
        }
        return false;
    }
}



using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

/// <summary>
/// Quick-start sample for MQTT 3.1.1 over plain TCP: connects to the broker, subscribes to
/// three topic filters, publishes 16 messages with QoS AtLeastOnce, then disconnects.
/// </summary>
class MQTT311_TCP_Sample
{
    /// <summary>
    /// Runs the sample end to end and prints progress (or the error message) to the console.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "tcp://mqtt-xxx.mqtt.tencenttdmq.com:1883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        int[] qos = { 1, 1, 1 }; // requested QoS for the topic filter at the same index

        // Parse the URI so that host and port can be passed separately
        var uri = new Uri(serverUri);

        // Create the client. It is disposable, so the using declaration disposes it
        // on every exit path, including after an exception has been caught below.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Build the connection options
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V311)
            .WithCleanSession(true)
            .Build();

        // Print every received message as "topic -> UTF-8 payload"
        client.ApplicationMessageReceivedAsync += e =>
        {
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接");

            // 2. Subscribe: one SUBSCRIBE call per topic filter
            for (int i = 0; i < topicFilters.Length; i++)
            {
                string topicFilter = topicFilters[i];
                var level = (MqttQualityOfServiceLevel)qos[i];

                await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
                    .WithTopicFilter(f => f.WithTopic(topicFilter).WithQualityOfServiceLevel(level))
                    .Build());
            }
            Console.WriteLine("已订阅");

            // 3. Publish 16 messages, pausing 500 ms after each one
            for (int i = 1; i <= 16; i++)
            {
                await client.PublishAsync(new MqttApplicationMessageBuilder()
                    .WithTopic(pubTopic)
                    .WithPayload($"消息 #{i}")
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .Build());
                await Task.Delay(500);
            }
            Console.WriteLine("发布完成");

            // 4. Leave time for the remaining messages to arrive
            await Task.Delay(2000);

            // 5. Disconnect
            await client.DisconnectAsync();
            Console.WriteLine("已断开");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }
}


using System;
using System.Text;
using System.Threading.Tasks;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Net.Security;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Protocol;
using MQTTnet.Formatter;

/// <summary>
/// Quick-start sample for MQTT 3.1.1 over TLS: connects to the broker with a custom
/// server-certificate validation callback, subscribes to three topic filters,
/// publishes 16 messages with QoS AtLeastOnce, then disconnects.
/// </summary>
class MQTT311_TLS_Sample
{
    /// <summary>
    /// Runs the sample end to end and prints progress (or the error message) to the console.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    static async Task Main(string[] args)
    {
        // ============ Connection settings ============
        string serverUri = "ssl://mqtt-xxx.mqtt.tencenttdmq.com:8883";
        string clientId = "QuickStart";
        string username = "YOUR_USERNAME";
        string password = "YOUR_PASSWORD";
        string pubTopic = "home/test";
        string[] topicFilters = { pubTopic, "home/#", "home/+" };
        int[] qos = { 1, 1, 1 }; // requested QoS for the topic filter at the same index

        // Optional CA certificate path, used to validate the server certificate
        string caCertPath = null; // e.g. "/path/to/ca.crt"

        // Parse the URI so that host and port can be passed separately
        var uri = new Uri(serverUri);

        // Load the optional CA certificate once. It is a trust anchor for the SERVER
        // certificate, so it is handed to the validation callback; it must not be
        // registered as a client certificate (that would present it as our identity).
        using X509Certificate2 caCert = string.IsNullOrEmpty(caCertPath)
            ? null
            : new X509Certificate2(caCertPath);

        // Create the client. It is disposable, so the using declaration disposes it
        // on every exit path, including after an exception has been caught below.
        var factory = new MqttFactory();
        using var client = factory.CreateMqttClient();

        // Build the connection options, including TLS
        var options = new MqttClientOptionsBuilder()
            .WithTcpServer(uri.Host, uri.Port)
            .WithCredentials(username, password)
            .WithClientId(clientId)
            .WithProtocolVersion(MqttProtocolVersion.V311)
            .WithCleanSession(true)
            .WithTlsOptions(tls =>
            {
                tls.UseTls();
                tls.WithSslProtocols(SslProtocols.Tls12 | SslProtocols.Tls13);
                // Certificate validation callback
                tls.WithCertificateValidationHandler(context =>
                {
                    var certificate = context.Certificate as X509Certificate2;
                    // Test environments only: accept every certificate
                    // return true;
                    // Production: strict validation
                    return ValidateServerCertificate(certificate, context.Chain, context.SslPolicyErrors, caCert);
                });
            })
            .Build();

        // Print every received message as "topic -> UTF-8 payload"
        client.ApplicationMessageReceivedAsync += e =>
        {
            Console.WriteLine($"收到消息: {e.ApplicationMessage.Topic} -> {Encoding.UTF8.GetString(e.ApplicationMessage.PayloadSegment)}");
            return Task.CompletedTask;
        };

        try
        {
            // 1. Connect
            await client.ConnectAsync(options);
            Console.WriteLine("已连接(TLS加密)");

            // 2. Subscribe: one SUBSCRIBE call per topic filter
            for (int i = 0; i < topicFilters.Length; i++)
            {
                string topicFilter = topicFilters[i];
                var level = (MqttQualityOfServiceLevel)qos[i];

                await client.SubscribeAsync(new MqttClientSubscribeOptionsBuilder()
                    .WithTopicFilter(f => f.WithTopic(topicFilter).WithQualityOfServiceLevel(level))
                    .Build());
            }
            Console.WriteLine("已订阅");

            // 3. Publish 16 messages, pausing 500 ms after each one
            for (int i = 1; i <= 16; i++)
            {
                await client.PublishAsync(new MqttApplicationMessageBuilder()
                    .WithTopic(pubTopic)
                    .WithPayload($"消息 #{i}")
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
                    .Build());
                await Task.Delay(500);
            }
            Console.WriteLine("发布完成");

            // 4. Leave time for the remaining messages to arrive
            await Task.Delay(2000);

            // 5. Disconnect
            await client.DisconnectAsync();
            Console.WriteLine("已断开");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"错误: {ex.Message}");
        }
    }

    /// <summary>
    /// Validates the server certificate presented during the TLS handshake.
    /// </summary>
    /// <param name="certificate">The server certificate; may be null.</param>
    /// <param name="chain">The chain built by the platform for <paramref name="certificate"/>; may be null.</param>
    /// <param name="sslPolicyErrors">The errors the platform found while validating the certificate.</param>
    /// <param name="caCertificate">
    /// Optional CA certificate. When supplied and the platform reported chain errors, the server
    /// certificate is accepted only if it chains to this certificate as a custom trust root.
    /// </param>
    /// <returns>true to accept the certificate and continue the handshake; false to reject it.</returns>
    static bool ValidateServerCertificate(X509Certificate2 certificate, X509Chain chain, SslPolicyErrors sslPolicyErrors, X509Certificate2 caCertificate = null)
    {
        // The platform found nothing wrong: accept
        if (sslPolicyErrors == SslPolicyErrors.None)
        {
            return true;
        }

        // No certificate to inspect: reject
        if (certificate == null)
        {
            Console.WriteLine("证书验证失败: 证书为空");
            return false;
        }

        // A hostname mismatch or a missing certificate cannot be compensated by chain
        // checks; accepting it would allow any validly issued certificate to impersonate
        // the broker.
        const SslPolicyErrors fatalErrors =
            SslPolicyErrors.RemoteCertificateNameMismatch | SslPolicyErrors.RemoteCertificateNotAvailable;
        if ((sslPolicyErrors & fatalErrors) != SslPolicyErrors.None)
        {
            Console.WriteLine($"证书验证失败: 证书与服务器主机名不匹配或服务器未提供证书 ({sslPolicyErrors})");
            return false;
        }

        // 1. Check the validity period
        DateTime now = DateTime.Now;
        if (now < certificate.NotBefore || now > certificate.NotAfter)
        {
            Console.WriteLine($"证书验证失败: 证书已过期或尚未生效 (有效期: {certificate.NotBefore} - {certificate.NotAfter})");
            return false;
        }

        // 2. Check the certificate chain (only chain errors remain at this point)
        if (caCertificate != null)
        {
            // A custom CA was supplied: the certificate must chain to it
            if (!ChainsToCustomCa(certificate, chain, caCertificate))
            {
                return false;
            }
        }
        else
        {
            if (chain == null)
            {
                // Chain errors were reported but there is no chain to inspect
                Console.WriteLine($"证书验证失败: {sslPolicyErrors}");
                return false;
            }

            // Revocation-unknown / offline-revocation results are tolerated (optional policy)
            const X509ChainStatusFlags tolerated =
                X509ChainStatusFlags.RevocationStatusUnknown | X509ChainStatusFlags.OfflineRevocation;
            foreach (X509ChainStatus status in chain.ChainStatus)
            {
                // Mask out the tolerated flags so a combined status value is handled as well
                if ((status.Status & ~tolerated) != X509ChainStatusFlags.NoError)
                {
                    Console.WriteLine($"证书验证失败: {status.StatusInformation}");
                    return false;
                }
            }
        }

        // 3. Optional: check the certificate subject, depending on your requirements
        // string expectedSubject = "CN=*.mqtt.tencenttdmq.com";
        // if (!certificate.Subject.Contains(expectedSubject))
        // {
        //     Console.WriteLine($"证书验证失败: 主题不匹配 (期望: {expectedSubject}, 实际: {certificate.Subject})");
        //     return false;
        // }

        Console.WriteLine("证书验证通过");
        return true;
    }

    /// <summary>
    /// Rebuilds the chain for <paramref name="certificate"/> with <paramref name="caCertificate"/>
    /// as the only trusted root and reports whether the build succeeds.
    /// </summary>
    static bool ChainsToCustomCa(X509Certificate2 certificate, X509Chain platformChain, X509Certificate2 caCertificate)
    {
        using var customChain = new X509Chain();
        X509ChainPolicy policy = customChain.ChainPolicy;
        policy.TrustMode = X509ChainTrustMode.CustomRootTrust;
        policy.CustomTrustStore.Add(caCertificate);
        // Revocation is not checked here; NOTE(review): switch to Online if the CA publishes CRL/OCSP
        policy.RevocationMode = X509RevocationMode.NoCheck;

        // Make the certificates from the platform-built chain available as intermediates
        if (platformChain != null)
        {
            foreach (X509ChainElement element in platformChain.ChainElements)
            {
                policy.ExtraStore.Add(element.Certificate);
            }
        }

        if (customChain.Build(certificate))
        {
            return true;
        }

        foreach (X509ChainStatus status in customChain.ChainStatus)
        {
            Console.WriteLine($"证书验证失败: {status.StatusInformation}");
        }
        return false;
    }
}




参数说明

参数
说明
ADDRESS
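broker 连接地址,对应示例代码中的 serverUri,在控制台目标集群基本信息 > 接入信息模块中复制。位置如下图所示。格式:mqtt-xxx-gz.mqtt.qcloud.tencenttdmq.com:1883。填入 serverUri 时需加上协议前缀,例如 tcp://mqtt-xxx-gz.mqtt.qcloud.tencenttdmq.com:1883(TLS 连接使用 ssl:// 前缀和 8883 端口),否则示例代码中的 Uri 无法解析出主机名和端口。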

CLIENTID
客户端 ID,在控制台集群详情页客户端管理页面获取。

USERNAME
用户名,在控制台认证管理页面获取。
PASSWORD
密码,在控制台认证管理页面获取。