RabbitMQ 死信队列

https://www.rabbitmq.com/dlx.html

DLX 即 Dead-Letter-Exchange 也叫做死信交换机。

死信队列是指队列上的消息变成死信后,能够后发送到另外一个交换机,这个交换机 就是 DLX 。

一般有几种情况会变成死信:

  • 消息被拒绝( Basic.reject 或者 basic.nack)并且设置 requeue 参数为 false
  • 消息 过期 设置了 message TTL
  • 队列达到最大的长度

死信交换机是正常的交换机,能够在任何队列上被指定。其实死信交换机和一般的交换机没啥区别,只是添加了死信交换机的属性。如果队列上存在死信, RabbitMq 会将死信消息投递到设置的 DLX 上去 ,然后被路由到一个队列上,这个队列,就是死信队列。

流程如下:

生产者:

import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;



public class Producer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("xxxx");
        connectionFactory.setUsername("xxx");
        connectionFactory.setPassword("xxx");
        connectionFactory.setPort(5672);
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        String exchangeName = "normal_exchange";
        String routingkey = "dlx.dlx";
        String msg = "test dlx message";
        String queueName = "normal_queueName";
        Map<String,Object>map =new HashMap<>();
        //注意:x-dead-letter-exchange 这个key是固定这样写的,value是你自定义的。
        map.put("x-dead-letter-exchange","exchange.dlx");
        //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
        channel.queueDeclare(queueName, true, false, false, map);
        channel.exchangeDeclare(exchangeName,"direct",true,false,null);
        channel.queueBind(queueName,exchangeName,"dlx.dlx");
        for (int i = 0; i < 3; i++) {
            // deliveryMode=2 持久化,expiration 消息有效时间
            AMQP.BasicProperties properties=new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .contentEncoding("utf-8")
                    .expiration("7000")
                    .build();
            channel.basicPublish(exchangeName, routingkey, true, properties, msg.getBytes());
        }
    }
}

消费者:

import java.io.IOException;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("/");
        connectionFactory.setHost("xxxx");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("xxxx");
        connectionFactory.setPassword("xxxx");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //死信交换机声明
        channel.exchangeDeclare("exchange.dlx","topic",true,false,null);
        //死信队列声明
        channel.queueDeclare("queue.dlx",true,false,false,null);
        //routingkey指定为#就行,表示只要路由到死信队列的都接收
        channel.queueBind("queue.dlx","exchange.dlx","#");

        channel.basicConsume("queue.dlx", false, "myConsumer Tag", new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                  String routingKey = envelope.getRoutingKey();
                  String convernType = properties.getContentType();
                  long deliveryTag = envelope.getDeliveryTag();
                  System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body));
                  channel.basicAck(deliveryTag, false);
            }

        });

    }
}

上面的代码可以看到:

消息通过正常交换机 normal_exchange 到达了正常队列 normal_queue。

 map.put("x-dead-letter-exchange","exchange.dlx");
 //注意:arguments需要声明在队列上,声明在交换机上是不会起作用的。
 channel.queueDeclare(queueName, true, false, false, map);

正常的队列 normal_queue 声明了下面参数下设置了一个 x-dead-letter-exchange 当消息过期时,将消息发送到死信交换机 exchange.dlx

死信交换机下面绑定了一个队列 queue.dlx

 channel.exchangeDeclare("exchange.dlx","topic",true,false,null);
 //死信队列声明
channel.queueDeclare("queue.dlx",true,false,false,null);
//routingkey指定为#就行,表示只要路由到死信队列的都接收
channel.queueBind("queue.dlx","exchange.dlx","#");

最后将消息发送到了死信队列上,消费者,消费死信队列 queue.dlx 上的消息即可

      channel.basicConsume("queue.dlx", false, "myConsumer Tag", new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                  String routingKey = envelope.getRoutingKey();
                  String convernType = properties.getContentType();
                  long deliveryTag = envelope.getDeliveryTag();
                  System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body));
                  channel.basicAck(deliveryTag, false);
            }

        });

本文分享自微信公众号 - 程序员开发者社区(gh_016ffe40d550),作者:猿星人

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-08-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • RabbitMQ启动出现的问题与解决办法

    尝试下面的操作: 禁用 SELinux ,修改 /etc/selinux/config SELINUX=disabled

    王小明_HIT
  • Go channel 关闭和广播

    一个通道相当于 FIFO 的队列, 通道中各个元素之间都是严格按照发送的顺序队列,先发送的队列一定会被先接收,元素值的发送和传输和发送都使用到操作符 <-

    王小明_HIT
  • 你知道哪些设计模式

    创建型模式是对对象创建过程的各种问题和解决方案的总结,包括各种工厂模式(Factσry、 Abstract factory)、单例模式(siη geton、构建...

    王小明_HIT
  • Xcode 清理存储空间

    用户1890628
  • 【程序源代码】工作流Activiti不错的学习资料总结

    在第一家公司工作的时候主要任务就是开发OA系统,当然基本都是有工作流的支持,不过当时使用的工作流引擎是公司一些牛人开发的(据说是用一个开源的引擎修改的),名称叫...

    程序源代码
  • 基于Visual Studio Code

    下载路径:https://code.visualstudio.com/Download,注意系统类型和版本;

    py3study
  • 如何做智能合约审计?

    你可以自己学习,或者你可以使用这份便利的一步步的指南来准确地知道在什么时候该做什么,并对合约进行审计。

    辉哥
  • ElasticSearch(7.2.2)-倒排索引

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 ...

    cwl_java
  • Visual Studio 2017(V

    由于我本人之前一直使用VS进行C、C++编程,所以对这个平台有着很大的好感,这次我要开始学习Python的编程,便决定继续沿用这个平台。

    py3study
  • 国庆节前端技术栈充实计划(6):Web 应用的 13 个优化步骤

    时过境迁,Web 应用比以往任何时候都更具交互性,搞定性能可以帮助你极大地改善终端用户的体验。阅读以下的技巧并学以致用,看看哪些可以用来改善延迟,渲染时间以及整...

    疯狂的技术宅

扫码关注云+社区

领取腾讯云代金券