前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Rabbit 高级操作

Rabbit 高级操作

作者头像
张小驰出没
发布2021-12-06 16:44:44
3530
发布2021-12-06 16:44:44
举报

Rabbit 高级操作

1.过期时间TTL

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了时间之后消息将自动被删除。

RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

  • 第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
  • 第二种方法是对消息进行单独设置,每条消息TTL可以不同。

如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

1.1 设置队列TTL

1.1.1 配置文件方式
代码语言:javascript
复制
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!--定义过期队列及其属性,不存在则自动创建-->
    <rabbit:queue id="my_ttl_queue" name="my_ttl_queue" auto-declare="true">
        <rabbit:queue-arguments>
            <!--投递到该队列的消息如果没有消费都将在6秒之后被删除-->
            <entry key="x-message-ttl" value-type="long" value="6000"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    
</beans>
1.1.2 配置类方式
代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
    @Bean("my_ttl_queue")
    public Queue queue() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-message-ttl", 6000); // 队列中的消息未被消费 10 秒后过期
        return new Queue("my_ttl_queue", true, false, false, map);
    }
}
1.1.3 创建测试类
代码语言:javascript
复制
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 测试过期队列消息
     * 投递到该队列的消息如果没有消费都将在6秒之后被删除
     */
    @Test
    public void ttlQueueTest(){
        //路由键与队列同名
        rabbitTemplate.convertAndSend("my_ttl_queue", "发送到过期队列my_ttl_queue,6秒内不消费则过期。");
    }
}

运行之后,可以进入网站中查看该队列:

刚运行时:

1
1

运行6s后:

2
2

1.2 设置消息TTL

代码语言:javascript
复制
/**
     * 过期消息
     * 该消息投递任何交换机或队列中的时候;如果到了过期时间则将从该队列中删除
     */
@Test
public void ttlMessageTest(){
    MessageProperties messageProperties = new MessageProperties();
    //设置消息的过期时间,5秒
    messageProperties.setExpiration("5000");
    Message message = new Message("测试过期消息,5秒钟过期".getBytes(), messageProperties);
    //路由键与队列同名
    rabbitTemplate.convertAndSend("my_ttl_queue", message);
}

这样可以给这条message 设置过期时间,如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。所以会 5s 过期。

2.死信队列

DLX,全称为Dead-Letter-Exchange , 可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信,可能是由于以下的原因:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。

要想使用死信队列,只需要在定义队列的时候设置队列参数 x-dead-letter-exchange 指定交换机即可。

2.1 配置文件方法

2.1.1 消息过期情况
定义死信队列
代码语言:javascript
复制
<!--定义定向交换机中的持久化死信队列,不存在则自动创建-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定义广播类型交换机;并绑定队列-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
    <rabbit:bindings>
        <!--绑定路由键my_ttl_dlx,可以将过期的消息转移到my_dlx_queue队列-->
        <rabbit:binding key="my_ttl_dlx" queue="my_dlx_queue"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
队列设置死信交换机
代码语言:javascript
复制
<!--定义过期队列及其属性,不存在则自动创建-->
<rabbit:queue id="my_ttl_dlx_queue" name="my_ttl_dlx_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <!--投递到该队列的消息如果没有消费都将在6秒之后被投递到死信交换机-->
        <entry key="x-message-ttl" value-type="long" value="6000"/>
        <!--设置当消息过期后投递到对应的死信交换机-->
        <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<!--定义定向交换机 根据不同的路由key投递消息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding key="my_ttl_dlx" queue="my_ttl_dlx_queue"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
2.1.2 消息队列过长情况
定义死信队列
代码语言:javascript
复制
<!--定义定向交换机中的持久化死信队列,不存在则自动创建-->
<rabbit:queue id="my_dlx_queue" name="my_dlx_queue" auto-declare="true"/>
<!--定义广播类型交换机-->
<rabbit:direct-exchange id="my_dlx_exchange" name="my_dlx_exchange" auto-declare="true">
    <rabbit:bindings>
        <!--绑定路由键my_max_dlx-->
        <rabbit:binding key="my_max_dlx" queue="my_dlx_queue"/>
    </rabbit:bindings>
