专栏首页Java旅途《RabbitMQ》什么是死信队列

《RabbitMQ》什么是死信队列

一 什么是死信队列

当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。

  • 消息被拒绝(basic.reject / basic.nack),并且requeue = false
  • 消息TTL过期
  • 队列达到最大长度

当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。

二 实现死信队列

2.1 原理图

2.2 创建消费者

创建一个消费者,绑定消费队列及死信交换机,交换机默认为direct模型,死信交换机也是,arguments绑定死信交换机和key。(注解支持的具体参数文末会附上)

public class DirectConsumer {
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "javatrip",arguments = 
                    {@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
                     @Argument(name="x-dead-letter-routing-key",value = "deadKey")
                    }),
                    exchange = @Exchange(value="javatripDirect"),
                    key = {"info","error","warning"}
            )
    })
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
    System.out.println("消费者1"+message);
}

2.3 创建生产者

public void publishMessage(String message){

    rabbitTemplate.setMandatory(true);
    rabbitTemplate.convertAndSend("javatripDirect","info",message);
}

三 造成死信的三种情况

3.1 拒绝消息,并且禁止重新入队

  1. 设置yml为手动签收模式
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
  1. 设置拒绝消息并禁止重新入队
Long deliverTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicNack(deliverTag,false,false);
  1. 绑定死信队列
@RabbitListener(bindings = {
    @QueueBinding(
        value = @Queue(value = "javatripDead"),
        exchange = @Exchange(value = "deadExchange"),
        key = "deadKey"
    )
})
public void receive2(String message){
    System.out.println("我是一条死信:"+message);
}

3.2 消息TTL过期

绑定业务队列的时候,增加消息的过期时长,当消息过期后,消息将被转发到死信队列中。

@RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "javatrip",arguments =
                    {@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
                     @Argument(name="x-dead-letter-routing-key",value = "deadKey"),
                     @Argument(name = "x-message-ttl",value = "3000")
                    }),
                    exchange = @Exchange(value="javatripDirect"),
                    key = {"info","error","warning"}
            )
    })
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
    System.out.println("消费者1"+message);
}

3.3 队列达到最大长度

设置消息队列长度,当队列中的消息达到最大长度后,继续发送消息,消息将被转发到死信队列中。

@RabbitListener(bindings = {
            @QueueBinding(value = @Queue(value = "javatrip",arguments =
                    {@Argument(name="x-dead-letter-exchange",value = "deadExchange"),
                     @Argument(name="x-dead-letter-routing-key",value = "deadKey"),
                     @Argument(name = "x-max-length",value = "3")
                    }),
                    exchange = @Exchange(value="javatripDirect"),
                    key = {"info","error","warning"}
            )
    })
public void receive1(String message, @Headers Map<String,Object> headers, Channel channel)throws Exception{
    System.out.println("消费者1"+message);
}

四 Spring Boot整合RabbitMQ用到的几个注解

  1. @QueueBinding作用就是将队列和交换机进行绑定,主要有以下三个参数:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {

    /**
     * @return the queue.
     */
    Queue value();

    /**
     * @return the exchange.
     */
    Exchange exchange();

    /**
     * @return the routing key or pattern for the binding.
     * Multiple elements will result in multiple bindings.
     */
    String[] key() default {};
}
  1. @Queue是声明队列及队列的一些属性,主要参数如下:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {

    /**
     * @return the queue name or "" for a generated queue name (default).
     */
    @AliasFor("name")
    String value() default "";

    /**
     * @return the queue name or "" for a generated queue name (default).
     * @since 2.0
     */
    @AliasFor("value")
    String name() default "";

    /**
     * 是否持久化
     */
    String durable() default "";

    /**
     * 是否独享、排外的.
     */
    String exclusive() default "";

    /**
     * 是否自动删除;
     */
    String autoDelete() default "";

    /**
     * 队列的其他属性参数
     * (1)x-message-ttl:消息的过期时间,单位:毫秒;
     *(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
     *(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
     *(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消      * 息;
     *(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-         * head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
     *(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息       * 可指定发送到该交换器中;
     *(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设         * 置,则使用消息的原来的路由键值
     *(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费     * 者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
     *(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
     *(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使         * 用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
     *(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
     */
    Argument[] arguments() default {};
}
  1. @Exchange是声明交换及交换机的一些属性,
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {

    String TRUE = "true";

    String FALSE = "false";

    /**
     * @return the exchange name.
     */
    @AliasFor("name")
    String value() default "";

    /**
     * @return the exchange name.
     * @since 2.0
     */
    @AliasFor("value")
    String name() default "";

    /**
     * 交换机类型,默认DIRECT
     */
    String type() default ExchangeTypes.DIRECT;

    /**
     * 是否持久化
     */
    String durable() default TRUE;

    /**
     * 是否自动删除
     */
    String autoDelete() default FALSE;

    /**
     * @return the arguments to apply when declaring this exchange.
     * @since 1.6
     */
    Argument[] arguments() default {};
}

< END >

本文分享自微信公众号 - Java旅途(Javatrip),作者:Java旅途

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-08-06

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 通过正则表达式限制输入框只能数字,且不能以0开头

    最近在做项目的时候有个需求,输入框输入金额的。这个金额只能是整数,我们都知道金额肯定不能以0开头。

    Java旅途
  • rabbitmq系列(四)死信队列

    当消息在一个队列中变成一个死信之后,它将被重新publish到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列。具体...

    Java旅途
  • spring-boot-route(十三)整合RabbitMQ消息队列

    这篇是SpringBoot整合消息队列的第一篇文章,我们详细介绍下消息队列的相关内容。

    Java旅途
  • 模板语言

    常用语法 只需要记两种特殊符号: {{  }}和 {% %} 变量相关的用{{}},逻辑相关的用{%%}。 变量 {{ 变量名 }} 变量名由字母数字和下划线组...

    人生不如戏
  • python终极篇 ---django

                                                模板系统                                ...

    py3study
  • 10个最重大的Web应用风险与攻防

    先来看几个出现安全问题的例子 ? ? ? ? ? OWASP TOP10 ? 开发为什么要知道OWASP TOP10 ? TOP1-注入 ? TOP1-注入的示...

    用户1263954
  • 缓存架构之史上讲的最明白的RabbitMQ可靠消息传输实战演练

    比如:某个广告主(如:天猫)想在我们的平台(如:今日头条)投放广告,当通过我们的广告系统新建广告的时候,该消息在同步给redis缓存(es)的时候丢失了,而我们...

    用户1263954
  • null或空值的判断处理

    1,错误用法一: if (name == "") {      //do something } 2,错误用法二: if (name.equals(""))...

    似水的流年
  • null或空值的判断处理

    1,错误用法一: if (name == "") {      //do something } 2,错误用法二: if (name.equal...

    似水的流年
  • null或空值的判断处理

    1,错误用法一: if (name == "") {      //do something } 2,错误用法二: if (name.equals(""))...

    似水的流年

扫码关注云+社区

领取腾讯云代金券