前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ延迟消息发送

RabbitMQ延迟消息发送

作者头像
兜兜毛毛
发布2019-10-23 15:57:03
2.6K0
发布2019-10-23 15:57:03
举报
文章被收录于专栏:兜兜毛毛兜兜毛毛

为什么使用延迟消息?

不同于同步消息,有些业务场景下希望可以实现延迟一定时间再消费消息。

典型的场景有微信、支付宝等第三方支付回调接口,会在用户支付后3秒、5秒、30秒等等时间后向应用服务器发送回调请求,确保应用服务器可以正确收到消息。

那有些朋友就会说了,把需要定时处理的数据存到数据库中用定时任务就可以实现,为什么还弄个异步消息。增加后台维护成本。

使用定时任务当然没有问题可以实现该问题。在小数据量情况下没有问题。但当数据量交大的时候怎么办?如果每个任务的延迟时间不同怎么办?

其他方式实现消息队列

名称

实现方式

详细说明

Redis

使用zset数据结构

使用zset的score属性存放执行时间戳,起一个死循环的线程不断的取第一个Key值,如果当前时间戳大于该Key的socre 值时将它取出来消费,注意不需要遍历整个Zset集合,以免造成性能浪费

定时任务

给定周期扫描待处理消息

使用该方式间隔时间不好控制,给短会造成无意义的扫描,增加数据库压力,给长了误差较大

定时任务

动态创建唯一性定时任务

一次性的任务会增加数据库存储,需要定时清理,如相差时间较近的任务较多,也会造成性能较差

时间轮

自定义

自定义一个时间轮的数据结构,启动一个后台线程,延迟一秒,获取时间轮中的任务启动子线程独立执行时间轮的任务

如何选择消息中间件?

中间件

是否原生支持

说明

RocketMQ

支持

不支持任意时间的延迟消息的设置,仅支持内置预设值的延迟时间间隔的延迟消息。预设值的延迟时间间隔为:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h

RabbitMQ

不支持

可使用消息的TTL和死信Exchange实现

Kafka

不支持

可使用TimingWheel 实现

AcitveMQ

支持

因自己在使用RabbitMQ做为消息中间件,所以直接选用了RabbitMQ来实现。

实现之前

在实现之前我们先需要知道RabbitMQ以下两个概念。

  • TTL(Time To Live)消息过期时间。

消息如果在队列中一直没有被消费并且存在时间超过了TTL,消息就会变成了"死信" (Dead Message),后续无法再被消费了。

  • DLX(Dead-Letter-Exchange)死信交换器。

它的作用其实是用来接收死信消息(dead message)的。

  1. 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
  2. 消息过期
  3. 队列达到最大长度

因为消息如果未被正常消费并设置了requeue为false时会进入死信队列,我们可以监控消费死信队列中消息,来观察和分析系统的问题。

RabbitMQ可以从两种维度设置消息过期时间,分别是队列和消息本身。两种方式哪个时间小先执行哪个。

实现思路

想到有两种实现方式和效果。甚至可以结合使用。

第一种:设定固定几个延迟时间(像RocketMQ中间件)

固定延迟消息队列
固定延迟消息队列

第二种:实现自定义任意时间延迟

自定义任意时间延迟
自定义任意时间延迟

以上两种方式各有优缺点,我自己实现的是第二种,下面详细说明

图中后半段死信路由与应用消费基本相同,只要在消费端绑将一个正常队列与死信路由绑定就行。

代码语言:javascript
复制
/**
 * @Author: maomao
 * @Date: 2019-09-04 18:34
 */
@Slf4j
@Component
public class FreeCloudMQConsume {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "free.cloud.out.mq",durable = "true"),
                                 exchange = @Exchange(value = "free.cloud.die.exchange",type = ExchangeTypes.TOPIC),
                                 key = "free.cloud.out.mq.dead.message.#")})
    public void print(String message){
        log.info("print 5 ---- > {}",message);
    }
}

调用方发送消息

代码语言:javascript
复制
/**
     * 创建延迟队列,会随指定延迟时间+5秒后删除队列
     * @param queueName
     * @param delayMillis
     * @return
     */
    private static Queue createDelayQueue(String queueName, Integer delayMillis) {
        /**
         * 队列名称  //死信时间 ,死信重新投递的交换机 ,路由到队列的routingKey
         */
        String time = String.valueOf(System.currentTimeMillis());
        String delayQueueName = queueName + ".delay_" + delayMillis + "_" + time;
        return QueueBuilder.durable(delayQueueName)
                //设置消息失效时间
                .withArgument("x-message-ttl",delayMillis * 1000)
                //设置队列自动删除时间 ,比消息延迟时间多5秒
                .withArgument("x-expires", (delayMillis + 5) * 1000)
                //设置死信路由
                .withArgument("x-dead-letter-exchange", "free.cloud.die.exchange")
                //设置死信路由routingKey
                .withArgument("x-dead-letter-routing-key", queueName + ".dead.message." + time)
                .build();
    }

    /**
     * 发送延迟消息
     * @param queueName
     * @param message
     * @param delayMillis
     */
    public static void sendDelayMessage(String queueName,Object message,Integer delayMillis){
        //死信消息队列(动态创建,会销毁)
        Queue delayQueue = createDelayQueue(queueName, delayMillis);
        //创建队列
        addQueue(delayQueue);
        //延迟消息路由Key
        StringBuilder delayRoutingKey = new StringBuilder(queueName + ".delay");
        delayRoutingKey.append(".").append(message.hashCode() + "_" + RandomUtil.randomString(5));
        //绑定延迟路由
        RabbitMqUtil.addBinding(delayQueue,delayExchange,delayRoutingKey.toString());
        getRabbitTemplate().convertAndSend("free.cloud.delay.exchange",delayRoutingKey.toString(),message);
    }

以上是自定义延迟消息的关键实现代码,完整代码可以 点击这里 获取

效果

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档