我遇到了这样的问题:系统包含节点(windows服务),这些节点推送要处理的消息,而其他节点则会拉出消息并处理它们。
这是以一种方式设计的,即推送节点通过维护队列的循环列表和每次发送后的旋转队列来平衡队列之间的负载。因此,消息1将转到队列1,消息2将转到队列2等。到目前为止,这个部分工作得很好。
在消息拉动端,我们设计了一个类似的方法来检索消息-首先从队列1,然后从队列2等等。理论上,每个拉节点位于不同的机器上,在实践中,到目前为止,它只在一个队列上监听消息。但最近的一项要求使我们在一台机器上有一个拉节点,它侦听多个队列:一个通常非常繁忙且充满数百万条消息的队列,以及一个通常只包含少数消息的节点。
我们面临的问题是,我们最初设计的方式--拉节点--从一个队列到另一个队列,直到找到消息为止。如果超时(例如在一秒钟后),则转移到下一个队列。
这不再起作用了,因为Q1 (充满数百万条消息)每条消息将延迟大约1秒钟,因为每次从Q1中提取消息后,我们都会向Q2请求消息(如果它不包含任何消息,我们将等待一秒钟)。
它是这样的:
Q1包含10条消息,Q2不包含任何消息
immediately
等。
所以这显然是错误的。
我想我是在这里寻找最好的建筑解决方案。消息处理不需要尽可能实时,但需要健壮,不应该丢失任何消息!
我想听听你对这个问题的看法。
提前感谢扬尼斯
发布于 2011-10-26 07:55:34
最后我创建了一组线程--每个需要处理的msmq都有一个线程。在构造函数中,我初始化了这些线程:
Storages.ForEach(queue =>
{
Task task = Task.Factory.StartNew(() =>
{
LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType());
while (true)
{
WorkItem mime = queue.WaitAndRetrieve();
if (mime != null)
{
_Semaphore.WaitOne();
_LocalStorage.Enqueue(mime);
lock (_locker) Monitor.Pulse(_locker);
LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType());
}
}
});
});
将新项添加到队列
从队列中删除工作项的代码如下所示:
lock (_locker)
if (_LocalStorage.Count == 0)
Monitor.Wait(_locker);
WorkItem result;
if (_LocalStorage.TryDequeue(out result))
{
_Semaphore.Release();
return result;
}
return null;
我希望这能帮助别人解决类似的问题。
发布于 2011-10-08 17:53:51
也许您可以在ReceiveCompleted类中使用MessageQueue事件?那就不用投票了。
https://stackoverflow.com/questions/7696473
复制相似问题