我们有一个windows服务,它侦听单个RabbitMQ队列并处理消息。
我们希望扩展相同的windows服务,以便它能够侦听RabbitMQ的多个队列并处理消息。
不确定是否可以使用多线程,因为每个线程都必须侦听(阻塞)队列。
由于我是非常新的多线程,需要高水平的指导方针,以下几点,这将帮助我开始建立原型。
发布于 2014-02-18 03:10:34
我喜欢你写问题的方式--它的开头非常宽泛,并且专注于细节。我已经成功地实现了一些非常类似的东西,目前正在致力于一个开源项目,以吸取我的经验教训,并将它们反馈给社区。不幸的是,虽然-我还没有把我的代码打包整齐,这对你没有多大帮助!无论如何,回答你的问题:
1. Is it possible to use threading for multiple queues.
是的,但它可能充满了陷阱。也就是说,RabbitMQ .NET库不是最好的代码,我发现它是AMQP协议的一个相对繁琐的实现。最有害的警告之一是它如何处理“接收”或“消费”行为,如果您不小心,这种行为很容易导致死锁。幸运的是,它在API文档中得到了很好的说明。-如果可以的话,使用单例连接对象。然后,在每个线程中,使用连接创建一个新的IModel和相应的使用者。
2. How to gracefully handle exceptions in threads -我相信这是另一个主题,我不会在这里讨论它,因为有几种方法可以使用。
3. Any open-source projects? --我喜欢EasyNetQ背后的想法,尽管我最终还是有了自己的想法。我希望,当我的开源项目完成后,我会记得回去,因为我相信这是一个比EasyNetQ更好的改进。
发布于 2014-02-18 07:46:47
您可能会发现这个答案非常有用。对于RabbitMQ的工作方式,我有一个非常基本的理解,但我可能会按照这里的建议,每个线程使用一个订阅者。
当然,组织线程模型的选项不止一种。实际的实现将取决于您需要如何处理来自多个队列的消息:并行地处理消息,或者通过聚合消息和序列化处理消息。下面的代码是一个控制台应用程序,它实现了对后一种情况的模拟。它使用任务并行库和BlockingCollection类(这对于这类任务非常有用)。
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Console_21842880
{
class Program
{
BlockingCollection<object> _commonQueue;
// process an individual queue
void ProcessQueue(int id, BlockingCollection<object> queue, CancellationToken token)
{
while (true)
{
// observe cancellation
token.ThrowIfCancellationRequested();
// get a message, this blocks and waits
var message = queue.Take(token);
// process this message
// just place it to the common queue
var wrapperMessage = "queue " + id + ", message: " + message;
_commonQueue.Add(wrapperMessage);
}
}
// process the common aggregated queue
void ProcessCommonQeueue(CancellationToken token)
{
while (true)
{
// observe cancellation
token.ThrowIfCancellationRequested();
// this blocks and waits
// get a message, this blocks and waits
var message = _commonQueue.Take(token);
// process this message
Console.WriteLine(message.ToString());
}
}
// run the whole process
async Task RunAsync(CancellationToken token)
{
var queues = new List<BlockingCollection<object>>();
_commonQueue = new BlockingCollection<object>();
// start individual queue processors
var tasks = Enumerable.Range(0, 4).Select((i) =>
{
var queue = new BlockingCollection<object>();
queues.Add(queue);
return Task.Factory.StartNew(
() => ProcessQeueue(i, queue, token),
TaskCreationOptions.LongRunning);
}).ToList();
// start the common queue processor
tasks.Add(Task.Factory.StartNew(
() => ProcessCommonQeueue(token),
TaskCreationOptions.LongRunning));
// start the simulators
tasks.AddRange(Enumerable.Range(0, 4).Select((i) =>
SimulateMessagesAsync(queues, token)));
// wait for all started tasks to complete
await Task.WhenAll(tasks);
}
// simulate a message source
async Task SimulateMessagesAsync(List<BlockingCollection<object>> queues, CancellationToken token)
{
var random = new Random(Environment.TickCount);
while (true)
{
token.ThrowIfCancellationRequested();
await Task.Delay(random.Next(100, 1000));
var queue = queues[random.Next(0, queues.Count)];
var message = Guid.NewGuid().ToString() + " " + DateTime.Now.ToString();
queue.Add(message);
}
}
// entry point
static void Main(string[] args)
{
Console.WriteLine("Ctrl+C to stop...");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (s, e) =>
{
// cancel upon Ctrl+C
e.Cancel = true;
cts.Cancel();
};
try
{
new Program().RunAsync(cts.Token).Wait();
}
catch (Exception ex)
{
if (ex is AggregateException)
ex = ex.InnerException;
Console.WriteLine(ex.Message);
}
Console.WriteLine("Press Enter to exit");
Console.ReadLine();
}
}
}另一个想法可能是使用反应性扩展(Rx)。如果您可以将到达的消息视为事件,而Rx可以帮助将它们聚合到单个流中。
https://stackoverflow.com/questions/21842880
复制相似问题