我设置了一个公共交通消费者服务来使用RabbitMQ,但我不知道如何提高消费者的速度-它似乎硬限制在每秒接收10条消息。
我已经尝试了这里列出的步骤:https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ,没有成功-将预取和并发消费者设置为25只会增加确认的消息,但它不会增加消息的下载速率。
我的配置如下:
ServiceBusFactory.ConfigureDefaultSettings(x =>
{
x.SetConcurrentReceiverLimit(25);
x.SetConcurrentConsumerLimit(25);
});
_bus = ServiceBusFactory.New(
sbc =>
{
sbc.UseRabbitMq(x =>
x.ConfigureHost(
"rabbitmq://localhost/Dev/consume?prefetch=25",
y =>
{
y.SetUsername(config.Username);
y.SetPassword(config.Password);
}));
sbc.UseLog4Net();
sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25");
sbc.Subscribe(x => RegisterConsumers(x, container));
sbc.UseJsonSerializer();
sbc.SetConcurrentConsumerLimit(25);
});
我在两个地方设置并发消费者限制,因为我不确定是否需要在默认配置或总线配置中设置它,而且消费者是通过unity注册的-我省略了消费者订阅,因为所有订阅者都在接收。
我有点困惑,不知道是否还有其他需要设置的内容,或者是否需要更改设置配置的顺序。
非常感谢您的帮助。
发布于 2015-06-30 19:33:37
在与这个问题度过了一个浪漫的夜晚,并尝试了Chris建议的不同东西后,我发现还有另一件的事情你必须做才能让它像它应该的那样工作。
具体来说,yes,您需要在消费者队列地址上设置预取:
sbc.UseRabbitMq(
f =>
f.ConfigureHost(
new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
c =>
{
} )
);
int pf = 20; // prefetch
// set consumer prefetch (required!)
sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );
但这仍然不够。
这个密钥可以在Chris在他的答案下面的评论中提到的mtstress
工具的代码中找到。结果是工具调用:
int _t, _ct;
ThreadPool.GetMinThreads( out _t, out _ct );
ThreadPool.SetMinThreads( pf, _ct );
将此代码添加到我的代码中可以解决此问题。我想知道为什么MSMQ传输不需要这样做……
更新#1
经过进一步调查,我发现了一个可能的罪魁祸首。它在ServiceBusBuilderImpl
中。
有一种方法可以提高限制,即ConfigureThreadPool
。
这里的问题是它调用了CalculateRequiredThreads
,它应该返回所需的线程数。不幸的是,后者在我的客户端Windows7和我的Windows Server上都返回了负的值。因此,当调用ThreadPool.SetMin/MaxThreads
时,ConfigureThreadPool
实际上什么也不做,因为负值会被忽略。
这个负值是怎么回事?CalculateRequiredThreads
似乎调用了ThreadPool.GetMinThreads
和ThreadPool.GetAvailableThreads
,并使用一个公式来计算出所需的线程数:
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);
这里的问题是,在我的机器上,这实际上做到了:
40 (my limit) + 8 (workerThreads) - 1023 (availableThreads)
哪一个当然会返回
-975
结论是:上述来自地铁内部的代码似乎是错误的。当我提前手动提高限制时,ConfigureMinThreads
会遵守它(因为只有当它高于读取值时,它才会设置限制)。
如果不事先手动设置限制,限制将无法设置,因此代码会执行与默认线程池限制(在我的机器上似乎是8)一样多的线程。
显然,有人认为这个公式会产生
40 + 8 - 8
在默认情况下。为什么GetMinThreads
和GetAvailableThreads
返回如此不相关的值还有待确定...
更新#2
改变
static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}
至
static int CalculateRequiredThreads( int consumerThreads )
{
int workerThreads;
int completionPortThreads;
ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads );
int availableWorkerThreads;
int availableCompletionPortThreads;
ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
return requiredThreads;
}
解决了问题。两者在这里都返回1023,并且公式的输出是正确的预期线程数。
发布于 2015-02-05 04:11:12
您的客户正在执行的工作量是多少?如果运行足够快,.NET运行时很可能不需要创建额外的线程来处理入站消息率。
我们在生产中有许多使用指定计数的系统,其中我们将消费者限制与预取计数相匹配,并且在所有这些负载情况下,RabbitMQ显示的未确认消息计数都等于这些设置。我们通常会看到几乎相同数量的线程处理消息。最初,.NET运行时在所使用的已分配线程方面是保守的,但当使用者只是在等待远程操作时,它会迅速增加到全部线程数。
如果使用者中有一个区域是单线程的,那么它可能会限制基于该瓶颈的线程扩展,因此请验证您的线程模型是否配置正确。
https://stackoverflow.com/questions/23997190
复制相似问题