首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >RabbitMQ中的并发性

RabbitMQ中的并发性
EN

Stack Overflow用户
提问于 2016-10-31 08:34:02
回答 2查看 6.7K关注 0票数 6

经过一周的编码和搜索论坛,似乎是时候问.

我有一个C#应用程序,它使用EventingBasicConsumer处理RabbitMQ发送的消息。我想同时处理几条消息,所以我在同一个连接上实例化了几个通道(在本例中是8个),每个通道都有一个使用者。然后,我将一个事件处理程序附加到每个使用者收到的事件中。根据我到目前为止的所有读数,这个设置应该允许事件处理程序由使用者并发触发,每个使用者都在自己的线程中运行。但在我的例子中,消费者只有在先前的使用者确认其消息之后才会顺序地接收消息。

还有其他人经历过这种行为吗?我的理解是,在这种情况下,处理在技术上应该是并行的,这是正确的吗?

下面是更好地说明这一问题的基本代码:

代码语言:javascript
运行
复制
Initialise() {
    ConsumerChannels_ = new IModel[ConsumerCount_];
    Consumers_ = new EventingBasicConsumer[ConsumerCount_];
    for (int i = 0; i < ConsumerCount_; ++i)
    {
         ConsumerChannels_[i] = Connection_.CreateModel();
         Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
         Consumers_[i].Received += MessageReceived;
    }
}

MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");         
    // do some time consuming processing here
    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended.");
}

我希望看到的是: //并发处理

消费者1:开始处理. 消费者2:开始处理. 消费者3:开始处理..。 ..。 消费者6:处理结束。 消费者7:处理结束。 消费者8:处理结束。

但我得到的却是: //顺序处理

消费者1:开始处理. 消费者1:处理结束。 消费者2:开始处理. 消费者2:处理结束。 ..。 消费者8:开始处理..。 消费者8:处理结束。

任何关于如何进行的想法都将不胜感激。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-10-31 09:49:16

你必须有办法做到这一点:

通过在以下内容中添加自己的线程池来增加并发性:

代码语言:javascript
运行
复制
MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e) {
    int id = GetConsumerIndex(sender);
    Log_.Debug("Consumer " + id + ": processing started...");         
    // do some time consuming processing here
    // PUT your thread-pool here and process the messages inside the thread

    sender.Model.BasicAck(e.DeliveryTag, false);
    Log_.Debug("Consumer " + id + ": processing ended."); }

}

注意:BasicAck可以在不同的线程中调用。

您可以向队列中添加更多的使用者,通过使用QoS=1,您可以在循环中使用消息。

票数 1
EN

Stack Overflow用户

发布于 2021-04-16 12:38:17

在创建ConnectionFactory时,您实际上可以设置并行处理任务的数量!

代码语言:javascript
运行
复制
ConnectionFactory factory = new ConnectionFactory
{
    ConsumerDispatchConcurrency = 2,
};

默认值为1,这是串行/顺序处理。

我是通过解剖.NET客户端源代码发现的。下面是有趣的部分(concurrency是从ConsumerDispatchConcurrency设置的):

代码语言:javascript
运行
复制
Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
    _worker = Task.Run(loopStart);
}
else
{
    var tasks = new Task[concurrency];
    for (int i = 0; i < concurrency; i++)
    {
        tasks[i] = Task.Run(loopStart);
    }
    _worker = Task.WhenAll(tasks);
}

但要小心,这可能会导致比赛条件!该财产有以下说明:

对于大于一个的并发性,这将消除使用者按照接收消息的顺序处理消息的保证。此外,使用者还需要线程/并发安全。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40338774

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档