专栏首页Nodejs技术栈高并发场景下 RabbitMQ 消费端服务限流实践

高并发场景下 RabbitMQ 消费端服务限流实践

生活的乐趣取决于生活都本身,而不是取决于工作或地点。—— 爱默生

应用范围为服务访问量突然剧增,原因可能有多种外部的调用或内部的一些问题导致消息积压,对服务的访问超过服务所能处理的最大峰值,导致系统超时负载从而崩溃。

作者简介:五月君,Nodejs Developer,慕课网认证作者,热爱技术、喜欢分享的 90 后青年,欢迎关注 Nodejs技术栈 和 Github 开源项目 https://www.nodejs.red

业务场景

举一些我们平常生活中的消费场景,例如:火车票、机票等,通常来说这些服务在下单之后,后续的出票结果都是异步通知的,如果服务本身只支持每秒 1000 访问量,由于外部服务的原因突然访问量增加到每秒 2000 并发,这个时候服务接收者因为流量的剧增,超过了自己系统本身所能处理的最大峰值,如果没有对消息做限流措施,系统在这段时间内就会造成不可用,在生产环境这是一个很 严重 的问题,实际应用场景不止于这些,本文通过 RabbitMQ 来讲解如何对消费端做限流措施。

消费端限流机制

RabbitMQ 提供了服务质量保证 ( QOS) 功能,对 channel(通道)预先设置一定的消息数目,每次发送的消息条数都是基于预先设置的数目,如果消费端一旦有未确认的消息,这时服务端将不会再发送新的消费消息,直到消费端将消息进行完全确认,注意:此时消费端不能设置自动签收,否则会无效。

RabbitMQv3.3.0 之后,放宽了限制,除了对 channel 设置之外,还可以对每个消费者进行设置。

Node.js 版

以下为 Node.js 开发语言 amqplib 库对于限流实现提供的接口方法 prefetch

export interface Channel extends events.EventEmitter {
    prefetch(count: number, global?: boolean): Promise<Replies.Empty>;
    ...
}

prefetch 参数说明

  • count:每次推送给消费端 N 条消息数目,如果这 N 条消息没有被ack,生产端将不会再次推送直到这 N 条消息被消费。
  • global:在哪个级别上做限制,ture 为 channel 上做限制,false 为消费端上做限制,默认为 false。

Java 版

同上面讲解的 Node.js 版本都是一样,第一个参数 prefetchSize 是指预读取的消息内容大小上限,可以简单理解为消息有效载荷字节数组的最大长度限制,0 表示无上限

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);

代码实践 Node.js 版

建立生产端

const amqp = require('amqplib');

async function producer() {
    // 1. 创建链接对象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 获取通道
    const channel = await connection.createChannel();

    // 3. 声明参数
    const exchangeName = 'qosEx';
    const routingKey = 'qos.test001';
    const msg = 'Producer:';

    // 4. 声明交换机
    await channel.assertExchange(exchangeName, 'topic', { durable: true });

    for (let i=0; i<5; i++) {
        // 5. 发送消息
        await channel.publish(exchangeName, routingKey, Buffer.from(`${msg} 第${i}条消息`));
    }

    await channel.close();
}

producer();

建立消费端

const amqp = require('amqplib');

async function consumer() {
    // 1. 创建链接对象
    const connection = await amqp.connect('amqp://localhost:5672');

    // 2. 获取通道
    const channel = await connection.createChannel();

    // 3. 声明参数
    const exchangeName = 'qosEx';
    const queueName = 'qosQueue';
    const routingKey = 'qos.#';

    // 4. 声明交换机、对列进行绑定
    await channel.assertExchange(exchangeName, 'topic', { durable: true });
    await channel.assertQueue(queueName);
    await channel.bindQueue(queueName, exchangeName, routingKey);

    // 5. 限流参数设置
    await channel.prefetch(1, false);

    // 6. 限流,noAck参数必须设置为false
    await channel.consume(queueName, msg => {
        console.log('Consumer:', msg.content.toString());

        // channel.ack(msg);
    }, { noAck: false });
}

consumer();

未确认消息情况测试

