前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于RabbitMq的实现消息延时发送的优点以及其局限性;

基于RabbitMq的实现消息延时发送的优点以及其局限性;

作者头像
名字是乱打的
发布2021-12-24 08:55:09
1.7K2
发布2021-12-24 08:55:09
举报
文章被收录于专栏:软件工程

我们消息中心是负责承载各个业务(比如电商,物流,营销中心,券中心,会员中心,积分中心,停车场等等)的消息发送需求,那么消息呢就可能有延迟需求,比如物流到货后十五分钟进行一次邀请评价的需求。而且做个消息延迟发送,我们不可能让业务自己去写消息啥时候发送,到发送时间了再调我们接口这种逻辑,这样不合理,我们需要做比较强大的消息中心功能。

那么延迟消息的实现有多种多样的实现,我们前阵子想实现延迟消息,对此做了一定的讨论,最后发现许多方案要么不支持分布式项目,要么平白的对机器性能损耗比较大,要么可能存在系统崩溃数据丢失的风险,最后我们采用了RabbitMQ的方式实现延迟队列。

一. rabbitmq的延迟消息实现方式

1.死信队列方式
1.1我先大白话解释一下啥叫死信队列:
  • 首先死信队列是普通队列
  • 死信队列是在其他队列里的消息死亡后进入的队列
  • 死信队列本身不具有死信功能,需要绑定
  • 比如A绑定了死信队列是B,那么A中死亡的消息就会进入B内,B就被称之为死信队列

上面提到的消息死亡有几种类型 消息被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度

1.2延迟队列+死信队列实现延迟消息发送

RabbitMQ支持给队列内的消息设置过期时间和给消息单独过期时间,那么结合死信队列我们就可以做到消息的延迟发送了; 大概是以下步骤 1.创建延迟队列并设置消息的过期时间,绑定一个死信队列 2.不创建该队列的消费者,让其内部消息根据过期时间自动过期 3.创建死信队列的消费者,使其每次消费死亡的消息;

死信队列结构图

看到之前有的人写的博文写的比较复杂,还把交换机写进来了,其实完全没必要,死信队列根本上只是队列之间的绑定以及数据交换,具体代码就不说了,因为重点不在这里;

死信这种方式有个致命的缺点,导致我们这边无法使用:
  • 1.时序问题:如果我们消息使用的是同一个队列,然后我们给消息本身设置过期时间,那么同一个队列中消息消费是按顺序来的,而不是过期时间,也就说说如果我们正常队列有两个数据A ttl15秒 B ttl 3秒,A在队列前面B在后面;那么我们消费的时候及时B过期时间更短,我们也不会先消费B而是会先消费A,因为同一队列有顺序问题。
  • 2.过多队列问题: 前面说了如果单个队列那么消费就有顺序问题,那我们可以按过期时间分别绑定多个队列啊。 但是如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有6s和60s两个时间选项,如果需要一个小时后处理,那么就需要增加TTL为一个小时的队列,我们业务方指定的时间是无法确定的,不可能去限制业务方让他们只能某个时间发,那么我们要创建无数个队列??显然这是不可能的;

因此这种方式直接pass了;

2.借助rabbitmq的延迟插件
2.1 延迟队列插件rabbitmq_delayed_message_exchange的安装
  • 1.首先去RabbitMQ 插件下载网站下载自己版本对应的ez文件类型插件
  • 2.插件rabbitmq_delayed_message_exchange下载完放入rabbitmq 的plugins文件下
  • 3.进入到rabbit文件的sbin目录,执行
代码语言:javascript
复制
#查看插件目录
rabbitmq-plugins list
#安装延迟队列插件 (rabbitmq-plugins enable 插件名)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
2.2 RabbitMq插件实现消息延迟的优点;

这个优点就是解决了上面方式的缺点。。。让消息延迟性绑定到消息本身上,使的每个消息有自己的过期时间;

2.3 实现方式;
  • 1.创建一个delay类型的exchange,绑定对应的队列
  • 2.这里delay类型和direct topic fanout等消息路由方式相对独立,也就是说我们可以和之前一样,指定exchange是direct还是topic还是fanout都可以。根本上来说:我们指定其是delay类型不过是决定了其什么时候投递到指定路由队列
  • 3.绑定路由队列进行消费

我们目前有两种不同类型的延迟,因此我这里用的topic模式结合延迟插件实现延迟队列;

topic结合delay插件实现延迟消息架构

如上所示,我们延迟的消息首先都放置到Mq里,然后延迟时间到了之后呢就会被路由到指定队列上去;

