首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >Windows服务通过MSMQ进行通信-我需要服务总线吗?

Windows服务通过MSMQ进行通信-我需要服务总线吗?
EN

Stack Overflow用户
提问于 2011-10-08 11:24:20
回答 2查看 225关注 0票数 2

我遇到了这样的问题:系统包含节点(windows服务),这些节点推送要处理的消息,而其他节点则会拉出消息并处理它们。

这是以一种方式设计的,即推送节点通过维护队列的循环列表和每次发送后的旋转队列来平衡队列之间的负载。因此,消息1将转到队列1,消息2将转到队列2等。到目前为止,这个部分工作得很好。

在消息拉动端,我们设计了一个类似的方法来检索消息-首先从队列1,然后从队列2等等。理论上,每个拉节点位于不同的机器上,在实践中,到目前为止,它只在一个队列上监听消息。但最近的一项要求使我们在一台机器上有一个拉节点,它侦听多个队列:一个通常非常繁忙且充满数百万条消息的队列,以及一个通常只包含少数消息的节点。

我们面临的问题是,我们最初设计的方式--拉节点--从一个队列到另一个队列,直到找到消息为止。如果超时(例如在一秒钟后),则转移到下一个队列。

这不再起作用了,因为Q1 (充满数百万条消息)每条消息将延迟大约1秒钟,因为每次从Q1中提取消息后,我们都会向Q2请求消息(如果它不包含任何消息,我们将等待一秒钟)。

它是这样的:

Q1包含10条消息,Q2不包含任何消息

immediately

  • Pull节点请求来自等待第二次消息的Q1

  • ------------的消息第二个问题

等。

所以这显然是错误的。

我想我是在这里寻找最好的建筑解决方案。消息处理不需要尽可能实时,但需要健壮,不应该丢失任何消息!

我想听听你对这个问题的看法。

提前感谢扬尼斯

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2011-10-26 07:55:34

最后我创建了一组线程--每个需要处理的msmq都有一个线程。在构造函数中,我初始化了这些线程:

代码语言:javascript
运行
复制
Storages.ForEach(queue =>
        {
            Task task = Task.Factory.StartNew(() =>
            {
                LoggingManager.LogInfo("Starting a local thread to read in mime messages from queue " + queue.Name, this.GetType());
                while (true)
                {
                    WorkItem mime = queue.WaitAndRetrieve();
                    if (mime != null)
                    {
                        _Semaphore.WaitOne();
                        _LocalStorage.Enqueue(mime);

                        lock (_locker) Monitor.Pulse(_locker);

                        LoggingManager.LogDebug("Adding no. " + _LocalStorage.Count + " item in queue", this.GetType());
                    }
                }
            });
        });

  • _LocalStorage是一个线程安全队列实现( .NET 4.0中引入的ConcurrentQueue)
  • ,信号量是控制_LocalStorage中插入的计数信号量。_LocalStorage基本上是接收到的消息的缓冲区,但是当处理节点忙于工作时,我们不希望它变得太大。结果可能是我们检索该_LocalStorage中的所有msmq消息,但只忙于处理其中大约5条消息。这对恢复能力(如果程序意外终止,我们会丢失所有这些消息)和性能都是不好的,因为在内存中保存所有这些项目的内存消耗将是巨大的。因此,我们需要控制在_LocalStorage缓冲区队列中持有多少项。
  • 我们脉冲等待工作的线程(参见下面),通过执行简单的Monitor.Pulse

将新项添加到队列

从队列中删除工作项的代码如下所示:

代码语言:javascript
运行
复制
lock (_locker)
            if (_LocalStorage.Count == 0) 
                Monitor.Wait(_locker);

        WorkItem result;
        if (_LocalStorage.TryDequeue(out result))
        {
            _Semaphore.Release();
            return result;
        }

        return null;

我希望这能帮助别人解决类似的问题。

票数 1
EN

Stack Overflow用户

发布于 2011-10-08 17:53:51

也许您可以在ReceiveCompleted类中使用MessageQueue事件?那就不用投票了。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/7696473

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档