首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使BackgroundService任务在没有Console.ReadLine()的情况下接收mqtt消息

如何使BackgroundService任务在没有Console.ReadLine()的情况下接收mqtt消息
EN

Stack Overflow用户
提问于 2022-09-15 15:41:07
回答 2查看 212关注 0票数 0

在我的ASP.Net核心6应用程序中,一个名为MqttClientService的BackgroundService任务运行一个MQTTNet客户端,该客户端处理传入的mqqt消息,并使用消息进行响应以指示其成功。

我已经让来自MQTTNet回购的示例控制台应用程序使用了Console.ReadLine(),但是这感觉就像对我的用例的攻击。是否有更好的方法来保持BackgroundService处理传入消息而不不断地重新启动?

使用Asp.Net Core和MQTTNet版本3的示例,但是它使用由接口实现的句柄,而不是库现在使用的异步事件:MQTTNet升级指南

如有任何信息将不胜感激,谢谢。

MqttClientService.cs in Services/

代码语言:javascript
运行
复制
using MQTTnet;
using MQTTnet.Client;
using System.Text;

namespace MqttClientAspNetCore.Services
{
    public class MqttClientService : BackgroundService
    {
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                await Handle_Received_Application_Message();
            }
        }

        public static async Task Handle_Received_Application_Message()
        {

            var mqttFactory = new MqttFactory();

            using (var mqttClient = mqttFactory.CreateMqttClient())
            {
                var mqttClientOptions = new MqttClientOptionsBuilder()
                    .WithTcpServer("test.mosquitto.org")
                    .Build();

                // Setup message handling before connecting so that queued messages
                // are also handled properly. 
                mqttClient.ApplicationMessageReceivedAsync += e =>
                {
                    Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
                    Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");

                    // Publish successful message in response
                    var applicationMessage = new MqttApplicationMessageBuilder()
                        .WithTopic("keipalatest/1/resp")
                        .WithPayload("OK")
                        .Build();

                    mqttClient.PublishAsync(applicationMessage, CancellationToken.None);

                    Console.WriteLine("MQTT application message is published.");

                    return Task.CompletedTask;
                };

                await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);

                var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f =>
                    {
                        f.WithTopic("keipalatest/1/post");
                        f.WithAtLeastOnceQoS();
                    })
                    .Build();

                await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);

                Console.WriteLine("MQTT client subscribed to topic.");
                // The line below feels like a hack to keep background service from restarting
                Console.ReadLine();
            }
        }
    }
}

Program.cs

代码语言:javascript
运行
复制
using MqttClientAspNetCore.Services;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddHostedService<MqttClientService>();

var app = builder.Build();

// To check if web server is still responsive
app.MapGet("/", () =>
{
    return "Hello World";
});


app.Run();
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2022-09-16 07:33:48

不需要Console.ReadLine,甚至不需要循环。BackgroundService应用程序代码当ExecuteAsync返回时不会终止。如果您希望应用程序在ExecuteAsync终止时终止,则必须通过IApplicationLifecycle接口实际告诉它。

当我第一次尝试为命令行工具使用通用主机时,我发现这是一种困难的方式。似乎永远挂着..。

ExecuteAsync可以用来设置MQTT客户端和事件处理程序,并让它们工作。只有在调用StopAsync时,代码才会终止。即使这样,这也是通过发送取消令牌的信号来完成的,而不是通过中止某个工作线程来完成的。

客户端本身可以构建在构造函数中,例如使用配置设置。在ConnectAsync中只需要调用ExecuteAsync

代码语言:javascript
运行
复制
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{

    await _client.ConnectAsync(_clientOptions, CancellationToken.None);
    _logger.LogInformation("Connected");

    await _client.SubscribeAsync(_subscriptionOptions, CancellationToken.None);
    _logger.LogInformation("Subscribed");
}

当调用StopAsync并触发取消令牌时,服务代码将停止。当发生这种情况时,可以使用stoppingToken.Register来调用_client.DisconnectAsync,但是Register不接受异步委托。更好的选择是重写StopAsync本身:

代码语言:javascript
运行
复制
public virtual async Task StopAsync(CancellationToken cancellationToken)
{
    await _client.DisconnectAsync();
    await base.StopAsync(cancellationToken);
}

构造函数可以创建客户端并注册消息处理程序。

代码语言:javascript
运行
复制
public class MqttClientService : BackgroundService
{
    ILogger<MqttClientService> _logger;
    IMqttClient _client=client;

    MqttClientOptions _clientOptions;
    MqttSubscriptionOptions _subscriptionOptions;    
    string _topic;

    public MqttClientService(IOptions<MyMqttOptions> options, 
                            ILogger<MqttClientService> logger)
    {
        _logger=logger;
        _topic=options.Value.Topic;
        var factory = new MqttFactory();
        _client = factory.CreateMqttClient();
        _clientOptions = new MqttClientOptionsBuilder()
                        .WithTcpServer(options.Value.Address)
                        .Build();
        _subscriptionOptions = factory.CreateSubscribeOptionsBuilder()
                    .WithTopicFilter(f =>
                    {
                        f.WithTopic(options.Value.Topic);
                        f.WithAtLeastOnceQoS();
                    })
                    .Build();
        _client.ApplicationMessageReceivedAsync += HandleMessageAsync;
    }

接收到的消息由HandleMessageAsync方法处理:

代码语言:javascript
运行
复制
async Task HandleMessageAsync(ApplicationMessageProcessedEventArgs e)
{
    var payload=Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
    _logger.LogInformation("### RECEIVED APPLICATION MESSAGE ###\n{payload}",payload);
    var applicationMessage = new MqttApplicationMessageBuilder()
                    .WithTopic(_topic)
                    .WithPayload("OK")
                    .Build();

    await _client.PublishAsync(applicationMessage, CancellationToken.None);

    _logger.LogInformation("MQTT application message is published.");
}

最后,由于BackgroundService实现了IDisposable,所以我们可以使用Dispose来释放_client实例:

代码语言:javascript
运行
复制
public void Dispose()
{
    Dispose(true);
}

protected virtual Dispose(bool disposing)
{
    if(disposing)
    {
        _client.Dispose();
        base.Dispose();
    }
    _client=null;
}
票数 1
EN

Stack Overflow用户

发布于 2022-09-15 17:14:19

如果您的服务没有其他有用的功能,它可以等待CancellationToken启动:

代码语言:javascript
运行
复制
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
  try
  {
    await Handle_Received_Application_Message(stoppingToken);
  }
  catch (OperationCanceledException) { }
}

public static async Task Handle_Received_Application_Message(CancellationToken cancellationToken)
{
  ...
  Console.WriteLine("MQTT client subscribed to topic.");
  await Task.Delay(Timeout.Infinite, cancellationToken);
}
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73733965

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档