前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议(下)

深入剖析 RabbitMQ —— Spring 框架下实现 AMQP 高级消息队列协议(下)

原创
作者头像
风尘浪子
修改2019-08-13 14:37:10
8150
修改2019-08-13 14:37:10
举报
文章被收录于专栏:JAVA 核心编程JAVA 核心编程

前言

消息队列在现今数据量超大,并发量超高的系统中是十分常用的。本文将会对现时最常用到的几款消息队列框架 ActiveMQ、RabbitMQ、Kafka 进行分析对比。

详细介绍 RabbitMQ 在 Spring 框架下的结构及实现原理,从Producer 端的事务、回调函数(ConfirmCallback / ReturnCallback)到 Consumer 端的 MessageListenerContainer 信息接收容器进行详细的分析。通过对 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用类型介绍,深入剖析在消息处理各个传输环节中的原理及注意事项。

并举以实例对死信队列、持久化操作进行一一介绍。

目录

一、RabbitMQ 与 AMQP 的关系

二、RabbitMQ 的实现原理

三、RabbitMQ 应用实例

四、Producer 端的消息发送与监控

五、Consumer 端的消息接收与监控

六、死信队列

七、持久化操作

六、死信队列

死信队列(Dead-Letter-Exchange) 可被看作是死信交换器。当消息在一个队列中变成死信后,它能被重新被发送到特定的交换器中,这个交换器就是DLX ,绑定DLX 的队列就称之为死信队列。消息变成死信一般是由于以下几种情况:

·消息被拒绝,requeue 被设置为 false, 可通过上一介绍的 void basicReject (deliveryTag, requeue) 或 void basicNack(deliveryTag,multiple, requeue) 完成设置 ;

·消息过期;

·队列超出最大长度。

其实死信队列 DLX 也是一个正常的交换器,和一般的交换器没有什么区别,我们可以用一般建立队列的方法,建立一个死信队列。然后建立一个正常的队列,在正常队列中加入参数 x-dead-letter-exchange、x-dead-letter-routing-key 与死信队列进行绑定,完成绑定后在管理界面 Features 选项中 direct.queue.first 会显示 DLX DLK。这时当被绑定的队列出现超时,超长,或被拒绝时(注意requeue被设置为false时,对会激发死信),信息就会流入死信队列被处理。

具体的例子Producer端:

代码语言:javascript
复制
 1 @Configuration 
 2 public class BindingConfig {
 3 public final static String Queue_First="direct.queue.first";
 4 public final static String Exchange_Name="directExchange";
 5 public final static String Routing_Key_First="directKey1";
 6 
 7 @Bean
 8 public Queue queueFirst(){
 9 return new Queue(this.Queue_First);
10 }
11 
12 @Bean
13 public DirectExchange directExchange(){
14 return new DirectExchange(this.Exchange_Name);
15 }
16 
17 @Bean
18 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
19 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
20 }
21 }
22 
23 @Configuration
24 public class ConnectionConfig {
25 @Value("${spring.rabbitmq.host}")
26 public String host;
27 
28 @Value("${spring.rabbitmq.port}")
29 public int port;
30 
31 @Value("${spring.rabbitmq.username}")
32 public String username;
33 
34 @Value("${spring.rabbitmq.password}")
35 public String password;
36 
37 @Value("${spring.rabbitmq.virtual-host}")
38 public String virtualHost;
39 
40 @Bean
41 public ConnectionFactory getConnectionFactory(){
42 CachingConnectionFactory factory=new CachingConnectionFactory();
43 System.out.println(host);
44 factory.setHost(host);
45 factory.setPort(port);
46 factory.setUsername(username);
47 factory.setPassword(password);
48 factory.setVirtualHost(virtualHost);
49 return factory;
50 }
51 }
52 
53 @Controller
54 @RequestMapping("/producer")
55 public class ProducerController {
56 @Autowired
57 private RabbitTemplate template;
58 
59 @RequestMapping("/send")
60 public void send() {
61 for(int n=0;n<10;n++){ 
62 template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello World! "
63 +String.valueOf(n),getCorrelationData());
64 }
65 }
66 
67 private CorrelationData getCorrelationData(){
68 return new CorrelationData(UUID.randomUUID().toString());
69 }
70 }

Customer 端

