优化RabbitMQ消费者批量使用

内容来源于 Stack Overflow,并遵循CC BY-SA 3.0许可协议进行翻译与使用

  • 回答 (1)
  • 关注 (0)
  • 查看 (1186)

我有一个应用程序,在每个消息消耗我需要查询MySQL数据库的一些信息,并根据该过程消耗消息。我想优化这一点,以防止数据库上的多个查询累加到负载。

我在考虑一种方法,我等待至少x消息y秒。这样我可以批量消费一些消息,即使在某些时候我收到的消息较少,它们也会消耗掉。

示例:假设x = 100y = 10秒

这意味着我等待至少100条消息或10秒,以先到者为准。这样我就可以在一次查询中立即查询数据库中的100个消息。此外,如果我收到少于100条消息,剩余的消息将在最多10秒的窗口中处理。

我正在使用NodeJS amqplib进行消费。我有以下基于RabbitMQ示例的代码:

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
    }, {noAck: true});
  });
});

我正在考虑拥有一个全局对象并在每个consume回调中添加它,并在它到达那些被处理的x消息时检查该对象的计数。仍然,不知道如何在此上添加y秒的上限时间,并确保如果我在时间窗口内得到少于x条消息,那么这些消息将被处理

提问于
用户回答回答于

以下代码将在每个收到的消息之后调用一个函数,该消息将收到的消息聚合在一个数组中。如果在没有消息(带参数null)的情况下调用它或者当它看到消息计数已到达时x,它会将聚合消息发送到数据库函数。否则,它只是将消息添加到数组(在if语句的第二部分)。

参数null通过计时器传递给聚合函数,计时器在y几秒钟后触发。刚刚初始化消息队列时首先设置此计时器,并在聚合器将消息发送到数据库时重置。

var messageStore = [];
var timer;

sendToDatabase = function(messages) {...}

aggregate = function(msg) {
    if (msg == null || messageStore.push(msg) == x) {
        clearTimeout(timer);
        timer = setTimeout(aggregate, 1000*y, null);
        sendToDatabase(messageStore);
        messageStore = [];
    }
}

amqp.connect('amqp://localhost', function(err, conn) {
  conn.createChannel(function(err, ch) {
    var q = 'hello';

    ch.assertQueue(q, {durable: false});
    console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q);
    timer = setTimeout(aggregate, 1000*y, null);
    ch.consume(q, function(msg) {
      console.log(" [x] Received %s", msg.content.toString());
      aggregate(msg);
    }, {noAck: true});
  });
});

注意:我无法测试这个,因为我手边没有消息传递系统。

热门问答

求云函数可用的Pandas压缩包,一直失败,出现多个C extension问题?

配置 https 配置完后报错?

女淘日记

杭州吱吱吱科技 · 站长 (已认证)

推荐已采纳

请检查服务器是否安装了代理?如有请尝试移除代理再尝试

另外,curl访问https时候需要确保服务器已经保存了证书,可以通过 curl -k --tlsv1 过滤

即时通信是否可以给小程序发送小卡片消息?

都快要考试了,从业者认证考试的准考证怎么还没发放?

您好,本月正式考试通知已经全部发送完毕。已安排考试中心重新为您发送,请注意查收站内信、邮件、短信及腾讯云助手公众号消息 如有疑问,可联系考试中心; 电话:400-8006213/13810321135 邮箱:qcloud@ats.org.cn... 展开详请

为什么绑定了域名之后的企业邮箱发送到Gmail被事儿别为垃圾邮件?

女淘日记

杭州吱吱吱科技 · 站长 (已认证)

推荐
可以通过设置DMARC来解决邮件被其他邮局识别为垃圾邮件的问题 TXT记录值为:v=spf1 include:spf.mail.qq.com ~all 详细参考记录: https://work.weixin.qq.com/help?person_id=1&doc_id=524&h...... 展开详请

腾讯云cos如何通过api获取文件的永久url?

galenye

腾讯 · 工程师 (已认证)

对象存储专业搬砖工
推荐
如果文件是公有读的,那直接拼路径即访问URL,格式如 https://<Bucket>.cos.<Region>.myqcloud.com/<Key> ,不需要接口。 如果使用的sdk,比如js sdk,则可以使用 getObjectUrl 方法 var url = cos.ge...... 展开详请

扫码关注云+社区

领取腾讯云代金券