前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ的消息持久化处理

RabbitMQ的消息持久化处理

作者头像
别先生
发布2019-11-04 23:25:53
1.7K0
发布2019-11-04 23:25:53
举报
文章被收录于专栏:别先生别先生

1、RabbitMQ的消息持久化处理,消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保证消息可靠性的呢——消息持久化。

2、autoDelete属性的理解。

  1)、@Queue: 当autoDelete属性设置到该注解的时候,含义即是,当所有消费者客户端连接断开后,是否自动删除队列,当设置值是true的时候删除该队列,当值是false的时候不删除该队列。

  2)、@Exchange:当autoDelete属性设置到该注解的时候,含义即是,当所有绑定队列都不在使用时,是否自动删除交换器,当设置值是true的时候删除该交换器,当值是false的时候不删除该交换器。

3、之前写过RabbitMQ的交换器Exchange之direct(发布与订阅 完全匹配),这里借助这个进行消息持久化测试。生产者的代码不发生改变,这里只是将消费者的autoDelete = "true"属性修改为autoDelete = "false",进行对比测试。

Info级别的日志进行消息的持久化操作,即队列不进行自动删除。将autoDelete = "false"即可。

 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         1、@RabbitListener bindings:绑定队列
18  * 
19  *         2、@QueueBinding
20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21  * 
22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23  * 
24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.info}", autoDelete = "false"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
34 
35         key = "${rabbitmq.config.queue.info.routing.key}"))
36 public class LogInfoConsumer {
37 
38     /**
39      * 接收消息的方法,采用消息队列监听机制.
40      * 
41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42      * 
43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("INFO消费者===>消费: " + msg);
51     }
52 
53 }

Error级别的日志进行消息的持久化操作,即队列进行自动删除。将autoDelete = "true"即可。

 1 package com.example.bie.consumer;
 2 
 3 import org.springframework.amqp.core.ExchangeTypes;
 4 import org.springframework.amqp.rabbit.annotation.Exchange;
 5 import org.springframework.amqp.rabbit.annotation.Queue;
 6 import org.springframework.amqp.rabbit.annotation.QueueBinding;
 7 import org.springframework.amqp.rabbit.annotation.RabbitHandler;
 8 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 9 import org.springframework.stereotype.Component;
10 
11 /**
12  * 
13  * @author biehl
14  * 
15  *         消息接收者
16  * 
17  *         1、@RabbitListener bindings:绑定队列
18  * 
19  *         2、@QueueBinding
20  *         value:绑定队列的名称、exchange:配置交换器、key:路由键routing-key绑定队列和交换器
21  * 
22  *         3、@Queue value:配置队列名称、autoDelete:是否是一个可删除的临时队列
23  * 
24  *         4、@Exchange value:为交换器起个名称、type:指定具体的交换器类型
25  * 
26  * 
27  */
28 @Component
29 @RabbitListener(bindings = @QueueBinding(
30 
31         value = @Queue(value = "${rabbitmq.config.queue.error}", autoDelete = "true"),
32 
33         exchange = @Exchange(value = "${rabbitmq.config.exchange}", type = ExchangeTypes.DIRECT),
34 
35         key = "${rabbitmq.config.queue.error.routing.key}"))
36 public class LogErrorConsumer {
37 
38     /**
39      * 接收消息的方法,采用消息队列监听机制.
40      * 
41      * @RabbitHandler意思是将注解@RabbitListener配置到类上面
42      * 
43      * @RabbitHandler是指定这个方法可以进行消息的接收并且消费.
44      * 
45      * @param msg
46      */
47     @RabbitHandler
48     public void consumer(String msg) {
49         // 打印消息
50         System.out.println("ERROR消费者===>消费<===消息message: " + msg);
51     }
52 
53 }

4、启动你的生产者,启动你的消费者,观察RabbitMQ的图形化界面。未生产消息、未消费消息的界面如下所示:

生产消息、消费消息的界面如下所示,我这里还使用浏览器访问控制层触发生产者生产消息,消费者消费消息:

现在停止你的消费者,记录消息到第几条消息了。方便再次启动消费者进行观察。

启动你的消费者,观察,看看是从第几条开始消费的。可以看到消息从第82条开始消费的。

RabbitMQ的消息持久化处理,Ready是对未接收到的数据状态表示,如果RabbitMQ在队列里面存放的消息未被消费者所消费,那么会给未消费的消息加一个标记,表示当前这个消息未被消费。消息持久化处理解决了丢失消息的这种状况,我们可以接收到消息,就是因为队列一直存在着呢,但是手动删除队列,消息也就丢失了,所以要慎重操作。当消费者停止以后,生产者生产的消息存储在RabbitMQ的服务器内存中,队列也存在内存中,数据在队列中,即数据保存在内存中。但是如果RabbitMQ的服务都停止了,队列也就消失了,队列消失了,数据也就丢失了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2019-11-03 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档