专栏首页Nodejs技术栈利用 RabbitMQ 死信队列和 TTL 实现定时任务

利用 RabbitMQ 死信队列和 TTL 实现定时任务

不要想象自己说的每句话,都是真理,但要保证自己说的每句话都是真话。—— 张杰

实际业务中对于定时任务的需求是不可避免的,例如,订单超时自动取消、每天定时拉取数据等,在 Node.js 中系统层面提供了 setTimeout、setInterval 两个 API 或通过 node-schedule 这种第三方库来实现。

作者简介:五月君,Nodejs Developer,慕课网认证作者,热爱技术、喜欢分享的 90 后青年,欢迎关注 Nodejs技术栈 和 Github 开源项目 https://www.nodejs.red

通过系统层面提供的 API 这种方式实现对于简单的定时任务是 ok 的,过于复杂的、可用性要求较高的系统就会存在以下缺点。

存在的一些问题

  • 消耗系统内存,如果定时任务很多,长时间得不到释放,将会一直占用系统进程耗费内存。
  • 单线程如何保障出现系统崩溃后之前的定时任务不受影响?多进程集群模式下一致性的保证?
  • setTimeout、setInterval 会存在时间误差,对于时间精度要求较高的是不行的。

RabbitMQ TTL+DLX 实现定时任务

RabbitMQ 本身是不支持的,可以通过它提供的两个特性 Time-To-Live and Expiration、Dead Letter Exchanges 来实现,通过以下泳道图可以看到一个消息从发布到消费的整个过程。

死信队列

死信队列全称 Dead-Letter-Exchange 简称 DLX 是 RabbitMQ 中交换器的一种类型,消息在一段时间之后没有被消费就会变成死信被重新 publish 到另一个 DLX 交换器队列中,因此称为死信队列。

死信队列产生几种情况

  • 消息被拒绝
  • 消息 TTL 过期
  • 队列达到最大长度

设置DLX的两个参数:

  • deadLetterExchange: 设置 DLX,当正常队列的消息成为死信后会被路由到 DLX 中
  • deadLetterRoutingKey: 设置 DLX 指定的路由键

注意:Dead-Letter-Exchange 也是一种普通的 Exchange

消息 TTL

消息的 TTL 指的是消息的存活时间,RabbitMQ 支持消息、队列两种方式设置 TTL,分别如下:

消息设置 TTL:对消息的设置是在发送时进行 TTL 设置,通过 x-message-ttlexpiration 字段设置,单位为毫秒,代表消息的过期时间,每条消息的 TTL 可不同。

队列设置 TTL:对队列的设置是在消息入队列时计算,通过 x-expires 设置,队列中的所有消息都有相同的过期时间,当超过了队列的超时设置,消息会自动的清除。

注意:如果以上两种方式都做了设置,消息的 TTL 则以两者之中最小的那个为准。

延迟队列实现 Node.js 版

推荐采用 amqplib 库,一个 Node.js 实现的 RabbitMQ 客户端。

初始化 RabbitMQ

// rabbitmq.js
// npm install amqplib
const amqp = require('amqplib');

let connection = null;

module.exports = {
    connection,

    init: () => amqp.connect('amqp://localhost:5672').then(conn => {
        connection = conn;

        console.log('rabbitmq connect success');

        return connection;
    })
}

生产者

/**
 * 路由一个死信队列
 * @param { Object } connnection
 */
async function producerDLX(connnection) {
    const testExchange = 'testEx';
    const testQueue = 'testQu';
    const testExchangeDLX = 'testExDLX';
    const testRoutingKeyDLX = 'testRoutingKeyDLX';

    const ch = await connnection.createChannel();
    await ch.assertExchange(testExchange, 'direct', { durable: true });
    const queueResult = await ch.assertQueue(testQueue, {
        exclusive: false,
        deadLetterExchange: testExchangeDLX,
        deadLetterRoutingKey: testRoutingKeyDLX,
    });
    await ch.bindQueue(queueResult.queue, testExchange);
    const msg = 'hello world!';
    console.log('producer msg:', msg);
    await ch.sendToQueue(queueResult.queue, new Buffer(msg), {
        expiration: '10000'
    });

    ch.close();
}

消费者

// consumer.js
const rabbitmq = require('./rabbitmq.js');

/**
 * 消费一个死信队列
 * @param { Object } connnection
 */
async function consumerDLX(connnection) {
    const testExchangeDLX = 'testExDLX';
    const testRoutingKeyDLX = 'testRoutingKeyDLX';
    const testQueueDLX = 'testQueueDLX';

    const ch = await connnection.createChannel();
    await ch.assertExchange(testExchangeDLX, 'direct', { durable: true });
    const queueResult = await ch.assertQueue(testQueueDLX, {
        exclusive: false,
    });
    await ch.bindQueue(queueResult.queue, testExchangeDLX, testRoutingKeyDLX);
    await ch.consume(queueResult.queue, msg => {
        console.log('consumer msg:', msg.content.toString());
    }, { noAck: true });
}

// 消费消息
rabbitmq.init().then(connection => consumerDLX(connection));

运行查看

分别执行消费者和生产者,可以看到 producer 在 44 秒发布了消息,consumer 是在 54 秒接收到的消息,实现了定时 10 秒种执行

$ node consumer # 执行消费者
[2019-05-07T08:45:23.099] [INFO] default - rabbitmq connect success
[2019-05-07T08:45:54.562] [INFO] default - consumer msg: hello world!
$ node producer # 执行生产者
[2019-05-07T08:45:43.973] [INFO] default - rabbitmq connect success
[2019-05-07T08:45:44.000] [INFO] default - producer msg: hello world!

