整个项目结构很简单

MQTT_Connector 类库只有 2 个文件,下面直接上代码。

Worker 是用于执行长时间运行任务的后台服务类,继承自 BackgroundService。
P.S.
BackgroundService:用于实现长时间运行的 IHostedService 的基类。
using MQTTnet;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;
namespace MQTT_Connector;
/// <summary>
/// Long-running hosted service that keeps an MQTT client connected to the broker on
/// <c>localhost</c>, subscribes to every topic (<c>#</c>) and forwards each received
/// message, formatted as <c>"topic =payload"</c>, to <see cref="MQTTService"/>.
/// </summary>
public class Worker : BackgroundService
{
    /// <summary>Pause between two heartbeat / reconnect attempts.</summary>
    private static readonly TimeSpan HeartbeatInterval = TimeSpan.FromSeconds(5);

    /// <summary>Serializer settings used by <see cref="DumpToConsole{TObject}"/>; cached so they are not re-created on every call.</summary>
    private static readonly JsonSerializerOptions DumpOptions = new JsonSerializerOptions
    {
        WriteIndented = true
    };

    private readonly MQTTService MqttService;
    private readonly MqttFactory MqttFactory;
    private readonly ILogger<Worker> Logger;

    /// <summary>Creates the worker.</summary>
    /// <param name="mQTTService">Service that receives every incoming message.</param>
    /// <param name="logger">Logger used for connection, heartbeat and message output.</param>
    public Worker(MQTTService mQTTService, ILogger<Worker> logger)
    {
        this.MqttService = mQTTService;
        this.MqttFactory = new MqttFactory();
        this.Logger = logger;
    }

    /// <summary>
    /// Logs <paramref name="object"/> as indented JSON (or <c>NULL</c> when it is null),
    /// prefixed with its runtime type name, and returns it unchanged.
    /// </summary>
    /// <typeparam name="TObject">Static type of the value to dump.</typeparam>
    /// <param name="object">Value to dump; may be null.</param>
    /// <returns>The same <paramref name="object"/> that was passed in.</returns>
    public TObject DumpToConsole<TObject>(TObject @object)
    {
        var output = "NULL";
        if (@object != null)
        {
            output = JsonSerializer.Serialize(@object, DumpOptions);
        }

        // An empty type name keeps the "[]" prefix the log line has always had for null values.
        Logger.LogInformation("[{TypeName}]:\r\n{Dump}", @object?.GetType().Name ?? string.Empty, output);
        return @object;
    }

    /// <summary>Forwards <paramref name="message"/> to <see cref="MQTTService.RefreshAsync"/>.</summary>
    /// <param name="message">Text to publish to the service's subscribers.</param>
    public async Task Update(string message)
    {
        await MqttService.RefreshAsync(message);
    }

    /// <summary>
    /// Runs until <paramref name="stoppingToken"/> is cancelled: every
    /// <see cref="HeartbeatInterval"/> the broker is pinged and, when the ping fails
    /// (including the very first attempt, before any connection exists), the client is
    /// (re)connected and the subscription is re-established.
    /// </summary>
    /// <param name="stoppingToken">Signalled by the host when the service must stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // 1. Create the MQTT client.
        using IMqttClient mqttClient = MqttFactory.CreateMqttClient();

        // 2. Build the client options (server address; credentials can be added here).
        var mqttClientOptions = new MqttClientOptionsBuilder()
            .WithTcpServer("localhost")
            // .WithCredentials( "admin","public")
            .Build();