</rabbit:direct-exchange>
队列设置死信交换机
代码语言:javascript
复制
<!--定义限制长度的队列及其属性,不存在则自动创建-->
<rabbit:queue id="my_max_dlx_queue" name="my_max_dlx_queue" auto-declare="true">
    <rabbit:queue-arguments>
        <!--投递到该队列的消息最多2个消息,如果超过则最早的消息被删除投递到死信交换机-->
        <entry key="x-max-length" value-type="long" value="2"/>
        <!--设置当消息过期后投递到对应的死信交换机-->
        <entry key="x-dead-letter-exchange" value="my_dlx_exchange"/>
    </rabbit:queue-arguments>
</rabbit:queue>
<!--定义定向交换机 根据不同的路由key投递消息-->
<rabbit:direct-exchange id="my_normal_exchange" name="my_normal_exchange" auto-declare="true">
    <rabbit:bindings>
        <rabbit:binding key="my_max_dlx" queue="my_max_dlx_queue"/>
    </rabbit:bindings>
</rabbit:direct-exchange>

2.2 配置类方式

2.2.1 消息过期情况
代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
     /**
     * 声明死信交换机
     *
     * @return 返回
     */
    @Bean("my_dlx_exchange")
    public DirectExchange myDlxExchange() {
        return new DirectExchange("my_dlx_exchange", true, false, new HashMap<>());
    }
    /**
     * 声明死信队列
     *
     * @return 返回
     */
    @Bean("my_dlx_queue")
    public Queue myDlxQueue() {
        return new Queue("my_dlx_queue", true, false, false, new HashMap<>());
    }
    /**
     * 绑定队列,设置路由key
     *
     * @param queue          参数
     * @param directExchange 参数
     * @return 返回
     */
    @Bean
    public Binding bindingDead(@Qualifier("my_dlx_queue") Queue queue, @Qualifier("my_dlx_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("my_ttl_dlx");
    }
    /**
     * 声明过期队列
     *
     * @return 返回
     */
    @Bean("my_ttl_dlx_queue")
    public Queue myTtlDlxQueue() {
        Map<String, Object> map = new HashMap<>();
        //6s后队列过期
        map.put("x-message-ttl", 6000);
        //消息过期后,进入死信交换机
        map.put("x-dead-letter-exchange", "my_dlx_exchange");
        return new Queue("my_ttl_dlx_queue", true, false, false, map);
    }
    /**
     * 声明交换机
     *
     * @return 返回
     */
    @Bean("my_normal_exchange")
    public DirectExchange myNormalExchange() {
        return new DirectExchange("my_normal_exchange", true, false, new HashMap<>());
    }
    /**
     * 交换机绑定过期队列
     *
     * @param queue          参数
     * @param directExchange 参数
     * @return 返回
     */
    @Bean
    public Binding binding(@Qualifier("my_ttl_dlx_queue") Queue queue, @Qualifier("my_normal_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("my_ttl_dlx");
    } 
}
2.2.2 消息过长情况
代码语言:javascript
复制
@Configuration
public class RabbitMQConfig {
    /**
     * 声明死信交换机
     *
     * @return 返回
     */
    @Bean("my_dlx_exchange")
    public DirectExchange myDlxExchange() {
        return new DirectExchange("my_dlx_exchange", true, false, new HashMap<>());
    }
    /**
     * 声明死信队列
     *
     * @return 返回
     */
    @Bean("my_dlx_queue")
    public Queue myDlxQueue() {
        return new Queue("my_dlx_queue", true, false, false, new HashMap<>());
    }
    /**
     * 绑定队列,设置路由key
     *
     * @param queue          参数
     * @param directExchange 参数
     * @return 返回
     */
    @Bean
    public Binding bindingDead(@Qualifier("my_dlx_queue") Queue queue, @Qualifier("my_dlx_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("my_max_dlx");
    }
   /**
     * 声明过长队列
     *
     * @return 返回
     */
    @Bean("my_max_dlx_queue")
    public Queue myMaxDlxQueue(){
        Map<String, Object> map = new HashMap<>();
        //设置消息过长
        map.put("x-max-length",2);
        //消息过长后,进入死信队列
        map.put("x-dead-letter-exchange","my_dlx_exchange");
        return new Queue("my_max_dlx_queue",true,false,false,map);
    }
    /**
     * 声明交换机
     *
     * @return 返回
     */
    @Bean("my_normal_exchange")
    public DirectExchange myNormalExchange() {
        return new DirectExchange("my_normal_exchange", true, false, new HashMap<>());
    }
    /**
     * 消息过长   交换机绑定过期队列
     *
     * @param queue          参数
     * @param directExchange 参数
     * @return 返回
     */
    @Bean
    public Binding binding2(@Qualifier("my_max_dlx_queue") Queue queue, @Qualifier("my_normal_exchange") DirectExchange directExchange) {
        return BindingBuilder.bind(queue).to(directExchange).with("my_max_dlx");
    }
}

2.3 死信队列测试

2.3.1 测试消息过期情况
代码语言:javascript
复制
/**
     * 过期消息投递到死信队列
     * 投递到一个正常的队列,但是该队列有设置过期时间,到过期时间之后消息会被投递到死信交换机(队列)
     */
@Test
public void dlxTTLMessageTest(){
    rabbitTemplate.convertAndSend("my_normal_exchange", "my_ttl_dlx", "测试过期消息;6秒过期后会被投递到死信交换机");
}

刚运行时:

3
3

6s之后:

4
4
2.3.2 测试消息过长情况
代码语言:javascript
复制
/**
     * 超过队列长度消息投递到死信队列
     * 投递到一个正常的队列,但是该队列有设置最大消息数,到最大消息数之后队列中最早的消息会被投递到死信交换机(队列)
     */
@Test
public void dlxMaxMessageTest(){
    rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
                                  "队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是第1个消息");
    rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
                                  "队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是第2个消息");
    rabbitTemplate.convertAndSend("my_normal_exchange", "my_max_dlx",
                                  "队列my_max_dlx_queue的最大长度为2;消息超过后会被投递到死信交换机;这是第3个消息");
}

