前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMq TTL+死信队列 延迟消息问题记录

RabbitMq TTL+死信队列 延迟消息问题记录

原创
作者头像
Java king
修改2023-02-21 17:49:29
1.2K0
修改2023-02-21 17:49:29
举报
文章被收录于专栏:后端Java后端Java

延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费

利用RabbitMqTTL 和死信队列 来实现延时消费。

如果设置的是队列统一过期时间放到死信队列,没有什么问题。

如果是延时时间设置到每条消息上的。而不是给队列的。

实现方式为消息存活时间为动态用户页面可配置的。

这就导致了一个问题:

先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。

结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。

原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。

它不会检测每一条消息是否过期。而是顺序检测。

如果first in的消息过期时间很长,会导致它阻塞后进的消息。

不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。

问题解决

这个时候可以使用rabbitMq的一个插件:rabbitmq_delayed_message_exchange

一段时间以来,人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送

插件安装

需要根据自己的rabbitMq选择对应的版本。我rabbitMq的版本是RabbitMQ 3.11.0,对应的插件版本就是:3.11.1

基于Linux

代码语言:shell
复制
--1、cd到rabbitmq默认安装位置
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins

--2、通过ftp工具将插件上传到此目录下

--3、开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

--4、重启MQ服务
systemctl restart rabbitmq-server

基于Docker

代码语言:shell
复制
--1、通过ftp工具将插件上传到Linux服务器的根目录下

--2、拷贝到docker中rabbitmq插件目录下,rabbitmq_delayed_message_exchange-3.9.0.ez(下载包的全名)
docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins

--3、进入容器
docker exec -it 容器id /bin/bash

--4、查看插件是否存在(确保2中的操作已经将插件拷贝过来了)
cd plugins
ls |grep delay

--5、开启插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

--6、退出容器
exit

--7、重启MQ服务
docker restart 容器ID

安装成功

web界面新建交换机选择类型出现红框标注即表示成功

image.png
image.png

代码实现

1:springBoot配置

代码语言:java
复制
@Configuration
public class DelayRabbitmqConfig {
 
    /**
     * 声明延迟队列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        return new Queue(QueueConstant.DelayQueue,
                true,false,false);
    }
 
    /**
     * 声明延迟自定义交换机类型
     * @return
     */
    @Bean
    public CustomExchange delayCustomExchange(){
        HashMap<String, Object> args = new HashMap<>();
//        设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递
        args.put("x-delayed-type","direct");
        return new CustomExchange(ExchangeConstant.DelayCustomerExchange,
                "x-delayed-message",true,false,args);
    }
 
    /**
     * 绑定延迟交换机和队列
     * @return
     */
    @Bean
    public Binding delayQueueAndCustomExchange(){
        return BindingBuilder.bind(delayQueue())
                .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs();
    }
}

springMvc配置

代码语言:java
复制
    引入依赖:
    xmlns:util="http://www.springframework.org/schema/util"
    http://www.springframework.org/schema/util
    http://www.springframework.org/schema/util/spring-util-4.0.xsd


	<!-- 定义延时队列 -->
	<!-- durable:是否持久化,宕机恢复后会重持久化日志恢复消息队列 -->
	<!-- exclusive: 仅创建者可以使用的私有队列,断开后自动删除 -->
	<!-- auto_delete: 当所有消费客户端连接断开后,是否自动删除队列 -->
	<rabbit:queue id="taskStartQueue" name="taskStartQueue" durable="true" auto-delete="false" exclusive="false"/>

    	<!--定义延时交换机类型-->
	<util:map id="delayExchangeArgs" map-class="java.util.HashMap">
		<entry key="x-delayed-type" value="direct"/>
	</util:map>

    <!-- 自定义交换机 - 延时交换机 : rabbitMq 第三方插件 rabbitmq_delayed_message_exchange 实现延时队列 -->
    <!--注意:使用convertAndSend().发送消息后不会进入下面定义的队列中,而是由第三方插件管理,可以从交换机 messages delayed中查看消息数-->
    <bean id="taskExchange" name="delayExchange" class="org.springframework.amqp.core.CustomExchange">
        <constructor-arg name="name" value="taskExchange"/>
        <constructor-arg name="type" value="x-delayed-message"/>
        <constructor-arg name="durable" value="true"/>
        <constructor-arg name="autoDelete" value="false"/>
        <constructor-arg name="arguments" ref="delayExchangeArgs"/>
    </bean>

    <util:map id="emptyMap" map-class="java.util.HashMap" />

    <bean id="taskStartBind" class="org.springframework.amqp.core.Binding">
        <constructor-arg name="destination" value="taskStartQueue"/>
        <constructor-arg name="destinationType" value="QUEUE"/>
        <constructor-arg name="exchange" value="taskExchange"/>
        <constructor-arg name="routingKey" value="delay.task.start"/>
        <constructor-arg name="arguments" ref="emptyMap"/>
    </bean>

代码实现

代码语言:java
复制
//消息发送
final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));
DisTimingPushDto disTimingPushDto = new DisTimingPushDto();
disTimingPushDto.setOrderId(dispense.getOrderId());
disTimingPushDto.setPushTime(disDispense.getPushTime());
rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);

//每条消息时间配置
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;

/**
 * 延迟消息处理器 Processor
 * @author king
 * @date 2022年12月28日 11:14
 */
public class MyMessagePostProcessor implements MessagePostProcessor {

    /**
     * 消息延迟时间,单位:毫秒
     */
    private final Integer TTL;

    public MyMessagePostProcessor(final Integer ttl) {
        this.TTL = ttl;
    }


    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setDelay(TTL);
        return message;
    }
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 这就导致了一个问题:
  • 问题解决
  • 插件安装
    • 基于Linux
      • 基于Docker
      • 安装成功
      • 代码实现
        • 1:springBoot配置
          • springMvc配置
          • 代码实现
          相关产品与服务
          容器服务
          腾讯云容器服务(Tencent Kubernetes Engine, TKE)基于原生 kubernetes 提供以容器为核心的、高度可扩展的高性能容器管理服务,覆盖 Serverless、边缘计算、分布式云等多种业务部署场景,业内首创单个集群兼容多种计算节点的容器资源管理模式。同时产品作为云原生 Finops 领先布道者,主导开源项目Crane,全面助力客户实现资源优化、成本控制。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档