本文来源于官方文档Consumer Priorities。
消费者优先级的机制:
正常情况下,所有订阅同一个队列的活跃消费者以循环的(round-robin)方式从队列中接收消息。当使用了消费者优先级,如果多个活跃消费者使用了相同的高优先级属性,那么消息投递也是以循环的方式进行(其实使用了相同的优先级类似于没有启用优先级)。
活跃的消费者就是可以在不用等待的情况下接收和处理消息的消费者,也就是消费者如果无法接收消息,那么它就是出于非活跃状态(或者说阻塞状态),阻塞的常见原因有:
basic.qos
之后,消费者在信道中未确认的预读取消息达到了上限。因此,对于每个存在的队列,必定至少出现下面三种情况的其中一种:
消费者可能在一秒内多次在活跃和阻塞状态之间切换,只要消费处理速度足够快。RabbitMQ不会通过Web管理插件或者**rabbitmqctl
**命令公开消费者当前是活跃还是阻塞状态,换言之,只能通过客户端感知。
启用消费者优先级的时候,RabbitMQ会优先投递消息到优先级属性比较高的消费者,但是如果所有优先级高的消费者都处于阻塞状态,RabbitMQ会把消息投递到活跃的优先级稍低的消费者,而不是一直等待优先级高的消费者解除阻塞,造成优先级低的消费者一直处于饥饿状态。
在使用**basic.consume
**方法可以设置参数**x-priority
**的值为整数,数字越大则优先级越高,未设置则使用默认值0。
public class ConsumerPriorityMain extends BaseChannelFactory {
public static void main(String[] args) throws Exception {
provideChannel(channel -> {
Map<String, Object> consumerArgs = new HashMap<>(8);
consumerArgs.put("x-priority", 10);
channel.basicConsume("throwable.queue.direct", true, consumerArgs, new DefaultConsumer(channel) {
});
consumerArgs.put("x-priority", 100);
channel.basicConsume("throwable.queue.direct", true, consumerArgs, new DefaultConsumer(channel) {
});
});
}
}
上面的例子设置了两个消费者,后者的优先级为100,而前者的优先级为10。
本文是Throwable的原创文章,转载请提前告知作者并且标明出处。 博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 本文永久链接是:https://cloud.tencent.com/developer/article/1650081
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有