首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

RocketMQ篇6:吞吐量优先的使用场景

在Broker端进行消息过滤

1、使用Tag进行过滤

一个Message只能有一个Tag,Broker端可以在ConsumeQueue中过滤,从CommitLog里读取过滤后备命中的消息。

2、使用SQL表达式的方式进行过滤

通过putUserProperty函数来增加多个自定义的属性,通过自定义数据过过滤。

SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增加磁盘压力。

3、Filter Server方式过滤

Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。

首先需要在启动Broker之前,配置文件中增加filterServerNums。在Broker启动的时候,本地会启动相应数量的Filter Server进程。

Filter Server相当于一个Consumer进程,它从本机Broker获取消息,然后根据用户的Java函数进行过滤,过滤后的消息在传给Consumer。

提高Consumer处理能力

1、提高消费并行度

同一个ConsumerGroup中(Clustering方式),可以通过增加Consumer实例的数量来提高并行度,通过启动多个Consumer进程增加Consumer实例数。总的Consumer数量不要超过Topic下Read Queu数量。

提高单个Consumer实例中并行处理的线程数,修改consumeThreadMin和consumerThreadMax。

2、以批量方式进行消费

设置consumer的consumeMessageBatchMaxSize,实现批量获取消息的长度

3、检测延时情况,跳过非重要信息

判断队列中消息堆积情况,就直接丢弃。

Consumer的负载均衡

1、DefaultMQPushConsumer的负载均衡

同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,各个Consumer都会触发doRebalance动作。

默认的负载均衡算法是AllocateMessageQueueAveragely。负载均衡的结果与Topic的Message Queue数量,及ComsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只能到Message Queue。

如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。

其他的负载均衡还有AllocateMessageQueueAveragelyByCircle、AllocateMessageQueueByConfig、AllocateMessageQueueByMachineRoom、AllocateMessageQueueConsistentHash。

2、DefaultMQPullConsumer的负载均衡

Pull Consumer可以看到所有的Message Queu,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。

使用registerMessageQueueListener函数,在新的Consumer加入或退出时被触发。

使用MQPullConsumerScheduleService类

MQPullConsumerScheduleService源码

提高Producer的发送速度

1、Producer方面的调优

发送一条消息需要三步:一是客户端发送请求到服务器;二是服务器处理请求;三是服务器向客户端返回应答。

在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用,可以采用Oneway方式发送,Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。

另外可以增加Producer的并发量,使用多个Producer同时发送。RocketMQ引入了一个并发窗口,在窗口内消息可以并发的写入DirectMem中,然后异步地将连续一段的数据刷入文件系统中。

2、Linux操作系统层的调优

推荐使用EXT4文件系统,IO调度算法使用deadline算法。

deadline算法:实现四个队列,其中两个处理正常的read和write操作,另外两个处理超时的read和write操作。正常的read和write队列中,元素按扇区号排序,进行正常的IO合并处理以提高吞吐量。超时的read和write队列中,元素按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时的队列中的IO请求会优先被处理。

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20200515A0RYG200?refer=cp_1026
  • 腾讯「腾讯云开发者社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券