前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >03、RabbitMQ延迟队列(死信交换机)

03、RabbitMQ延迟队列(死信交换机)

作者头像
天蝎座的程序媛
发布2023-10-17 11:02:36
1700
发布2023-10-17 11:02:36
举报

1、举例::“订单下单成功后,15分钟未支付自动取消”

1.传统处理超时订单      采取定时任务轮训数据库订单,并且批量处理。其弊端也是显而易见的;对服务器、数据库性会有很大的要求,并且当处理大量订单起来会很力不从心,而且实时性也不是特别好。当然传统的手法还可以再优化一下,即存入订单的时候就算出订单的过期时间插入数据库,设置定时任务查询数据库的时候就只需要查询过期了的订单,然后再做其他的业务操作 

2.rabbitMQ延时队列方案      一台普通的rabbitmq服务器单队列容纳千万级别的消息还是没什么压力的,而且rabbitmq集群扩展支持的也是非常好的,并且队列中的消息是可以进行持久化,即使我们重启或者宕机也能保证数据不丢失  

2、TTL和DLX

rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换机(DLX)和设置过期时间(TTL)结合起来实现延迟队列

1.TTL

TTL是Time To Live的缩写, 也就是生存时间。      RabbitMq支持对消息和队列设置TTL,对消息这设置是在发送的时候指定,对队列设置是从消息入队列开始计算, 只要超过了队列的超时时间配置, 那么消息会自动清除。      如果两种方式一起使用消息的TTL和队列的TTL之间较小的为准,也就是消息5s过期,队列是10s,那么5s的生效。      默认是没有过期时间的,表示消息没有过期时间;如果设置为0,表示消息在投递到消费者的时候直接被消费,否则丢弃。

设置消息的过期时间用 x-message-ttl 参数实现,单位毫秒。 设置队列的过期时间用 x-expires 参数,单位毫秒,注意,不能设置为0。

消息:生产者 -> 交换机 消息在生产者制造消息的时候就开始计算了TTL  TTL=5 队列:生产者 -> 交换机 -> 路由键 -> 队列 当消息送达到队列的时候才开始计算TTL  TTL=10

2.DLX和死信队列

LX即Dead-Letter-Exchange(死信交换机),它其实就是一个正常的交换机,能够与任何队列绑定。

死信队列是指队列(正常)上的消息(过期)变成死信后,能够发送到另外一个交换机(DLX),然后被路由到一个队列上,这个队列,就是死信队列。 成为死信一般有以下几种情况:      消息被拒绝(basic.reject or basic.nack)且带requeue=false参数      消息的TTL-存活时间已经过期      队列长度限制被超越(队列满)

注1:如果队列上存在死信, RabbitMq会将死信消息投递到设置的DLX上去 , 注2:通过在队列里设置x-dead-letter-exchange参数来声明DLX,如果当前DLX是direct类型还要声明x-dead-letter-routing-key参数来指定路由键,如果没有指定,则使用原队列的路由键    

3、延迟队列

通过DLX和TTL模拟出延迟队列的功能,即,消息发送以后,不让消费者拿到,而是等待过期时间,变成死信后,发送给死信交换机再路由到死信队列进行消费

 4、实操(源码)

生产者

代码语言:javascript
复制
package com.zking.rabbitmqproduct.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
import java.util.HashMap;
import java.util.Map;

/**
 * 死信交换机
 */
@Configuration
public class RabbitmqDeadConfig {

    //定义正常的队列,交换机,路由键
    public static final String NORMAL_QUEUE="normal-queue";
    public static final String NORMAL_EXCHANGE="normal-exchange";
    public static final String NORMAL_ROUTING_KEY="normal-routing-key";

    //定义死信的队列,交换机,路由键
    public static final String DEAD_QUEUE="dead-queue";
    public static final String DEAD_EXCHANGE="dead-exchange";
    public static final String DEAD_ROUTING_KEY="dead-routing-key";

    /**
     * 定义正常的队列,并设置TTL(过期时间)、DLX(死信交换机)、DLK(死信路由键)
     * @return
     */
    @Bean
    public Queue normalQueue(){
        Map<String,Object> map = new HashMap<>();
        //message在该队列queue的存活时间最大为15秒
        map.put("x-message-ttl", 15000);
        //x-dead-letter-exchange参数是设置该队列的死信交换器(DLX)
        map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //x-dead-letter-routing-key参数是给这个DLX指定路由键
        map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
        return new Queue(NORMAL_QUEUE, true, false, false, map);
    }

    /**
     * 定义正常的交换机(直连)
     * @return
     */
    @Bean
    public DirectExchange normalExchange(){
        return new DirectExchange(NORMAL_EXCHANGE,true,false);
    }

    /**
     * 配置正常的队列与正常的交换机的绑定关系
     * @return
     */
    @Bean
    public Binding normalbinding(){
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with(NORMAL_ROUTING_KEY);
    }

    /**********************以下为死信队列相关操作***********************/

    @Bean
    public Queue deadQueue(){
        return new Queue(DEAD_QUEUE);
    }

    @Bean
    public DirectExchange deadExchange(){
        return new DirectExchange(DEAD_EXCHANGE,true,false);
    }

    @Bean
    public Binding deadBinding(){
        return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with(DEAD_ROUTING_KEY);
    }
}
代码语言:javascript
复制
@RequestMapping("/DeadSend")
    public Map<String,Object> DeadSend(){
        Map<String,Object> json = new HashMap<String,Object>();
        json.put("msg","OK");
        json.put("code",200);
        //获取发送消息
        Map<String,Object> data= this.json("hello,rabbitmq!!! this is a directExchange");
        //将消息通过绑定键发送到RabbitMQ的扇形交换机中,再由扇形交换机根将消息群发到绑定的队列中(与路由键无关)
        //流程:生产者-->交换机-->队列<-->消费者
        rabbitTemplate.convertAndSend(RabbitmqDeadConfig.NORMAL_EXCHANGE,
                RabbitmqDeadConfig.NORMAL_ROUTING_KEY,data);

        return json;
    }

消费者可以和昨天差不多的写法,以上就是今天的分享了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2023-05-08,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1、举例::“订单下单成功后,15分钟未支付自动取消”
  • 2、TTL和DLX
    • rabbitMQ中是没有延时队列的,也没有属性可以设置,只能通过死信交换机(DLX)和设置过期时间(TTL)结合起来实现延迟队列
      • 1.TTL
        • 2.DLX和死信队列
        • 3、延迟队列
        •  4、实操(源码)
          • 生产者
            • 消费者可以和昨天差不多的写法,以上就是今天的分享了
            相关产品与服务
            数据库
            云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档