前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >RabbitMQ扩展之消费者消息预读取

RabbitMQ扩展之消费者消息预读取

作者头像
Throwable
发布2020-06-23 16:21:22
1.4K0
发布2020-06-23 16:21:22
举报

前提

本文来源于官方文档Consumer Prefetch

消费者消息预读取

消费者消息预读取是一个更加合理和高效的限制未确认消息数量的解决方式。

AMQP 0-9-1协议中定义了basic.qos方法用于限制信道或者连接上的未确认消息数量,这个消息数据量命名为prefetch_count。不幸的是,信道其实并不是限制未确认消息数量的理想范畴,因为单个信道有可能有多个消费者订阅多个不同的队列,所以信道和队列需要为发送的每个消息相互协调,以确保消息总数量不超过限制,造成了性能下降,单机性能出现瓶颈,在集群方案中耗时更加严重。

basic.qos定义了两个属性:

  • prefetch_count:预读取消息的数量。
  • global:是否全局的。

在许多情况下,指定每个消费者的预读取消息数量更加合理。因此,RabbitMQ在basic.qos方法中重新定义了global标志的含义:

global的值

prefetch_count在AMQP 0-9-1中的含义

prefetch_count在RabbitMQ中的含义

false

同一个信道上的消费者共享

单独应用于信道上的每个新消费者

true

所有消费者基于同一个连接共享

同一个信道上的消费者共享

basic.qos方法在RabbitMQ的Java驱动中对应三个方法:

void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;

// prefetchSize = 0
void basicQos(int prefetchCount, boolean global) throws IOException;

// prefetchSize = 0 , global = false
void basicQos(int prefetchCount) throws IOException;
  • prefetchSize:预读取的消息内容大小上限(包含),可以简单理解为消息有效载荷字节数组的最大长度限制,0表示无上限。
  • prefetchCount:预读取的消息数量上限,0表示无上限。
  • global:false表示prefetchCount单独应用于信道上的每个新消费者,true表示prefetchCount在同一个信道上的消费者共享。

限制单个消费者

public class BasicQosSingle extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			channel.basicQos(10); //基于消费者进行限制
			channel.basicConsume("throwable.queue.direct",new DefaultConsumer(channel){});
		});
	}
}

此消费者最多只能有10条预读取的未确认的消息。

独立限制多个消费者

基于同一个信道对多个队列建立不同的消费者:

public class BasicQosMulti extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			DefaultConsumer consumer1 = new DefaultConsumer(channel) {};
			DefaultConsumer consumer2 = new DefaultConsumer(channel) {};
			channel.basicQos(10); //基于消费者进行限制
			channel.basicConsume("throwable.queue.direct",consumer1);
			channel.basicConsume("throwable.queue.fanout",consumer2);
		});
	}
}

每个费者最多只能有10条预读取的未确认的消息。

基于共享限制多个消费者

AMQP规范没有解释如果使用不同的global多次调用basic.qos会发生什么,RabbitMQ将此解释为意味着两个预取限制应该彼此独立地强制执行。

public class BasicQosShare extends BaseChannelFactory {

	public static void main(String[] args) throws Exception {
		provideChannel(channel -> {
			DefaultConsumer consumer1 = new DefaultConsumer(channel) {};
			DefaultConsumer consumer2 = new DefaultConsumer(channel) {};
			channel.basicQos(10, false); //基于消费者进行限制
			channel.basicQos(15, true); //基于信道进行限制
			channel.basicConsume("throwable.queue.direct",consumer1);
			channel.basicConsume("throwable.queue.fanout",consumer2);
		});
	}
}

上面的代码表示:

  • 两个消费者consumer1和consumer2基于信道最多只能有15条未确认的预读取消息。
  • 消费者consumer1和consumer2自身最多只能有10条未确认的预读取消息。

也就是有双重限制,这种限制需要信道和队列之间协调,会耗费额外的性能。

消息预读取的意义

消息预读取可以理解为RabbitMQ Broker把未确认的消息批量推送到RabbitMQ的Java客户端中,由客户端先缓存这些消息,然后投递到消费者中。试想,如果在推模式下,没有消息预读取功能,RabbitMQ Broker每次投递一条消息到客户端消费者中,这样就会产生大量的IO操作,导致性能下降,此外,消费者处理速度有可能比较快,容易产生消费者饥饿的情况。可以根据消费者实际的消费速度和消息发布的速度,对消费者的预读取未确认消息的上限进行配置,这样在大多数场景下可以提高消费者的性能。

本文是Throwable的原创文章,转载请提前告知作者并且标明出处。 博客内容遵循 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 协议 本文永久链接是:https://cloud.tencent.com/developer/article/1650067

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2018年11月28日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前提
  • 消费者消息预读取
    • 限制单个消费者
      • 独立限制多个消费者
        • 基于共享限制多个消费者
        • 消息预读取的意义
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档