这么做有个小问题,如果我们延迟消息过多的话,那么必然存在着rabbitmq挤压消息,占用空间的问题,当然解决方案也比较简单

  • 延迟不超过一天的我们直接进入rabbitmq
  • 把延迟超过第一天的消息先进入mysql,每天定时扫第二天要发的数据,扫进mq里
  • 这样的话RabbitMQ就做到了只存储当天消息的能力;

如果我们消息非常非常多,可以把消息分发区间划的更细点,比如只存储每12小时的消息,甚至只存储每个小时要发送的消息,这都是完全OK的;

代码也很简单,这里提供一个绑定了两种业务的延迟队列的小demo: 延迟队列配置:

代码语言:javascript
复制
@Configuration
public class DelayedRabbitMQConfig {
  /**
     * 声明延时队列hangfire
     * 不设置TTL
     */
    @Bean("hDelayQueue")
    public Queue hangfireDelayQueue() {
        return new Queue(BaseDict.DELAY_QUEUE_NAME_H);
    }


    /**
     * 业务调用延迟队列
     *
     */
    @Bean("mDelayQueue")
    public Queue moduleDelayQueue() {
        return new Queue(BaseDict.DELAY_QUEUE_NAME_M);
    }


    /**
     * 延迟交换机
     *
     */
    @Bean(name = "delayExchange")
    public CustomExchange delayExchange() {
        Map<String, Object> args = new HashMap<>(2);
        args.put("x-delayed-type", "topic");
        return new CustomExchange(BaseDict.DELAY_EXCHANGE_DISPATCHER, "x-delayed-message", true, false, args);
    }


    @Bean
    public Binding hangfireBinding(@Qualifier("hDelayQueue") Queue queue,
                                   @Qualifier("delayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_H).noargs();
    }


    @Bean
    public Binding moduleBinding(@Qualifier("mDelayQueue") Queue queue,
                                 @Qualifier("delayExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(BaseDict.DELAY_QUEUE_ROUTING_KEY_M).noargs();
    }
}

消息生产

代码语言:javascript
复制
    @PostMapping("addMsgToMDelayQueue")
    public void addMsgToMQueue(@RequestBody XXX request) {
        sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_M, JSONObject.toJSONString(request), 10*24*60*60*1000);
    }



    @PostMapping("addMsgToHDelay")
    public void addMsgToHQueue(@RequestBody List<AAA> request) {
        sendDelayMsg(BaseDict.DELAY_EXCHANGE_DISPATCHER,BaseDict.DELAY_QUEUE_ROUTING_KEY_H, JSONObject.toJSONString(request), 10*24*60*60*1000);
    }


    /**
     * 消息发送到延迟交换机上
     */
    public void sendDelayMsg(String exchange,String routingKey, String msg, Integer delayTime) {
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, a -> {
            a.getMessageProperties().setDelay(delayTime);
            return a;
        });
    }

消息消费

代码语言:javascript
复制
@RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_H, concurrency = "10-30")
    public void hangfireReceive(Message message, Channel channel) throws IOException {
        final List<AAA> sendDetails = JSONObject.parseArray(JSONObject.toJSONString(message.getBody()), AAA.class);
        Optional.ofNullable(sendDetails).ifPresent(item->item.forEach(System.out::println));
    }


    @RabbitListener(queues = BaseDict.DELAY_QUEUE_NAME_M, concurrency = "10-20")
    public void receiveQ(Message message, Channel channel) throws IOException {
        final XXX request = JSONObject.parseObject(message.getBody(), XXX.class);
        System.out.println("消费"+request.toString());
    }
3.基于RabbitMQ延迟插件实现延迟消息的局限性

我们在第一次使用这个延迟插件的时候做了一个压测,大约100W数据量的延迟会导致内存和Cpu使用量的急速上升,查了一些文档没搞明白后,去了官网看了下,发现其对此有以下解释,大致是讲目前这个延迟插件还不足以支持那么大的数据量,建议数据量不要太大 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/issues/72

因此我们如果想用好延迟插件,目前来说需要做一些额外的配合,尽量使其延时最近的数据,并且数据量维持到一个比较低的程度

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/8/24 下,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. rabbitmq的延迟消息实现方式
    • 1.死信队列方式
      • 1.1我先大白话解释一下啥叫死信队列:
      • 1.2延迟队列+死信队列实现延迟消息发送
    • 2.借助rabbitmq的延迟插件
      • 2.1 延迟队列插件rabbitmq_delayed_message_exchange的安装
        • 2.2 RabbitMq插件实现消息延迟的优点;
          • 2.3 实现方式;
            • 3.基于RabbitMQ延迟插件实现延迟消息的局限性
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档