在我的ASP.Net核心6应用程序中,一个名为MqttClientService的BackgroundService任务运行一个MQTTNet客户端,该客户端处理传入的mqqt消息,并使用消息进行响应以指示其成功。
我已经让来自MQTTNet回购的示例控制台应用程序使用了Console.ReadLine()
,但是这感觉就像对我的用例的攻击。是否有更好的方法来保持BackgroundService处理传入消息而不不断地重新启动?
有使用Asp.Net Core和MQTTNet版本3的示例,但是它使用由接口实现的句柄,而不是库现在使用的异步事件:MQTTNet升级指南。
如有任何信息将不胜感激,谢谢。
MqttClientService.cs in Services/
using MQTTnet;
using MQTTnet.Client;
using System.Text;
namespace MqttClientAspNetCore.Services
{
public class MqttClientService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
await Handle_Received_Application_Message();
}
}
public static async Task Handle_Received_Application_Message()
{
var mqttFactory = new MqttFactory();
using (var mqttClient = mqttFactory.CreateMqttClient())
{
var mqttClientOptions = new MqttClientOptionsBuilder()
.WithTcpServer("test.mosquitto.org")
.Build();
// Setup message handling before connecting so that queued messages
// are also handled properly.
mqttClient.ApplicationMessageReceivedAsync += e =>
{
Console.WriteLine("### RECEIVED APPLICATION MESSAGE ###");
Console.WriteLine($"+ Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
// Publish successful message in response
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic("keipalatest/1/resp")
.WithPayload("OK")
.Build();
mqttClient.PublishAsync(applicationMessage, CancellationToken.None);
Console.WriteLine("MQTT application message is published.");
return Task.CompletedTask;
};
await mqttClient.ConnectAsync(mqttClientOptions, CancellationToken.None);
var mqttSubscribeOptions = mqttFactory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(f =>
{
f.WithTopic("keipalatest/1/post");
f.WithAtLeastOnceQoS();
})
.Build();
await mqttClient.SubscribeAsync(mqttSubscribeOptions, CancellationToken.None);
Console.WriteLine("MQTT client subscribed to topic.");
// The line below feels like a hack to keep background service from restarting
Console.ReadLine();
}
}
}
}
Program.cs
using MqttClientAspNetCore.Services;
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddHostedService<MqttClientService>();
var app = builder.Build();
// To check if web server is still responsive
app.MapGet("/", () =>
{
return "Hello World";
});
app.Run();
发布于 2022-09-16 07:33:48
不需要Console.ReadLine,甚至不需要循环。BackgroundService应用程序代码当ExecuteAsync返回时不会终止。如果您希望应用程序在ExecuteAsync
终止时终止,则必须通过IApplicationLifecycle
接口实际告诉它。
当我第一次尝试为命令行工具使用通用主机时,我发现这是一种困难的方式。似乎永远挂着..。
ExecuteAsync
可以用来设置MQTT客户端和事件处理程序,并让它们工作。只有在调用StopAsync
时,代码才会终止。即使这样,这也是通过发送取消令牌的信号来完成的,而不是通过中止某个工作线程来完成的。
客户端本身可以构建在构造函数中,例如使用配置设置。在ConnectAsync
中只需要调用ExecuteAsync
。
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await _client.ConnectAsync(_clientOptions, CancellationToken.None);
_logger.LogInformation("Connected");
await _client.SubscribeAsync(_subscriptionOptions, CancellationToken.None);
_logger.LogInformation("Subscribed");
}
当调用StopAsync并触发取消令牌时,服务代码将停止。当发生这种情况时,可以使用stoppingToken.Register
来调用_client.DisconnectAsync
,但是Register
不接受异步委托。更好的选择是重写StopAsync
本身:
public virtual async Task StopAsync(CancellationToken cancellationToken)
{
await _client.DisconnectAsync();
await base.StopAsync(cancellationToken);
}
构造函数可以创建客户端并注册消息处理程序。
public class MqttClientService : BackgroundService
{
ILogger<MqttClientService> _logger;
IMqttClient _client=client;
MqttClientOptions _clientOptions;
MqttSubscriptionOptions _subscriptionOptions;
string _topic;
public MqttClientService(IOptions<MyMqttOptions> options,
ILogger<MqttClientService> logger)
{
_logger=logger;
_topic=options.Value.Topic;
var factory = new MqttFactory();
_client = factory.CreateMqttClient();
_clientOptions = new MqttClientOptionsBuilder()
.WithTcpServer(options.Value.Address)
.Build();
_subscriptionOptions = factory.CreateSubscribeOptionsBuilder()
.WithTopicFilter(f =>
{
f.WithTopic(options.Value.Topic);
f.WithAtLeastOnceQoS();
})
.Build();
_client.ApplicationMessageReceivedAsync += HandleMessageAsync;
}
接收到的消息由HandleMessageAsync
方法处理:
async Task HandleMessageAsync(ApplicationMessageProcessedEventArgs e)
{
var payload=Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
_logger.LogInformation("### RECEIVED APPLICATION MESSAGE ###\n{payload}",payload);
var applicationMessage = new MqttApplicationMessageBuilder()
.WithTopic(_topic)
.WithPayload("OK")
.Build();
await _client.PublishAsync(applicationMessage, CancellationToken.None);
_logger.LogInformation("MQTT application message is published.");
}
最后,由于BackgroundService
实现了IDisposable
,所以我们可以使用Dispose
来释放_client
实例:
public void Dispose()
{
Dispose(true);
}
protected virtual Dispose(bool disposing)
{
if(disposing)
{
_client.Dispose();
base.Dispose();
}
_client=null;
}
发布于 2022-09-15 17:14:19
如果您的服务没有其他有用的功能,它可以等待CancellationToken
启动:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
try
{
await Handle_Received_Application_Message(stoppingToken);
}
catch (OperationCanceledException) { }
}
public static async Task Handle_Received_Application_Message(CancellationToken cancellationToken)
{
...
Console.WriteLine("MQTT client subscribed to topic.");
await Task.Delay(Timeout.Infinite, cancellationToken);
}
https://stackoverflow.com/questions/73733965
复制相似问题