首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >基于多线程的RabbitMQ使用者

基于多线程的RabbitMQ使用者
EN

Stack Overflow用户
提问于 2014-02-18 01:52:38
回答 2查看 24.4K关注 0票数 15

我们有一个windows服务,它侦听单个RabbitMQ队列并处理消息。

我们希望扩展相同的windows服务,以便它能够侦听RabbitMQ的多个队列并处理消息。

不确定是否可以使用多线程,因为每个线程都必须侦听(阻塞)队列。

由于我是非常新的多线程,需要高水平的指导方针,以下几点,这将帮助我开始建立原型。

  1. 是否可以使用线程侦听单个应用程序中的多个队列?
  2. 如何处理如果任何单个线程被关闭(由于异常等原因)的情况,如何在不重新启动整个windows服务的情况下返回。
  3. 任何可以帮助我处理这种情况的设计模式或开源实现。
EN

回答 2

Stack Overflow用户

发布于 2014-02-18 03:10:34

我喜欢你写问题的方式--它的开头非常宽泛,并且专注于细节。我已经成功地实现了一些非常类似的东西,目前正在致力于一个开源项目,以吸取我的经验教训,并将它们反馈给社区。不幸的是,虽然-我还没有把我的代码打包整齐,这对你没有多大帮助!无论如何,回答你的问题:

1. Is it possible to use threading for multiple queues.

是的,但它可能充满了陷阱。也就是说,RabbitMQ .NET库不是最好的代码,我发现它是AMQP协议的一个相对繁琐的实现。最有害的警告之一是它如何处理“接收”或“消费”行为,如果您不小心,这种行为很容易导致死锁。幸运的是,它在API文档中得到了很好的说明。-如果可以的话,使用单例连接对象。然后,在每个线程中,使用连接创建一个新的IModel和相应的使用者。

2. How to gracefully handle exceptions in threads -我相信这是另一个主题,我不会在这里讨论它,因为有几种方法可以使用。

3. Any open-source projects? --我喜欢EasyNetQ背后的想法,尽管我最终还是有了自己的想法。我希望,当我的开源项目完成后,我会记得回去,因为我相信这是一个比EasyNetQ更好的改进。

票数 12
EN

Stack Overflow用户

发布于 2014-02-18 07:46:47

您可能会发现这个答案非常有用。对于RabbitMQ的工作方式,我有一个非常基本的理解,但我可能会按照这里的建议,每个线程使用一个订阅者。

当然,组织线程模型的选项不止一种。实际的实现将取决于您需要如何处理来自多个队列的消息:并行地处理消息,或者通过聚合消息和序列化处理消息。下面的代码是一个控制台应用程序,它实现了对后一种情况的模拟。它使用任务并行库BlockingCollection类(这对于这类任务非常有用)。

代码语言:javascript
运行
复制
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21842880
{
    class Program
    {
        BlockingCollection<object> _commonQueue;

        // process an individual queue
        void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token)
        {
            while (true)
            {
                // observe cancellation
                token.ThrowIfCancellationRequested();
                // get a message, this blocks and waits
                var message = queue.Take(token);

                // process this message
                // just place it to the common queue
                var wrapperMessage = "queue " + id + ", message: " + message;
                _commonQueue.Add(wrapperMessage);
            }
        }

        // process the common aggregated queue
        void ProcessCommonQeueue(CancellationToken token)
        {
            while (true)
            {
                // observe cancellation
                token.ThrowIfCancellationRequested();
                // this blocks and waits

                // get a message, this blocks and waits
                var message = _commonQueue.Take(token);

                // process this message
                Console.WriteLine(message.ToString());
            }
        }

        // run the whole process
        async Task RunAsync(CancellationToken token)
        {
            var queues = new List<BlockingCollection<object>>();
            _commonQueue = new BlockingCollection<object>();

            // start individual queue processors
            var tasks = Enumerable.Range(0, 4).Select((i) =>
            {
                var queue = new BlockingCollection<object>();
                queues.Add(queue);

                return Task.Factory.StartNew(
                    () => ProcessQeueue(i, queue, token), 
                    TaskCreationOptions.LongRunning);
            }).ToList();

            // start the common queue processor
            tasks.Add(Task.Factory.StartNew(
                () => ProcessCommonQeueue(token),
                TaskCreationOptions.LongRunning));

            // start the simulators
            tasks.AddRange(Enumerable.Range(0, 4).Select((i) => 
                SimulateMessagesAsync(queues, token)));

            // wait for all started tasks to complete
            await Task.WhenAll(tasks);
        }

        // simulate a message source
        async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token)
        {
            var random = new Random(Environment.TickCount);
            while (true)
            {
                token.ThrowIfCancellationRequested();
                await Task.Delay(random.Next(100, 1000));
                var queue = queues[random.Next(0, queues.Count)];
                var message = Guid.NewGuid().ToString() + " " +  DateTime.Now.ToString();
                queue.Add(message);
            }
        }

        // entry point
        static void Main(string[] args)
        {
            Console.WriteLine("Ctrl+C to stop...");

            var cts = new CancellationTokenSource();
            Console.CancelKeyPress += (s, e) =>
            {
                // cancel upon Ctrl+C
                e.Cancel = true;
                cts.Cancel();
            };

            try
            {
                new Program().RunAsync(cts.Token).Wait();
            }
            catch (Exception ex)
            {
                if (ex is AggregateException)
                    ex = ex.InnerException;
                Console.WriteLine(ex.Message);
            }

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }
}

另一个想法可能是使用反应性扩展(Rx)。如果您可以将到达的消息视为事件,而Rx可以帮助将它们聚合到单个流中。

票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/21842880

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档