管理控制台查看

testQu 队列为我们定义的正常队列消息过期,会变成死信,会被路由到 testQueueDLX 队列,形成一个死信队列。

注意问题

一个队列里的某个消息即使比同队列中的其它消息提前过期,也不会优先进入到死信队列,只有当过期的消息到了队列的顶端,才会被真正的丢弃或者进入死信队列。

源码地址

https://github.com/Q-Angelo/project-training/tree/master/nodejs/rabbitmq-timed-task

延迟队列实现 Java 版

构建生产者

生产者主要变动为设置消息过期时间

AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
        .deliveryMode(2)
        .contentEncoding("UTF-8")
        .expiration("5000") // 设置 5 秒中过期
        .build();

channel.basicPublish(dlxExchangeName, dlxRoutingKey, basicProperties, dlxMsg.getBytes());

构建消费者

核心代码变动如下,除了绑定一个正常的交换机、队列之外,还需在绑定一个死信的交换机、队列

public static void main(String[] args) throws Exception {
    ...

    // 定义正常交换机、队列等信息
    String dlxExchangeName = "dlx_exchange_name";
    String exchangeType = "direct";
    String dlxRoutingKey = "dlx_routingKey";
    String dlxQueueName = "dlx_queue";

    // 定义死信队列交换机、队列等信息
    String dlxTestExchangeName = "dlx_test_exchange_name";
    String dlxTestRoutingKey = "dlx_test_routingKey";
    String dlxTestQueueName = "dlx_test_queue";

    // 声明一个正常的交换机、队列和绑定关系
    channel.exchangeDeclare(dlxExchangeName, exchangeType, true, false, null);
    // 声明死信队列交换机
    Map<String, Object> arguments = new HashMap<String, Object>();
    arguments.put("x-dead-letter-exchange", dlxTestExchangeName);
    channel.queueDeclare(dlxQueueName, true, false, false, arguments);
    channel.queueBind(dlxQueueName, dlxExchangeName, dlxRoutingKey);


    // 死信队列的交换机、队列声明和绑定关系
    channel.exchangeDeclare(dlxTestExchangeName, "direct", true, false, null);
    channel.queueDeclare(dlxTestQueueName, true, false, false, null);
    channel.queueBind(dlxTestQueueName, dlxTestExchangeName, dlxRoutingKey);

    ...

    // 6. 设置 channel
    channel.basicConsume(dlxTestQueueName, true, consumer);

    System.out.println("消费端启动成功!");
}

源码地址

https://github.com/Q-Angelo/SpringBoot-Course/tree/master/chapter8/chapter8-1/src/main/java/com/may/rabbitmq/dlx/helloworld

总结

延迟队列在现实业务场景中,还是有很多实际用途的,订单的超时取消、重试等,都可以借助此来完成,通过本文希望你能掌握什么是延迟队列,在 RabbitMQ 中的实现主要是根据 TTL + 死信队列来完成的,本文最后采用了 Node.js 和 Java 分别进行了实践,希望能帮助到你。

本文分享自微信公众号 - Nodejs技术栈(NodejsDeveloper)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Android SurfaceView onTouchEvent进阶操作OpenCV显示

    前一篇文章《Android SurfaceView onTouchEvent配合OpenCV显示》介绍了Android SurfaceView中通过onTouc...

    Vaccae
  • 第27次文章:简单了解JDBC

    本周开始接触数据库了,第一次接触,倒腾了好久才把环境弄好,这周的学习内容有点少咯,下周补起来!嘿嘿,加油!

    鹏-程-万-里
  • 看完你就应该能明白的悲观锁和乐观锁

    Java 按照锁的实现分为乐观锁和悲观锁,乐观锁和悲观锁并不是一种真实存在的锁,而是一种设计思想,乐观锁和悲观锁对于理解 Java 多线程和数据库来说至关重要,...

    cxuan
  • 第25次文章:行为型模式

    关注系统中对象之间的相互交互,研究系统在运行时对象之间的相互通信和协作,进一步明确对象的职责,共有11种模式。

    鹏-程-万-里
  • 【转】一文看懂JVM内存布局及GC原理

    杨俊明,携程云客服平台研发部软件技术专家。从事IT行业10余年,腾讯云+社区、阿里云栖社区、华为云社区认证专家。近年来主要研究分布式架构、微服务、Java技术等...

    yiduwangkai
  • 聊聊NacosDiscoveryHealthIndicatorAutoConfiguration

    本文主要研究一下NacosDiscoveryHealthIndicatorAutoConfiguration

    codecraft
  • Spring MVC 运行流程

    Spring MVC 运行流程如图所示: Spring MVC 的入口函数,也就是前端控制器 DispatcherServlet 的作用是接收请求,响应结果 。

    happyJared
  • 一文彻底搞懂 Flink 网络流控与反压机制

    分析一个简单的 Flink 流任务,下图是一个简单的Flink流任务执行图:任务首先从 Kafka 中读取数据、 map 算子对数据进行转换、keyBy 按照指...

    zhisheng
  • 第34次文章:SORM框架(四)

    本周我们在上周SORMv1.0框架的基础上对其进行升级,加入了一些设计模式,连接池等改造,大大的提高了整个框架运行的效率,得到现在的SORMv1.8版本。

    鹏-程-万-里
  • 轻拢慢捻,微服务熔断大总管

    我这篇文章来的晚了些,因为hystrix已经进入维护模式。但已经有非常多的同学入坑了,那么本篇文章就是及时雨。本文将说明熔断使用的一些注意事项,可能会细的让你厌...

    xjjdog

扫码关注云+社区

领取腾讯云代金券