代码语言:javascript
复制
 1 @Configuration
 2 public class BindingConfig {
 3 //普通队列参数
 4 public final static String Queue_First="direct.queue.first";
 5 public final static String Exchange_Name="directExchange";
 6 public final static String Routing_Key_First="directKey1";
 7 //死信队列参数
 8 public final static String Queue_Dead="direct.queue.dead";
 9 public final static String Exchange_Dead="directDead";
 10 public final static String Routing_Key_Dead="directDeadKey";
 11 
 12 @Bean
 13 public Queue queueFirst(){
 14 Map<String, Object> args=new HashMap<String,Object>();
 15 //声明当前死信的 Exchange
 16 args.put("x-dead-letter-exchange", this.Exchange_Dead);
 17 //声明当前队列的死信路由key
 18 args.put("x-dead-letter-routing-key", this.Routing_Key_Dead);
 19 //把死信队列的参数绑定到当前队列中
 20 return QueueBuilder.durable(Queue_First).withArguments(args).build();
 21 }
 22 
 23 @Bean
 24 public DirectExchange directExchange(){
 25 return new DirectExchange(this.Exchange_Name);
 26 }
 27 
 28 @Bean
 29 public Binding bindingExchangeFirst(Queue queueFirst, DirectExchange directExchange){
 30 return BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First);
 31 }
 32 
 33 @Bean
 34 public Queue queueDead(){
 35 return new Queue(this.Queue_Dead);
 36 }
 37 
 38 @Bean
 39 public DirectExchange directExchangeDead(){
 40 return new DirectExchange(this.Exchange_Dead);
 41 }
 42 
 43 @Bean
 44 public Binding bindingExchangeDead(Queue queueDead,DirectExchange directExchangeDead){
 45 return BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead);
 46 }
 47 }
 48 
 49 @Configuration
 50 public class ConnectionConfig {
 51 @Value("${spring.rabbitmq.host}")
 52 public String host;
 53 
 54 @Value("${spring.rabbitmq.port}")
 55 public int port;
 56 
 57 @Value("${spring.rabbitmq.username}")
 58 public String username;
 59 
 60 @Value("${spring.rabbitmq.password}")
 61 public String password;
 62 
 63 @Value("${spring.rabbitmq.virtual-host}")
 64 public String virtualHost;
 65 
 66 @Bean
 67 public ConnectionFactory getConnectionFactory(){
 68 CachingConnectionFactory factory=new CachingConnectionFactory();
 69 factory.setHost(host);
 70 factory.setPort(port);
 71 factory.setUsername(username);
 72 factory.setPassword(password);
 73 factory.setVirtualHost(virtualHost);
 74 return factory;
 75 }
 76 }
 77 
 78 @Configuration
 79 public class DirectMessListener {
 80 @Autowired
 81 private ConnectionConfig connectionConfig;
 82 @Autowired
 83 private RabbitTemplate template;
 84 private int index=0,normalIndex=0,deadIndex=0; 
 85 
 86 @Bean
 87 public DirectMessageListenerContainer messageContainer(){
 88 DirectMessageListenerContainer container=new DirectMessageListenerContainer();
 89 container.setConnectionFactory(connectionConfig.getConnectionFactory());
 90 // 设置每个队列的 consumer 数量
 91 container.setConsumersPerQueue(4);
 92 // 设置每个 consumer 每次的接收的消息数量
 93 container.setPrefetchCount(10);
 94 // 使用MANUAL手动确认
 95 container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
 96 // 监听队列
 97 container.addQueueNames(BindingConfig.Queue_First);
 98 container.addQueueNames(BindingConfig.Queue_Dead);
 99 container.setConsumerTagStrategy(queue -> "consumer"+(++index));
100 
101 container.setMessageListener(new ChannelAwareMessageListener(){
102 @Override
103 public void onMessage(Message message, com.rabbitmq.client.Channel channel) 
104 throws Exception {
105 MessageProperties prop=message.getMessageProperties();
106 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){
107 System.out.println("This is a normal queue! "+(++normalIndex));
108 //把当前的队列转送到死信队列中
109 channel.basicReject(prop.getDeliveryTag(), false); 
110 }
111 if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){
112 System.out.println("This is a dead queue! "+(++deadIndex));
113 //模拟对死信队列处理
114 Thread.currentThread().sleep(5000);
115 .......
116 //处理完毕
117 channel.basicAck(prop.getDeliveryTag(), false);
118 }
119 
120 }
121 });
122 return container;
123 }
124 }

通过管理界面可以看,信息会先发送到 direct.queue.first,然后被放进死信队列作处理。

运行结果

添加描述

死信队列最常用的场景可以在订单支付,流程审批等环节。例如在 京*、淘* 等平台,当下单成功后,客户要在一定的时间内完成支付操作,否则订单被视作无效,这些业务流程就可以使用死信队列来处理。

七、持久化操作

RabbitMq 的持久化操作包含有 Queue 持久化、Message 持久化和 Exchange 持久化三类。

7.1 Queue 的持久化

队列持久化只需要在 Queue 的构造函数 public Queue(String name, boolean durable) 把 durable 参数置为 true 就可实现。如果队列不设置持久化( (durable 默认为 false), 那么在RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。

7.2 Message 持久化

设置了Queue 持久化以后,当 RabbitMQ 服务重启之后,队列依然存在,但消息已经消失,可见单单设置队列的持久化而不设置消息持久化显得毫无意义,所以通常列队持久化会与消息持久化共同使用。

在 RabbitMQ 原生态的框架下,需要把信息属性设置为 MessageProperties.PERSISTENT TEXT PLAIN 才会实现消息的持久化。

而在 Spring 框架下,由于在使用回调函数时需要把 Message 重新返回队列再进行处理,所以 Message 默认已经是持久化的。

7.3 Exchage 的持久化

交换器持久化可通过构造函数 public DirectExchange(String name, boolean durable, boolean autoDelete) 把 durable 参数置为 true 就可实现,而 autoDelete 则是指在所在消费者都解除订阅的情况下自动删除。如果交换器不设置持久化,那么在 RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是消息不再发送到该 Exchange 。对一个长期使用的交换器来说,持久化还是有其必要性的。

本章总结

RabbitMQ 发展至今,被越来越多的人认可,这和它在易用性、扩展性、可靠性和高可用性等方面的卓著表现是密不可分的。

相比于传统的 ActiveMQ 和分布式 Kafka,它具有自己独有的特点。

希望文章有帮于大家对 RabbitMQ 消息队列方面有更深入的了解,在不同的开发环境中灵活运用。

由于时间仓促,文章当中有不明确的地方或有错漏敬请点明。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ 版
消息队列 CMQ 版(TDMQ for CMQ,简称 TDMQ CMQ 版)是一款分布式高可用的消息队列服务,它能够提供可靠的,基于消息的异步通信机制,能够将分布式部署的不同应用(或同一应用的不同组件)中的信息传递,存储在可靠有效的 CMQ 队列中,防止消息丢失。TDMQ CMQ 版支持多进程同时读写,收发互不干扰,无需各应用或组件始终处于运行状态。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档