在 consumer 中我们暂且将 channel.ack(msg) 注释掉,分别启动生产者和消费者,看看是什么情况?

如上图所示,总共5条消息按照预先设置的发送了一条消息,因为我将 channel.ack(msg) 注释掉了,服务端在未得到 ack 确认,将不会在发送剩下已 Ready 消息。

确认消息测试

修改 consumer 代码,打开确认消息注释,重新启动消费端进行测试

await channel.consume(queueName, msg => {
    console.log('Consumer:', msg.content.toString());

    channel.ack(msg); // 打开注释
}, { noAck: false });

如上图所示,Unacked 为0,消息已全部消费成功。

源码地址

https://github.com/Q-Angelo/project-training/tree/master/nodejs/rabbitmq-prefetch

代码实践 Java 版

重点是在消费端变化,第一步增加限流设置,第二步设置限流的 autoAck 为 false

构建消费者

// 设置限流 prefetchCount 表示每次处理多少条
// void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);
channel.basicQos(0, 1, false);

// 设置 channel autoAck 限流模式一定要设置为 false
channel.basicConsume(qosQueueName, false, myConsumer);

源码地址

https://github.com/Q-Angelo/SpringBoot-Course/tree/master/chapter8/chapter8-1/src/main/java/com/may/rabbitmq/qos/helloworld

RabbitMQ 限流使用总结

限流在我们的实际工作中还是很有意义的,在使用上生产端没有变化,重点在消费端,着重看以下两点:

  • 增加限流参数设置
  • 限流情况 ack 设置为手动签收

本文分享自微信公众号 - Nodejs技术栈(NodejsDeveloper)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 轻拢慢捻,微服务熔断大总管

    我这篇文章来的晚了些,因为hystrix已经进入维护模式。但已经有非常多的同学入坑了,那么本篇文章就是及时雨。本文将说明熔断使用的一些注意事项,可能会细的让你厌...

    xjjdog
  • 第27次文章:简单了解JDBC

    本周开始接触数据库了,第一次接触,倒腾了好久才把环境弄好,这周的学习内容有点少咯,下周补起来!嘿嘿,加油!

    鹏-程-万-里
  • 聊聊NacosDiscoveryHealthIndicatorAutoConfiguration

    本文主要研究一下NacosDiscoveryHealthIndicatorAutoConfiguration

    codecraft
  • Android SurfaceView onTouchEvent进阶操作OpenCV显示

    前一篇文章《Android SurfaceView onTouchEvent配合OpenCV显示》介绍了Android SurfaceView中通过onTouc...

    Vaccae
  • 第25次文章:行为型模式

    关注系统中对象之间的相互交互,研究系统在运行时对象之间的相互通信和协作,进一步明确对象的职责,共有11种模式。

    鹏-程-万-里
  • Spring MVC 运行流程

    Spring MVC 运行流程如图所示: Spring MVC 的入口函数,也就是前端控制器 DispatcherServlet 的作用是接收请求,响应结果 。

    happyJared
  • 一文彻底搞懂 Flink 网络流控与反压机制

    分析一个简单的 Flink 流任务,下图是一个简单的Flink流任务执行图:任务首先从 Kafka 中读取数据、 map 算子对数据进行转换、keyBy 按照指...

    zhisheng
  • 看完你就应该能明白的悲观锁和乐观锁

    Java 按照锁的实现分为乐观锁和悲观锁,乐观锁和悲观锁并不是一种真实存在的锁,而是一种设计思想,乐观锁和悲观锁对于理解 Java 多线程和数据库来说至关重要,...

    cxuan
  • 【转】一文看懂JVM内存布局及GC原理

    杨俊明,携程云客服平台研发部软件技术专家。从事IT行业10余年,腾讯云+社区、阿里云栖社区、华为云社区认证专家。近年来主要研究分布式架构、微服务、Java技术等...

    yiduwangkai
  • 第34次文章:SORM框架(四)

    本周我们在上周SORMv1.0框架的基础上对其进行升级,加入了一些设计模式,连接池等改造,大大的提高了整个框架运行的效率,得到现在的SORMv1.8版本。

    鹏-程-万-里

扫码关注云+社区

领取腾讯云代金券