        var mqttSubscribeOptions = MqttFactory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => { f.WithTopic("#"); })
            .Build();

        // Attach the handler BEFORE connecting/subscribing so that messages delivered
        // immediately after the subscription is acknowledged are not lost.
        mqttClient.ApplicationMessageReceivedAsync += OnApplicationMessageReceivedAsync;

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
                if (await mqttClient.TryPingAsync(cancellationToken: stoppingToken))
                {
                    Logger.LogInformation("心跳测试OK");
                }
                else
                {
                    // Connecting inside the loop means an unreachable broker at startup
                    // is retried instead of faulting the whole hosted service.
                    var response = await mqttClient.ConnectAsync(mqttClientOptions, stoppingToken);
                    Logger.LogInformation("The MQTT client is connected.");
                    DumpToConsole(response);

                    // Subscribe again after every (re)connect.
                    var subscribe = await mqttClient.SubscribeAsync(mqttSubscribeOptions, stoppingToken);
                    DumpToConsole(subscribe);
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Host is shutting down: leave the loop quietly.
                break;
            }
            catch (Exception ex)
            {
                // Keep the full exception (type + stack trace), not just its message.
                Logger.LogError(ex, "MQTT heartbeat/reconnect failed: {Error}", ex.Message);
            }

            try
            {
                await Task.Delay(HeartbeatInterval, stoppingToken);
            }
            catch (OperationCanceledException)
            {
                break;
            }
        }
    }

    /// <summary>
    /// Decodes the payload of an incoming message as UTF-8 (or uses <c>"Empty message"</c>
    /// when there is no payload), logs it and forwards it via <see cref="Update"/>.
    /// </summary>
    /// <param name="args">Event data supplied by the MQTT client.</param>
    private async Task OnApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args)
    {
        var topic = args.ApplicationMessage.Topic;
        var payload = args.ApplicationMessage.Payload;
        var text = payload is null ? "Empty message" : Encoding.UTF8.GetString(payload);

        Logger.LogInformation("{Topic} ={Payload}", topic, text);

        try
        {
            await Update($"{topic} ={text}").ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // A failing subscriber (e.g. a UI component) must not break message reception.
            Logger.LogError(ex, "Failed to forward the MQTT message received on topic {Topic}.", topic);
        }
    }
}
MQTTService 主要负责保存最新收到的订阅消息,并通知订阅者(例如 Blazor 页面)刷新。
namespace MQTT_Connector
{
/// <summary>
/// Holds the most recent message and notifies subscribers whenever it changes.
/// </summary>
public class MQTTService
{
    /// <summary>The last message passed to <see cref="RefreshAsync"/>; null until the first call.</summary>
    public string? Message { get; set; }

    /// <summary>Raised after <see cref="Message"/> has been updated. Null while nobody is subscribed.</summary>
    public event Func<Task>? Notify;

    /// <summary>
    /// Stores <paramref name="message"/> and invokes every <see cref="Notify"/> subscriber,
    /// completing once all of them have finished.
    /// </summary>
    /// <param name="message">The new message text.</param>
    /// <returns>A task that faults if any subscriber throws or returns a faulted task.</returns>
    public async Task RefreshAsync(string message)
    {
        // Always remember the latest message, so a subscriber that attaches later
        // can still read the current value.
        Message = message;

        // Snapshot the delegate list; invoking a multicast Func<Task> directly would
        // return (and therefore await) only the LAST subscriber's task.
        var handlers = Notify?.GetInvocationList();
        if (handlers is null)
        {
            return;
        }

        var pending = new Task[handlers.Length];
        for (var i = 0; i < handlers.Length; i++)
        {
            try
            {
                pending[i] = ((Func<Task>)handlers[i]).Invoke() ?? Task.CompletedTask;
            }
            catch (Exception ex)
            {
                // A synchronous throw must not prevent the remaining subscribers from running.
                pending[i] = Task.FromException(ex);
            }
        }

        await Task.WhenAll(pending);
    }
}
}

Blazor 前端项目基于官方模板生成,并引用 MQTT_Connector 项目。

注册服务
// Register the MQTT background worker and the singleton MQTTService it is constructed with;
// the same MQTTService instance is what the dashboard page injects.
builder.Services
    .AddHostedService<Worker>()
    .AddSingleton<MQTTService>();
添加页面

@inject MQTTService mqttService
@implements IDisposable
@page "/dashboard"
<PageTitle>Dashboard</PageTitle>
<div>
<h1>Message</h1>
<p role="status">Current message: @mqttService.Message</p>
</div>
@code {
// Subscribe to message notifications when the component is initialized.
protected override void OnInitialized()
{
    mqttService.Notify += OnNotify;
}
// Notification callback: asks the component to re-render through InvokeAsync.
public async Task OnNotify()
{
    await InvokeAsync(StateHasChanged);
}
// Unsubscribe on disposal so the service no longer holds a reference to this component.
public void Dispose() => mqttService.Notify -= OnNotify;
}

运行


视频演示:http://mpvideo.qpic.cn/0bc3gmabkaaaxeahawpaqrrvam6dcuzqafia.f10002.mp4?dis_k=f85c715e3293312c32e0b91c20c4df82&dis_t=1677572713&play_scene=10400&vid=wxv_2785993811067076608&format_id=10002&support_redirect=0&mmversion=false