Windows service consuming Azure Service Bus messages from a queue

Stack Overflow user
Asked 2020-12-04 09:06:19
1 answer, 1.3K views, 0 followers, score 1

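I have created a simple Windows service to consume messages from an Azure Service Bus queue. I used Topshelf to create the Windows service. The code snippets below follow this sample: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues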

var hf = HostFactory.New(x =>
{
    x.Service<ServiceBusHelper>(s =>
    {
        s.ConstructUsing(serviceProvider.GetService<ServiceBusHelper>);
        s.WhenStarted(async service => await service.ReceiveMessagesAsync());
        s.WhenStopped(async service => await service.Stop());
    });               

    x.RunAsNetworkService()
        .StartAutomatically()
        .EnableServiceRecovery(rc => rc.RestartService(1));

    x.SetServiceName("MyWindowsService");
    x.SetDisplayName("MyWindowsService");
    x.SetDescription("MyWindowsService");
});

hf.Run();

The ServiceBusHelper class:

public async Task ReceiveMessagesAsync()
{
    var connectionString = _configuration.GetValue<string>("ServiceBusConnectionString");
    var queueName = _configuration.GetValue<string>("ServiceBusQueueName");

    await using (ServiceBusClient client = new ServiceBusClient(connectionString))
    {       
        ServiceBusProcessor processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());        
        processor.ProcessMessageAsync += MessageHandler;        
        processor.ProcessErrorAsync += ErrorHandler;
        
        await processor.StartProcessingAsync();

        System.Threading.Thread.Sleep(1000);//Wait for a minute before stop processing
        
        await processor.StopProcessingAsync();               
    }
}

public async Task MessageHandler(ProcessMessageEventArgs args)
{
    string body = args.Message.Body.ToString(); 
    
    var messageBytes = Encoding.ASCII.GetBytes(body);
    ProcessMessage(messageBytes);   
    await args.CompleteMessageAsync(args.Message);
}


public Task ErrorHandler(ProcessErrorEventArgs args)
{
    return Task.CompletedTask;
}

public Task Stop()
{
    return Task.CompletedTask;
}

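The Windows service installs successfully and its status shows as Running. However, it does not consume messages from the Service Bus automatically. If I manually stop and start the service, it picks up the messages from the queue. I am not sure what I am missing in this implementation. Any suggestions are welcome.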


1 Answer

Stack Overflow user

Answered 2021-06-19 01:27:16

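.NET Core 3.1 introduced a new extension for this. Alongside Microsoft.AspNetCore.Hosting, add the NuGet package Microsoft.Extensions.Hosting.WindowsServices, and you can then call .UseWindowsService() on the host builder. This lets you run the same application either as a Windows service or as a console application.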

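using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Serilog;

public static class Program
{
    // Assumed entry point (not shown in the original answer): build and run the host.
    public static void Main(string[] args) =>
        CreateHostBuilder(args).Build().Run();

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .UseWindowsService() // from Microsoft.Extensions.Hosting.WindowsServices
            .ConfigureAppConfiguration((context, config) =>
            {
                // configure the app here.
            })
            .ConfigureServices((hostContext, services) =>
            {
                // Run QueueWorker as a hosted background service.
                services.AddHostedService<QueueWorker>();
            })
            .UseSerilog(); // from Serilog.Extensions.Hosting
}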

You can then create a background worker that starts and stops processing of the Service Bus queue. Here is my implementation:

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class QueueWorker : BackgroundService, IDisposable
{
    protected ILogger<QueueWorker> _logger;
    protected IQueueMessageReceiver _queueProcessor;

    public QueueWorker()
    {

    }

    public QueueWorker(ILogger<QueueWorker> logger, IQueueMessageReceiver queueMessageReceiver)
    {
        _logger = logger;
        _queueProcessor = queueMessageReceiver;
    }

    // The processor pumps messages on its own, so there is no loop to run here.
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await Task.CompletedTask.ConfigureAwait(false);
    }

    public override async Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Service Starting");
        try
        {
            // Await instead of blocking with Wait(), which would throw before IsFaulted could be checked.
            await _queueProcessor.StartProcessor(cancellationToken).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            throw new Exception("Unable to start Processor", ex);
        }
        await base.StartAsync(cancellationToken).ConfigureAwait(false);
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stopping Service");
        await _queueProcessor.StopProcessor().ConfigureAwait(false);
        await base.StopAsync(cancellationToken).ConfigureAwait(false);
    }

    public override void Dispose()
    {
        // The logger is null when the parameterless constructor was used.
        _logger?.LogInformation("Disposing Service");
        var loopCount = 0;

        // Give the processor up to 5 x 600 ms to finish closing.
        while (_queueProcessor != null && !_queueProcessor.IsClosedOrClosing() && loopCount < 5)
        {
            var task = Task.Delay(600);
            task.Wait();
            loopCount++;
        }
        base.Dispose();
        GC.SuppressFinalize(this);
    }
}
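
For comparison with the question's code: there, ReceiveMessagesAsync starts the processor, calls Thread.Sleep(1000) (one second, despite the comment), then stops the processor and disposes the client, so the processor is only active for about a second after the service starts. A worker like the one above instead leaves the processor running until the host asks it to stop. That "start, wait for the stop signal, then stop" pattern can be sketched without the Azure SDK. In the sketch below, IMessagePump, FakePump, and PumpRunner are hypothetical names used only for illustration, with IMessagePump standing in for ServiceBusProcessor.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for ServiceBusProcessor so the sketch runs without the Azure SDK.
public interface IMessagePump
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync();
}

// Hypothetical in-memory pump that only records whether it is running.
public sealed class FakePump : IMessagePump
{
    public bool IsRunning { get; private set; }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        IsRunning = true;
        return Task.CompletedTask;
    }

    public Task StopAsync()
    {
        IsRunning = false;
        return Task.CompletedTask;
    }
}

public static class PumpRunner
{
    // Starts the pump, stays alive until the token is cancelled, then stops the pump.
    public static async Task RunUntilCancelledAsync(IMessagePump pump, CancellationToken stoppingToken)
    {
        await pump.StartAsync(stoppingToken).ConfigureAwait(false);
        try
        {
            // Completes only when stoppingToken is cancelled; no fixed sleep.
            await Task.Delay(Timeout.Infinite, stoppingToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            // Expected when the host requests shutdown.
        }
        finally
        {
            await pump.StopAsync().ConfigureAwait(false);
        }
    }
}
```

In a real service the CancellationToken would come from the host (for example the stoppingToken passed to ExecuteAsync), and the pump would be the actual processor rather than FakePump.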

And the actual processor:

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;

// ReceiverConfiguration, IExecutionMatrix, IQueueMessageReceiver and FatalSystemException
// are the answerer's own types; their definitions are not part of this answer.
public class QueueMessageReceiver : IQueueMessageReceiver
{
    private readonly ServiceBusClient _queueClient;
    private ServiceBusProcessor _processor;
    private readonly ReceiverConfiguration _configuration;
    private readonly ILogger _logger;
    private readonly ILoggerFactory _loggerFactory;
    private Dictionary<string, string> _executionMatrix;
    private readonly IServiceProvider _provider;
    private CancellationToken _cancellationToken;

    public QueueMessageReceiver(ReceiverConfiguration configuration, ILogger<QueueMessageReceiver> logger, IExecutionMatrix executionMatrix, ILoggerFactory loggerFactory, IServiceProvider serviceProvider)
    {
        // Validate the configuration before it is used to create the client.
        if (configuration == null) throw new ArgumentException("Configuration is missing from the expected ");
        if (string.IsNullOrWhiteSpace(configuration.ConnectionString)) throw new ArgumentException("ServiceBusConnectionString Object missing from the expected configuration under ConnectionStrings ");
        if (configuration.QueueName == null) throw new ArgumentException("Queue Name value missing from the expected configuration");

        _configuration = configuration;
        _logger = logger;
        _loggerFactory = loggerFactory;
        _executionMatrix = executionMatrix.GetExecutionMatrix();
        _provider = serviceProvider;
        _queueClient = new ServiceBusClient(_configuration.ConnectionString);
    }

    public async Task StartProcessor(CancellationToken cancellationToken)
    {
        if (!IsClosedOrClosing())
        {
            throw new FatalSystemException("ServiceBusProcessor is already running. ");
        }
        _cancellationToken = cancellationToken;
        var options = new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = _configuration.AutoComplete,
            MaxConcurrentCalls = _configuration.MaxConcurrentCalls,
            MaxAutoLockRenewalDuration = _configuration.MaxAutoRenewDuration
        };
        _processor = _queueClient.CreateProcessor(_configuration.QueueName, options);
        _processor.ProcessMessageAsync += ProcessMessagesAsync;
        _processor.ProcessErrorAsync += ProcessErrorAsync;
        await _processor.StartProcessingAsync(cancellationToken).ConfigureAwait(false);
    }

    public async Task StopProcessor()
    {
        await _processor.StopProcessingAsync().ConfigureAwait(false);
        await _processor.CloseAsync().ConfigureAwait(false);
    }

    private Task ProcessErrorAsync(ProcessErrorEventArgs args)
    {
        // Placeholders are required; without them the extra arguments are not logged.
        _logger.LogError(args.Exception, "Unhandled exception. ErrorSource: {ErrorSource}, Namespace: {Namespace}, EntityPath: {EntityPath}", args.ErrorSource, args.FullyQualifiedNamespace, args.EntityPath);
        return Task.CompletedTask;
    }

    private async Task ProcessMessagesAsync(ProcessMessageEventArgs args)
    {
        var message = args.Message;
        // Process the message. Body.ToString() decodes the payload as UTF-8.
        _logger.LogInformation("Received message: SequenceNumber:{SequenceNumber} Body:{Body}", message.SequenceNumber, message.Body.ToString());

        // Handle your message (await your own processing here)
        await Task.CompletedTask.ConfigureAwait(false);
    }

    public bool IsClosedOrClosing()
    {
        return ((_processor == null) || _processor.IsClosed || !_processor.IsProcessing);
    }
}
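
The answer does not show the supporting types that QueueWorker and QueueMessageReceiver depend on. A minimal sketch of what they might look like follows. It is inferred only from the members the code above reads or calls, so the exact shapes and all default values are assumptions, not the original author's definitions.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape, inferred from the properties QueueMessageReceiver reads.
public class ReceiverConfiguration
{
    public string ConnectionString { get; set; }
    public string QueueName { get; set; }
    public bool AutoComplete { get; set; } = true;                                 // illustrative default
    public int MaxConcurrentCalls { get; set; } = 1;                               // illustrative default
    public TimeSpan MaxAutoRenewDuration { get; set; } = TimeSpan.FromMinutes(5);  // illustrative default
}

// Assumed contract, inferred from the calls made by QueueWorker.
public interface IQueueMessageReceiver
{
    Task StartProcessor(CancellationToken cancellationToken);
    Task StopProcessor();
    bool IsClosedOrClosing();
}

// Assumed contract, inferred from the constructor of QueueMessageReceiver.
public interface IExecutionMatrix
{
    Dictionary<string, string> GetExecutionMatrix();
}

// Assumed custom exception, thrown when the processor is started twice.
public class FatalSystemException : Exception
{
    public FatalSystemException(string message) : base(message) { }
}
```

With types like these, the receiver and its configuration would also need to be registered in ConfigureServices so that the host can construct QueueWorker; how the original author did that is not shown.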
Score: 3
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/65136500
