首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >优化RabbitMQ消费者批量消费

优化RabbitMQ消费者批量消费
EN

Stack Overflow用户
提问于 2018-12-16 16:55:47
回答 1查看 1.7K关注 0票数 1

我有一个应用程序,在该应用程序中,每次消费消息时,我都需要查询MySQL数据库以获取一些信息,并基于此来处理消费的消息。我希望对此进行优化,以防止对数据库的多个查询增加负载。

我正在考虑一种方法,即我至少等待x消息y 。通过这种方式,我可以批量消费一些消息,即使在某个时候我收到的消息更少,它们也会被消费。

示例:假设x = 100y = 10秒

这意味着我至少要等待100条消息或10秒,以最先收到的为准。这样,我可以在一次查询中一次查询数据库中的100条消息。此外,如果我收到的消息少于100条,则剩余的消息将在最多10秒的窗口中处理。

我正在使用带有amqplib的NodeJS进行消费。我有以下基于RabbitMQ示例的代码:

代码语言:javascript
复制
amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

我在考虑在每个consume回调中添加一个全局对象,并在该对象到达x messages时检查该对象的计数。尽管如此,我仍然不确定如何添加y seconds的时间上限,并确保如果我在时间窗口内收到的消息少于x ,则会处理这些消息

EN

回答 1

Stack Overflow用户

发布于 2018-12-17 05:36:15

以下代码将在每个接收到的消息之后调用一个函数,该函数将接收到的消息聚合到一个数组中。当在不使用消息(带有参数null)的情况下调用它时,或者当它看到消息计数已经达到x时,它会将聚合的消息发送到数据库函数。否则,它只是将消息添加到数组中(在if语句的第二部分中)。

y秒之后触发的计时器将参数null传递给聚合函数。此计时器在消息队列刚初始化时首次设置,每当聚合器将消息发送到数据库时,此计时器将被重置。

代码语言:javascript
复制
var messageStore = [];
var timer;

sendToDatabase = function(messages) {...}

aggregate = function(msg) {
    if (msg == null || messageStore.push(msg) == x) {
        clearTimeout(timer);
        timer = setTimeout(aggregate, 1000*y, null);
        sendToDatabase(messageStore);
        messageStore = [];
    }
}

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    timer = setTimeout(aggregate, 1000*y, null);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
      aggregate(msg);
    }, {noAck: true});
  });
});

注意:我无法测试这一点,因为我手头没有消息传递系统。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53800747

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档