发送了三条信息,而设置的消息队列长度为2,这样最先发送的第1个消息会进入死信队列:

5
5
6
6

3.延迟队列

延迟队列存储的对象是对应的延迟消息;所谓“延迟消息” 是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。在RabbitMQ中延迟队列可以通过 过期时间 + 死信队列 来实现;

3.1 生产者

代码语言:javascript
复制
public class Producer {
    //交换机名称
    static final String MY_DLX_EXCHANGE = "my_dlx_exchange";
    static final String MY_NORMAL_EXCHANGE = "my_normal_exchange";
    //队列名称
    static final String MY_DLX_QUEUE = "my_dlx_queue";
    static final String MY_TTL_DLX = "my_ttl_dlx";
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        //声明死信交换机
        channel.exchangeDeclare(MY_DLX_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明一个队列
        channel.queueDeclare(MY_DLX_QUEUE, true, false, false, null);
        //绑定
        channel.queueBind(MY_DLX_QUEUE, MY_DLX_EXCHANGE, "my_ttl_dlx");
        //声明工作交换机
        channel.exchangeDeclare(MY_NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明工作队列
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-message-ttl", 6000);
        map.put("x-dead-letter-exchange", MY_DLX_EXCHANGE);
        channel.queueDeclare("my_ttl_dlx", true, false, false, map);
        //绑定
        channel.queueBind(MY_TTL_DLX, MY_NORMAL_EXCHANGE, "my_ttl_dlx");

        String message = LocalDateTime.now() + ",延迟6s的消息";
        channel.basicPublish(MY_NORMAL_EXCHANGE, "my_ttl_dlx", null, message.getBytes());
        System.out.println("发送消息为::" + message);

        channel.close();
        connection.close();
    }
}

3.2 消费者

代码语言:javascript
复制
public class Consumer {
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.57.129");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setVirtualHost("/");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(Producer.MY_DLX_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.queueDeclare(Producer.MY_DLX_QUEUE,true,false,false,null);
        channel.queueBind(Producer.MY_DLX_QUEUE,Producer.MY_DLX_EXCHANGE,"my_ttl_dlx");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者收到消息:" + new String(body,"utf-8")+",当前时间:"+ LocalDateTime.now());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(Producer.MY_DLX_QUEUE, false, consumer);
    }
}

3.3 测试

先启动 消费者 ,开启监听,在启动 生产者 发送消息。

这时候可能出现错误:

这个问题是因为你创建的交换机已经存在,可以去rabbitmq网站中,删除对应交换机。

再次运行就成功了:

8
8
9
9

个人博客为: MoYu’s HomePage

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2021-09-26 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Rabbit 高级操作
    • 1.过期时间TTL
      • 1.1 设置队列TTL
      • 1.2 设置消息TTL
    • 2.死信队列
      • 2.1 配置文件方法
      • 2.2 配置类方式
      • 2.3 死信队列测试
    • 3.延迟队列
      • 3.1 生产者
      • 3.2 消费者
      • 3.3 测试
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档