专栏首页Nodejs技术栈Node.js结合RabbitMQ延迟队列实现定时任务

Node.js结合RabbitMQ延迟队列实现定时任务

实际业务中对于定时任务的需求是不可避免的,例如,订单超时自动取消、每天定时拉取数据等,在Node.js中系统层面提供了setTimeout、setInterval两个API或通过node-schedule这种第三方库来实现。 通过这种方式实现对于简单的定时任务是ok的,过于复杂的、可用性要求较高的系统就会存在以下缺点。

存在的一些问题

  1. 消耗系统内存,如果定时任务很多,长时间得不到释放,将会一直占用系统进程耗费内存。
  2. 单线程如何保障出现系统崩溃后之前的定时任务不受影响?多进程集群模式下一致性的保证?
  3. setTimeout、setInterval会存在时间误差,对于时间精度要求较高的是不行的。

RabbitMQ TTL+DLX 实现定时任务

RabbitMQ本身是不支持的,可以通过它提供的两个特性Time-To-Live and Expiration、Dead Letter Exchanges来实现,通过以下泳道图可以看到一个消息从发布到消费的整个过程。

死信队列

死信队列全称 Dead-Letter-Exchange 简称 DLX 是 RabbitMQ 中交换器的一种类型,消息在一段时间之后没有被消费就会变成死信被重新 publish 到另一个 DLX 交换器队列中,因此称为死信队列。

死信队列产生几种情况

  1. 消息被拒绝
  2. 消息TTL过期
  3. 队列达到最大长度

设置DLX的两个参数:

  1. deadLetterExchange: 设置DLX,当正常队列的消息成为死信后会被路由到DLX中
  2. deadLetterRoutingKey: 设置DLX指定的路由键

注意:Dead-Letter-Exchange也是一种普通的Exchange

消息TTL

消息的TTL指的是消息的存活时间,RabbitMQ支持消息、队列两种方式设置TTL,分别如下:

消息设置TTL:对消息的设置是在发送时进行TTL设置,通过 x-message-ttlexpiration 字段设置,单位为毫秒,代表消息的过期时间,每条消息的TTL可不同。

队列设置TTL:对队列的设置是在消息入队列时计算,通过 x-expires 设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。

注意:如果以上两种方式都做了设置,消息的TTL则以两者之中最小的那个为准。

Nodejs操作RabbitMQ实现延迟队列

推荐采用 amqplib库,一个Node.js实现的RabbitMQ客户端。

  • 初始化RabbitMQ

rabbitmq.js

// npm install amqplibconst amqp = require('amqplib');
let connection = null;
module.exports = {    connection,
    init: () => amqp.connect('amqp://localhost:5672').then(conn => {        connection = conn;
        console.log('rabbitmq connect success');
        return connection;    })}
  • 生产者
/** * 路由一个死信队列 * @param { Object } connnection  */async function producerDLX(connnection) {    const testExchange = 'testEx';    const testQueue = 'testQu';    const testExchangeDLX = 'testExDLX';    const testRoutingKeyDLX = 'testRoutingKeyDLX';
    const ch = await connnection.createChannel();    await ch.assertExchange(testExchange, 'direct', { durable: true });    const queueResult = await ch.assertQueue(testQueue, {        exclusive: false,        deadLetterExchange: testExchangeDLX,        deadLetterRoutingKey: testRoutingKeyDLX,    });    await ch.bindQueue(queueResult.queue, testExchange);    const msg = 'hello world!';    console.log('producer msg:', msg);    await ch.sendToQueue(queueResult.queue, new Buffer(msg), {        expiration: '10000'    });
    ch.close();}
  • 消费者

consumer.js

