消息生产者如果向交换机发送了一个无法被路由到任何队列上的消息,那么此时交换机会判断消息的mandatory属性值:
//声明一个直连交换机--向test交换机发送一条消息,路由key为123,此时我们有没有提供对应的队列绑定的路由值123
//将mandatory参数设置为true
channel.exchangeDeclare("test", BuiltinExchangeType.DIRECT,true,false,null);
//生产者提供一个消息回退接口,当前出现当前情况下,会调用该接口,处理发送失败的方法
channel.addReturnListener(new RouteFailListener());
channel.basicPublish("test","123",true,null,"你好".getBytes(StandardCharsets.UTF_8));
消费端程序调用了 channel.basicQos(5) ,之后订阅了某个队列进行消费。 RabbitMq 会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,计数达到5后,那么RabbitMQ就不会向这个消费者再发消息。消费者确认了某条消息处理完后,RabbitMQ 将相应的计数减1之后消费者可以继续接收消息,直到再次到达计数上限。这种机制可以类比于 TCP IP中的"滑动窗口"
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-D7J0T7vk-1676088488565)(C:/Users/zdh/AppData/Roaming/Typora/typora-user-images/image-20230210153005421.png)]
//第一个参数:确认哪一个消息
//第二个参数:是否开启消息批量应答
channel.basicAck(envelope.getDeliveryTag(),false);
//第一个参数:拒绝哪一个消息
//第二个参数:是否将拒绝的消息重新入队
channel.basicReject(envelope.getDeliveryTag(),true);
//第一个参数:拒绝哪一个消息
//第二个参数:是否批量拒绝
//第三个参数:是否将拒绝的消息重新入队
//basic.nack 方法可以一次拒绝或重新排队多条消息。这就是它与 basic.reject 的区别。
channel.basicNack(envelope.getDeliveryTag(),true,true);
注意:
rabbitmq消息持久化前,需要先将对应的队列先进行持久化,然后在发布消息时,将消息标记为持久化。
channel.basicPublish("",QUEUE_NAME,true,
//消息添加持久化属性
MessageProperties.PERSISTENT_TEXT_PLAIN,("序号"+i).getBytes(StandardCharsets.UTF_8));
生产者一旦将信道设置为confirm模式,所有在该信道上发布的消息都会被指派一个唯一的ID,一旦消息被成功投递到所有匹配的队列后,broker就会发送一个确认给生产者(包含消息的唯一ID),此时生产者就知道消息已经成功到达目的队列了。
如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘后再发出。
confirm模式本身是异步的,一旦发送一条消息,生产者应用程序就可以在等待信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者便可以通过回调方法处理该确认消息。如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
Rabbitmq提供三种发布确认实现方式:
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl",1000);
channel.queueDeclare(Q2_QUEUE,false,false,false,arguments);
由于整个队列中消息的过期时间是一致的,所以过期的消息势必出现在队列头部,那么每次只需要判断队列头部消息是是否过期即可,如果过期就丢弃或者死信。
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("60000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);
由于Rabbitmq只会通过每次判断队列头部消息是否过期,进行丢弃或死信,因此如果基于消息粒度设置过期时间,那么队列中靠前的消息未必是最早过期的, 那么已经过期的消息所持有的资源就不会被释放,直到过期消息来到了队列头部。
同时指定单个消息TTL和单队列TTL情况下,取较小者。
死信来源:
队列过期不会对其中的消息进行死信
死信怎么处理:
我们通过设置队列的x-dead-letter-exchange属性,将某个交换机设置为绑定到当前队列上的死信交换机,当出现死信消息时,就交给死信交换机处理:
//声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE,DIRECT,false,true,null);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,true,null);
//绑定死信交换机和死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,DEAD_KEY);
//普通队列属性设置
HashMap<String, Object> arguments = new HashMap<>();
//设置当前普通队列关联的死信交换机
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信RoutingKey
arguments.put("x-dead-letter-routing-key",DEAD_KEY);
//设置队列中消息的存活时间--5s
arguments.put("x-message-ttl",5000);
//声明普通交换机
channel.exchangeDeclare(EXCHANGE_NAME,DIRECT,false,true,null);
//声明普通队列
channel.queueDeclare(QUEUE_NAME,false,false,true,arguments);
//绑定普通交换机和普通队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
优先级队列中会对队列中的消息按照优先级进行排序,但是为了支持消息在队列中按照优先级排序,需要付出相应的内存,磁盘和CPU成本。
如果想要优先级队列有机会对队列中的消息进行排队,通常需要配合消费端在手动确认模式下采用basic.qos方法,每次预取指定数量消息,从而给消息在队列中停留提供时间。
前面在设置死信队列时我们提到,可以为队列设置死信交换机来存储那些处理失败的消息,可是这些不可路由消息根本没有机会进入到队列,因此无法使用死信队列来保存消息。
在RabbitMQ.中,有一种备份交换机的机制存在,可以很好的应对这个问题。
备份交换机可以理解为 RabbitMQ中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为Fanout,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列了。
如果我们没有给某个交换机设置关联的备份交换机,那么会判断交换机对应mandatory参数是否被设置为true,如果为true,会尝试调用生产者提供的消息回退接口。
Rabbitmq在3.6.0版本中引入的惰性队列会将队列中的消息存入磁盘,当消费者消费到对应消息时,才会将消息从内存中加载出来。
当消费者由于各种原因下线,长时间无法消费消息造成消息队列中消息堆积时,惰性队列就很有必要了。
默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
思路1: 利用单队列消息TTL,在队列粒度,指定队列中消息的过期时间,由于队列中靠头部的消息一定是越早过期的,所以不用担心消息不会按时死亡。但是缺点时,单个队列只能处理过期时间一样的消息,每增加一个新的时间需求,都需要新增一个队列。
思路2: 利用rabbitmq提供的延迟交换机插件, 此时我们就可以基于消息粒度指定消息TTL了,延迟交换机拿到这些消息后,不会立刻将其路由到某个队列,而是先保存起来,然后等待消息的延迟时间结束后,再将消息发送到指定的队列中去。
延迟交换机的劣势:
1.将消息持久化到磁盘保存,性能偏低
2.只发送一次消息,存在消息发送失败的可能,并且不支持mandatory属性
利用上面已经提供的关于Rabbitmq相关问题的解决方案,我们来综合利用解决下面场景中存在的问题:
优势: 通常使用消息队列完成异步处理;各个微服务通过消息总线进行通信,完成应用解耦;利用消息队列缓存用户请求,完成流量削锋。
缺点: 系统可用性降低,因为需要保证消息队列服务的可用性。系统复杂度提高,引入消息队列中需要考虑数据一致性问题和消息幂等性问题。一致性问题,ABCD四个系统基于消息队列总线进行通信,如果A发布消息到消息总线,BCD三个系统系统中BD写库成功,C失败了,咋整?
什么是消息顺序性问题?
如何解决:
setnx具有天然的互斥性,如果key已经存在那么设置失败,返回0
在业务中也是同样的处理思路:
这个问题需要拆分为三个子问题进行分析:
开启生产端的发布确认模式,即将生产方的信道设置为confirm模式,所有在该信道内发布的消息都会被指派一个唯一ID。
如何消息被成功投送到指定交换机,那么broker会给生产者发送一个ack确认消息。如何rabbitmq发生内部错误导致消息丢失,broker会给生产者发送一个nack消息。
如果开启了发布确认的异步模式,那么上述两种场景会分别回调生产者的ack和nack回调接口,生产者可以在nack回调接口中决定是否重新发送消息。
如果设置了消息持久化属性,那么消息会在持久化到硬盘后,再发送ack响应。
开启消费者端的手动应答机制,每条消息必须等待消费者成功发送ack响应到broker时,broker才会把消息从消息队列中删除。
1.声明队列时,将durable参数设置为true,表明当前队列是一个持久化队列 2.发送消息时,将deliverMode设置为2,表示当前消息是一个持久化消息。
实现RabbitMQ的高可用集群,一般在并发和数据量不高的情况下,这种模式非常的好且简单。主备模式也称为Warren模式
镜像模式:集群模式非常经典的就是Mirror镜像模式,保证100%数据不丢失,在实际工作中用的最多的。并且实现集群非常的简单,一般互联网大厂都会构建这种镜像集群模式。
Mirror镜像队列,目的是为了保证rabbitmq数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是2-3个实现数据同步(对于100%数据可靠性解决方案一般是3个节点)集群架构如下: