首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >MassTransit报文速率上限为10

MassTransit报文速率上限为10
EN

Stack Overflow用户
提问于 2014-06-02 22:40:48
回答 2查看 3.9K关注 0票数 3

我设置了一个公共交通消费者服务来使用RabbitMQ,但我不知道如何提高消费者的速度-它似乎硬限制在每秒接收10条消息。

我已经尝试了这里列出的步骤:https://groups.google.com/forum/#!msg/masstransit-discuss/plP4n2sixrY/xfORgTPqcwsJ,没有成功-将预取和并发消费者设置为25只会增加确认的消息,但它不会增加消息的下载速率。

我的配置如下:

代码语言:javascript
运行
复制
ServiceBusFactory.ConfigureDefaultSettings(x =>
    {
        x.SetConcurrentReceiverLimit(25);
        x.SetConcurrentConsumerLimit(25);
    });

_bus = ServiceBusFactory.New(
    sbc =>
        {
            sbc.UseRabbitMq(x => 
                x.ConfigureHost(
                    "rabbitmq://localhost/Dev/consume?prefetch=25",
                    y =>
                        {
                            y.SetUsername(config.Username);
                            y.SetPassword(config.Password);
                        }));
            sbc.UseLog4Net();
            sbc.ReceiveFrom("rabbitmq://localhost/Dev/consume?prefetch=25");
            sbc.Subscribe(x => RegisterConsumers(x, container));
            sbc.UseJsonSerializer();
            sbc.SetConcurrentConsumerLimit(25);
        });

我在两个地方设置并发消费者限制,因为我不确定是否需要在默认配置或总线配置中设置它,而且消费者是通过unity注册的-我省略了消费者订阅,因为所有订阅者都在接收。

我有点困惑,不知道是否还有其他需要设置的内容,或者是否需要更改设置配置的顺序。

非常感谢您的帮助。

EN

回答 2

Stack Overflow用户

发布于 2015-06-30 19:33:37

在与这个问题度过了一个浪漫的夜晚,并尝试了Chris建议的不同东西后,我发现还有另一件的事情你必须做才能让它像它应该的那样工作。

具体来说,yes,您需要在消费者队列地址上设置预取:

代码语言:javascript
运行
复制
sbc.UseRabbitMq(
                f =>
                    f.ConfigureHost(
                        new Uri( "rabbitmq://guest:guest@localhost/masstransit_consumer" ),
                        c =>
                        {
                        } )
                );

int pf = 20; // prefetch

// set consumer prefetch (required!)
sbc.ReceiveFrom( string.Format( "rabbitmq://guest:guest@localhost/masstransit_consumer?prefetch={0}", pf ) );

但这仍然不够。

这个密钥可以在Chris在他的答案下面的评论中提到的mtstress工具的代码中找到。结果是工具调用:

代码语言:javascript
运行
复制
int _t, _ct;
ThreadPool.GetMinThreads( out _t, out _ct );
ThreadPool.SetMinThreads( pf, _ct );

将此代码添加到我的代码中可以解决此问题。我想知道为什么MSMQ传输不需要这样做……

更新#1

经过进一步调查,我发现了一个可能的罪魁祸首。它在ServiceBusBuilderImpl中。

有一种方法可以提高限制,即ConfigureThreadPool

这里的问题是它调用了CalculateRequiredThreads,它应该返回所需的线程数。不幸的是,后者在我的客户端Windows7和我的Windows Server上都返回了负的值。因此,当调用ThreadPool.SetMin/MaxThreads时,ConfigureThreadPool实际上什么也不做,因为负值会被忽略。

这个负值是怎么回事?CalculateRequiredThreads似乎调用了ThreadPool.GetMinThreadsThreadPool.GetAvailableThreads,并使用一个公式来计算出所需的线程数:

代码语言:javascript
运行
复制
var requiredThreads = consumerThreads + (workerThreads - availableWorkerThreads);

这里的问题是,在我的机器上,这实际上做到了:

代码语言:javascript
运行
复制
40 (my limit) + 8 (workerThreads) - 1023 (availableThreads) 

哪一个当然会返回

代码语言:javascript
运行
复制
-975

结论是:上述来自地铁内部的代码似乎是错误的。当我提前手动提高限制时,ConfigureMinThreads会遵守它(因为只有当它高于读取值时,它才会设置限制)。

如果不事先手动设置限制,限制将无法设置,因此代码会执行与默认线程池限制(在我的机器上似乎是8)一样多的线程。

显然,有人认为这个公式会产生

代码语言:javascript
运行
复制
40 + 8 - 8

在默认情况下。为什么GetMinThreadsGetAvailableThreads返回如此不相关的值还有待确定...

更新#2

改变

代码语言:javascript
运行
复制
    static int CalculateRequiredThreads( int consumerThreads )
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMinThreads( out workerThreads, out completionPortThreads );
        int availableWorkerThreads;
        int availableCompletionPortThreads;
        ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
        var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
        return requiredThreads;
    }

代码语言:javascript
运行
复制
    static int CalculateRequiredThreads( int consumerThreads )
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMaxThreads( out workerThreads, out completionPortThreads );
        int availableWorkerThreads;
        int availableCompletionPortThreads;
        ThreadPool.GetAvailableThreads( out availableWorkerThreads, out availableCompletionPortThreads );
        var requiredThreads = consumerThreads + ( workerThreads - availableWorkerThreads );
        return requiredThreads;
    }

解决了问题。两者在这里都返回1023,并且公式的输出是正确的预期线程数。

票数 5
EN

Stack Overflow用户

发布于 2015-02-05 04:11:12

您的客户正在执行的工作量是多少?如果运行足够快,.NET运行时很可能不需要创建额外的线程来处理入站消息率。

我们在生产中有许多使用指定计数的系统,其中我们将消费者限制与预取计数相匹配,并且在所有这些负载情况下,RabbitMQ显示的未确认消息计数都等于这些设置。我们通常会看到几乎相同数量的线程处理消息。最初,.NET运行时在所使用的已分配线程方面是保守的,但当使用者只是在等待远程操作时,它会迅速增加到全部线程数。

如果使用者中有一个区域是单线程的,那么它可能会限制基于该瓶颈的线程扩展,因此请验证您的线程模型是否配置正确。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/23997190

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档