本文介绍RabbitMq几个重要的概念。分别是优先级队列、消息顺序性、消息分发、持久化。
目录
前言
正文
优先级队列
消息顺序性
消息分发
持久化
顾名思义,优先级高的具备优先消费的特权。
设置方式是在声明队列的时候设置参数:x-max-priority,代表最大优先级,如果参数设置10,如代码所示:
@Bean
public Queue priorityQueue() {
Map<String, Object> map = new HashMap<>();
//给当前队列配置最大优先级
map.put("x-max-priority", 10);
return new Queue(PRIORITY_QUEUE, true, false, false, map);
}
那么则表示在该队列存在11个级别,从高到低是 10,9,……0。
并且需要在发送消息的时候设置当前消息的优先级,如代码所示:
amqpTemplate.convertAndSend("priority_exchange", "", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(finalI);
return message;
}
});
需要注意的是,需要在生产者的速度大于消费者的速度,且Broker中有消息堆积的情况下,设置队列及消息优先级才有意义,否则,生产者刚生产一条就被消费者消费了,消息在Broker没有堆积,何谈优先级。
RabbitMq没有属性设置消息的顺序性,所以在没有前提的情况下说RabbitMq消息的消费具有顺序性是错误的,理想情况下,没有重复消费前提下,生产者发送 msg1、msg2、msg3,那么消费者消费顺序也是msg1、msg2、msg3。
但是这种情况毕竟是理想的,而这种理想情况在实际中很容易会被打破,例如消息丢失,网络原因,异常发生,而且也是在一个生产者和一个消费这的情况,如果多个生产者的话,真的就无法保证哪个消息先到达Broker,也就不能保证顺序。
例举一下情况,打破消费的顺序性。
生产者使用了事务,且触发了回滚,重新补发消息后,顺序可能是错乱的。
开启publisher confirm后出现超时、中断、拒绝、nack命令等,重新补发消息后,顺序可能是错乱的。
RocketMq有多个消费者的时候,队列会以轮询的方式分发给多个消费者。
这里有一个很重要的参数 channel.basicQos(),该方法是允许信道上消费者最大未确认消息数量。他是针对信道而言的,一个连接可以有多个信道,一个信道可以有多个队列。
channel.basicQos()参数只适用于推模式的消费方式。
举个例子,channel.basicQos(5),代表该信道上的其中一个消费者未确认数量达到5后,RabbitMq就不会向这个消费者在发送任何消息,直到该消费者确认了一个消息后计数器减1,之后才可以继续接收消息。
该参数有3个重载方法:
void basicQos(int var1, int var2, boolean var3) throws IOException;
void basicQos(int var1, boolean var2) throws IOException;
void basicQos(int var1) throws IOException;
我们上面说的那个数字就是该方法的第一个参数 var1,在他的实现类 AutorecoveringChannel 里参数名叫 prefetchCount,
如果使用 basicQos(int var1),var1代表消费者所能接收未确认消息总数,写0 代表没有上限。
如果使用 basicQos(int var1, int var2, boolean var3),在他的实现类里实现如下,仔细看他的参数顺序。
public void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException {
if (global) {
this.prefetchCountGlobal = prefetchCount;
} else {
this.prefetchCountConsumer = prefetchCount;
}
this.delegate.basicQos(prefetchSize, prefetchCount, global);
}
prefetchSize参数是消费者所能接收未确认消息的总大小,单位是B,同样,设置0表示无上限。
global参数含义如下
global | Meaning of prefetch_count in AMQP 0-9-1 | Meaning of prefetch_count in RabbitMQ |
---|---|---|
false | 信道上的消息都要遵循prefetchCount值 | 信道上新的消费者需要遵循prefetchCount值 |
true | 当前连接上所有消费者都要遵循prefetchCount值 | 信道上的消息都要遵循prefetchCount值 |
那么我们怎么使用呢?
channel.basicQos(10);
channel.basicConsume("my-queue1",false,consumer1);
channel.basicConsume("my-queue2",false,consumer2);
这样写是说my-queue1和my-queue2两个队列里最大的未确认消息总是分别都是10.
再来一种复杂的写法
channel.basicQos(3,false);
channel.basicQos(5,true);
channel.basicConsume("my-queue1",false,consumer1);
channel.basicConsume("my-queue2",false,consumer2);
含义是:每个消费者最多接收3个未确认的消息,当前这个信道最大可接收5个未确认的消息。
设置为true 指的是同一个连接范围内所有信道上未确认数量之和。
设置为false指的是每个信道上每个消费者最大未确认数量。
RabbitMq计算这些数量也是需要占用性能的,所以我们一般指定的false,默认也是false。
关于消息分发的概念理解起来还是比较复杂的,该功能也是在特殊需要时才会使用,一般的我们没必要设置这个参数,而且目前我没有找到Spring集成的模版中直接可以操作这个参数,所以只能用原始的方式在channel 声明。
持久化概念是一个比较重要也是比较好理解的,而且我们在使用的时候也会经常碰到。
首先,持久化的意思跟其他中间件的持久化概念基本相同,都是保证数据的可靠性,防止丢失的操作。
在RabbitMq中存在三个部分的持久化,分别是:交换器持久化、队列持久化、消息持久化。
无论是那个阶段的持久化,参数都是 durable ,设置 true代表持久化,false不持久化
交换器持久化
在使用的时候我们无需指定,因为构造方法里面默认给了true
@Bean
FanoutExchange goodsChangeExchange() {
return new FanoutExchange("GOODS");// 配置广播路由器
}
public AbstractExchange(String name) {
this(name, true, false);
}
队列持久化
队列和消息的持久化,如果只设置其中一个是没有意义的,因为消息存在队列里,如果消息设置持久化,队列没有,那么队列丢失,消息也会丢失,有句话说,毛长在皮上,那皮都没有了毛还能有吗。反过来,设置了队列持久化同样也不能保证消息不能丢失。因为,两者必须同时存在才有意义,当然设置了持久化后也会消耗性能的。
这是Queue.class 部分代码,我们不设置持久化,默认是持久化的,所以我们不用指定也是可以的。
public Queue(String name) {
this(name, true, false, false);
}
消息持久化
消息持久化有两种写法,分别对应的枚举类的两个方法。
amqpTemplate.convertAndSend("priority_exchange", "", order, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(finalI);
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.fromInt(2));
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
-----------------------------------------------
public enum MessageDeliveryMode {
NON_PERSISTENT,
PERSISTENT;
private MessageDeliveryMode() {
}
public static int toInt(MessageDeliveryMode mode) {
switch(mode) {
case NON_PERSISTENT:
return 1;
case PERSISTENT:
return 2;
default:
return -1;
}
}
public static MessageDeliveryMode fromInt(int modeAsNumber) {
switch(modeAsNumber) {
case 1:
return NON_PERSISTENT;
case 2:
return PERSISTENT;
default:
return null;
}
}
}
同样,我们也不需要设置消息的持久化,因为默认就是持久化的,大家自行搜索这个类 MessageProperties.class
我截取了部分代码。
public static final MessageDeliveryMode DEFAULT_DELIVERY_MODE;
public MessageProperties() {
this.deliveryMode = DEFAULT_DELIVERY_MODE;
this.priority = DEFAULT_PRIORITY;
}
static {
DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
DEFAULT_PRIORITY = Integer.valueOf(0);
}
以上我们把交换器、队列、消息都设置了持久化,那么就能保证消息百分之百不丢失吗?答案是否定的。
在这里透露一下,下一节我们就会介绍比持久化可靠性更高的方式来保证消息不被丢失。
可以提示一下,一种是事务,一种是生产者确认方式。
想看更多内容请关注!
代码地址:https://github.com/362460453/rabbitMQ-demo