经过一周的编码和搜索论坛,似乎是时候问.
我有一个C#应用程序,它使用EventingBasicConsumer处理RabbitMQ发送的消息。我想同时处理几条消息,所以我在同一个连接上实例化了几个通道(在本例中是8个),每个通道都有一个使用者。然后,我将一个事件处理程序附加到每个使用者收到的事件中。根据我到目前为止的所有读数,这个设置应该允许事件处理程序由使用者并发触发,每个使用者都在自己的线程中运行。但在我的例子中,消费者只有在先前的使用者确认其消息之后才会顺序地接收消息。
还有其他人经历过这种行为吗?我的理解是,在这种情况下,处理在技术上应该是并行的,这是正确的吗?
下面是更好地说明这一问题的基本代码:
Initialise() {
ConsumerChannels_ = new IModel[ConsumerCount_];
Consumers_ = new EventingBasicConsumer[ConsumerCount_];
for (int i = 0; i < ConsumerCount_; ++i)
{
ConsumerChannels_[i] = Connection_.CreateModel();
Consumers_[i] = new EventingBasicConsumer(ConsumerChannels_[i]);
Consumers_[i].Received += MessageReceived;
}
}
MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e)
{
int id = GetConsumerIndex(sender);
Log_.Debug("Consumer " + id + ": processing started...");
// do some time consuming processing here
sender.Model.BasicAck(e.DeliveryTag, false);
Log_.Debug("Consumer " + id + ": processing ended.");
}
我希望看到的是: //并发处理
消费者1:开始处理. 消费者2:开始处理. 消费者3:开始处理..。 ..。 消费者6:处理结束。 消费者7:处理结束。 消费者8:处理结束。
但我得到的却是: //顺序处理
消费者1:开始处理. 消费者1:处理结束。 消费者2:开始处理. 消费者2:处理结束。 ..。 消费者8:开始处理..。 消费者8:处理结束。
任何关于如何进行的想法都将不胜感激。
发布于 2016-10-31 09:49:16
你必须有办法做到这一点:
通过在以下内容中添加自己的线程池来增加并发性:
MessageReceived(IBasicConsumer sender, BasicDeliverEventArgs e) {
int id = GetConsumerIndex(sender);
Log_.Debug("Consumer " + id + ": processing started...");
// do some time consuming processing here
// PUT your thread-pool here and process the messages inside the thread
sender.Model.BasicAck(e.DeliveryTag, false);
Log_.Debug("Consumer " + id + ": processing ended."); }
}
注意:BasicAck
可以在不同的线程中调用。
或
您可以向队列中添加更多的使用者,通过使用QoS=1
,您可以在循环中使用消息。
发布于 2021-04-16 12:38:17
在创建ConnectionFactory
时,您实际上可以设置并行处理任务的数量!
ConnectionFactory factory = new ConnectionFactory
{
ConsumerDispatchConcurrency = 2,
};
默认值为1,这是串行/顺序处理。
我是通过解剖.NET客户端源代码发现的。下面是有趣的部分(concurrency
是从ConsumerDispatchConcurrency
设置的):
Func<Task> loopStart = ProcessChannelAsync;
if (concurrency == 1)
{
_worker = Task.Run(loopStart);
}
else
{
var tasks = new Task[concurrency];
for (int i = 0; i < concurrency; i++)
{
tasks[i] = Task.Run(loopStart);
}
_worker = Task.WhenAll(tasks);
}
但要小心,这可能会导致比赛条件!该财产有以下说明:
对于大于一个的并发性,这将消除使用者按照接收消息的顺序处理消息的保证。此外,使用者还需要线程/并发安全。
https://stackoverflow.com/questions/40338774
复制相似问题