const rabbitmq = require('./rabbitmq.js');
/** * 消费一个死信队列 * @param { Object } connnection  */async function consumerDLX(connnection) {    const testExchangeDLX = 'testExDLX';    const testRoutingKeyDLX = 'testRoutingKeyDLX';    const testQueueDLX = 'testQueueDLX';
    const ch = await connnection.createChannel();    await ch.assertExchange(testExchangeDLX, 'direct', { durable: true });    const queueResult = await ch.assertQueue(testQueueDLX, {        exclusive: false,    });    await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX);    await ch.consume(queueResult.queue, msg => {        console.log('consumer msg:', msg.content.toString());    }, { noAck: true });}
// 消费消息rabbitmq.init().then(connection => consumerDLX(connection));
  • 运行查看

分别执行消费者和生产者,可以看到 producer 在44秒发布了消息,consumer 是在54秒接收到的消息,实现了定时10秒种执行

$ node consumer # 执行消费者[2019-05-07T08:45:23.099] [INFO] default - rabbitmq connect success[2019-05-07T08:45:54.562] [INFO] default - consumer msg: hello world!
$ node producer # 执行生产者[2019-05-07T08:45:43.973] [INFO] default - rabbitmq connect success[2019-05-07T08:45:44.000] [INFO] default - producer msg: hello world!
  • 管理控制台查看

testQu 队列为我们定义的正常队列消息过期,会变成死信,会被路由到 testQueueDLX 队列,形成一个死信队列。

  • 源码地址:RabbitMQ延迟队列实现定时任务(Node.js客户端版Demo)

作者:五月君 链接:https://www.imooc.com/article/286402 来源:慕课网 Node.js技术栈: https://github.com/Q-Angelo/Nodejs-Roadmap

本文分享自微信公众号 - Nodejs技术栈(NodejsDeveloper)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-08

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • AST in TypeScript 实践

      最近参与了一个 Node 项目脚手架的开发工作,为了提高编码效率,导师提议写一个 VSCode 的插件,功能上大体有点像 snippets 代码段,但比 s...

    JoviCheng
  • Java开发者应该养成的良好习惯

    作为一个开发者,我们应该养成一些良好的开发习惯。以下是参考网络资源中的摘要Java编程尽可能养成的良好习惯。

    攻城狮的那点事
  • 大厂的面试题

    Qiang
  • 伴随 P5.js 入坑创意编程

    上一篇文章:填坑!完结娱乐圈明星关系图谱 发布后,古柳印象里过往留下的坑貌似只剩下 图像检索(一):因缘际会与前瞻 的后续实践代码(原文里给了参考代码链接)和在...

    古柳_DesertsX
  • springboot集成ueditor富文本编辑器【需要修改ueditor源码】-和上一篇不一样

    最近工作需要重新搭建公司网站,其中需要使用富文本编辑器,货比三家,最后选择了百度团队的UEditor。项目框架为springboot,所以涉及到springbo...

    凯哥Java
  • react实战开发|react+web版聊天室

    基于react+react-dom+react-router-dom+redux+react-redux+webpack2.0+react-photoswipe...

    andy2018
  • JavaScript 一元正号运算符

    一元正号运算符(+)位于其操作数前面,计算其操作数的数值,如果操作数不是一个数值,会尝试将其转换成一个数值。 尽管一元负号也能转换非数值类型,但是一元正号是转换...

    用户3158888
  • 实战之java中线程的虚假唤醒

    如果wait()方法被虚假唤醒,然后doStuffAssumingConditionIsTrue()会被执行,尽管此时condition的值是false。如果用...

    开发架构二三事
  • springboot集成ueditor富文本编辑器(不需修改ueditor源码)

    最近工作需要重新搭建公司网站,其中需要使用富文本编辑器,货比三家,最后选择了百度团队的UEditor。项目框架为springboot,所以涉及到springbo...

    凯哥Java
  • React Native 小记 - TouchableOpacity 单次点击无效

    版权声明:本文为[他叫自己Mr.张]的原创文章,转载请注明出...

    他叫自己MR.张

扫码关注云+社区

领取腾讯云代金券