首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >[Blazor] 基于MQTTnet实现MQTT客户端

[Blazor] 基于MQTTnet实现MQTT客户端

作者头像
科控物联
发布2023-02-28 16:26:17
发布2023-02-28 16:26:17
2.7K0
举报
文章被收录于专栏:科控自动化科控自动化

整个项目结构很简单

MQTT_Connector类库就只有2个文件。直接上代码

Worker用于实现长时间运行的类。

P.S.

BackgroundService :用于实现长时间运行的 IHostedService 的基类。

代码语言:javascript
复制
using MQTTnet;
using MQTTnet.Client;
using System.Text;
using System.Text.Json;

namespace MQTT_Connector;

public class Worker : BackgroundService
{
    private readonly MQTTService MqttService;
    private readonly MqttFactory MqttFactory;
    private readonly ILogger<Worker> Logger;
    public  TObject DumpToConsole<TObject>( TObject @object)
    {
        var output = "NULL";
        if (@object != null)
        {
            output = JsonSerializer.Serialize(@object, new JsonSerializerOptions
            {
                WriteIndented = true
            });
        }

        Logger.LogInformation($"[{@object?.GetType().Name}]:\r\n{output}");
        return @object;
    }
    public Worker(MQTTService mQTTService, ILogger<Worker> logger)
    {
        this.MqttService = mQTTService;
        this.MqttFactory = new MqttFactory();
        this.Logger = logger;
    }

    public async Task Update(string message)
    {
        await MqttService.RefreshAsync(message);
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {

        using IMqttClient mqttClient = MqttFactory.CreateMqttClient();        // 1. 创建 MQTT 客户端
        // 2 . 设置 MQTT 客户端选项 (// 设置服务器端地址// 设置鉴权参数// 创建选项)
        var mqttClientOptions = new MqttClientOptionsBuilder()                  
            .WithTcpServer("localhost")                             
           // .WithCredentials( "admin","public")                   
            .Build();

        var response = await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
        Logger.LogInformation($"The MQTT client is connected.");
        DumpToConsole(response);

         var mqttSubscribeOptions = MqttFactory.CreateSubscribeOptionsBuilder()
            .WithTopicFilter(f => { f.WithTopic("#"); })
            .Build();

      var subscribe=  await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

        //Logger.LogInformation($"{string.Join("/",mqttSubscribeOptions.TopicFilters)}");
        DumpToConsole(subscribe);
        mqttClient.ApplicationMessageReceivedAsync += async delegate (MqttApplicationMessageReceivedEventArgs args)
        {
            var mqttMessage = "";
            if (args.ApplicationMessage.Payload is null)
            {
                mqttMessage = "Empty message";
            }
            else
            {
                mqttMessage = Encoding.UTF8.GetString(args.ApplicationMessage.Payload);
            }

            //Console.WriteLine("Console Response = {0}", mqttMessage);
            Logger.LogInformation($"{args.ApplicationMessage.Topic} ={ mqttMessage}" );
            mqttMessage = ($"{args.ApplicationMessage.Topic} ={mqttMessage}");

            await Update(mqttMessage).ConfigureAwait(false);
        };

        while (!stoppingToken.IsCancellationRequested)
        {
            try
            {
               // Logger.LogInformation("心跳时间5秒");
                if (!await mqttClient.TryPingAsync(cancellationToken: stoppingToken))
                {
                    await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
                    // Subscribe to topics when session is clean etc.
                    Logger.LogInformation("The MQTT client is connected.");
                    await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
                    
                }
                else
                {
                     Logger.LogInformation("心跳测试OK");
                }
            }
            catch (Exception ex)
            {
                // Some message here
                Logger.LogInformation(ex.Message);
            }
            finally
            {
                // Delay 5 seconds
             //   Logger.LogInformation($" Delay 5 seconds");
                await Task.Delay(5 * 1000, stoppingToken);
            }
        }
    }
}

MQTTService主要是订阅信息的解析。

代码语言:javascript
复制
namespace MQTT_Connector
{
    public class MQTTService
    {
        public string? Message { get; set; }
        public event Func<Task> Notify;

        public async Task RefreshAsync(string message)
        {
            if (Notify is { })
            {
                Message = message;
                await Notify.Invoke();
            }
        }
    }
}

Blazor前端项目基于官方模板生成,引入MQTT_Connector项目

注册服务

代码语言:javascript
复制
builder.Services.AddHostedService<Worker>();
builder.Services.AddSingleton<MQTTService>();

添加页面

代码语言:javascript
复制
@inject MQTTService mqttService
@implements IDisposable
@page "/dashboard"
<PageTitle>Dashboard</PageTitle>
<div>
    <h1>Message</h1>

    <p role="status">Current message: @mqttService.Message</p>
</div>

@code {
    protected override  void OnInitialized() => mqttService.Notify += OnNotify;

    public async Task OnNotify() => await InvokeAsync(() =>{StateHasChanged();});

    public void Dispose()
    {
        mqttService.Notify -= OnNotify;
    }
}

运行

视频演示:http://mpvideo.qpic.cn/0bc3gmabkaaaxeahawpaqrrvam6dcuzqafia.f10002.mp4?dis_k=f85c715e3293312c32e0b91c20c4df82&dis_t=1677572713&play_scene=10400&vid=wxv_2785993811067076608&format_id=10002&support_redirect=0&mmversion=false

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-02-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 科控物联 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档