优化RabbitMQ消费者批量使用

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (1120)

我有一个应用程序,在每个消息消耗我需要查询MySQL数据库的一些信息,并根据该过程消耗消息。我想优化这一点,以防止数据库上的多个查询累加到负载。

我在考虑一种方法,我等待至少x消息y秒。这样我可以批量消费一些消息,即使在某些时候我收到的消息较少,它们也会消耗掉。

示例:假设x = 100y = 10秒

这意味着我等待至少100条消息或10秒,以先到者为准。这样我就可以在一次查询中立即查询数据库中的100个消息。此外,如果我收到少于100条消息,剩余的消息将在最多10秒的窗口中处理。

我正在使用NodeJS amqplib进行消费。我有以下基于RabbitMQ示例的代码:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

我正在考虑拥有一个全局对象并在每个consume回调中添加它,并在它到达那些被处理的x消息时检查该对象的计数。仍然,不知道如何在此上添加y秒的上限时间,并确保如果我在时间窗口内得到少于x条消息,那么这些消息将被处理

提问于
用户回答回答于

以下代码将在每个收到的消息之后调用一个函数,该消息将收到的消息聚合在一个数组中。如果在没有消息(带参数null)的情况下调用它或者当它看到消息计数已到达时x,它会将聚合消息发送到数据库函数。否则,它只是将消息添加到数组(在if语句的第二部分)。

参数null通过计时器传递给聚合函数,计时器在y几秒钟后触发。刚刚初始化消息队列时首先设置此计时器,并在聚合器将消息发送到数据库时重置。

var messageStore = [];
var timer;

sendToDatabase = function(messages) {...}

aggregate = function(msg) {
    if (msg == null || messageStore.push(msg) == x) {
        clearTimeout(timer);
        timer = setTimeout(aggregate, 1000*y, null);
        sendToDatabase(messageStore);
        messageStore = [];
    }
}

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    timer = setTimeout(aggregate, 1000*y, null);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
      aggregate(msg);
    }, {noAck: true});
  });
});

注意:我无法测试这个,因为我手边没有消息传递系统。

扫码关注云+社区

领取腾讯云代金券