在Broker端进行消息过滤
1、使用Tag进行过滤
一个Message只能有一个Tag,Broker端可以在ConsumeQueue中过滤,从CommitLog里读取过滤后备命中的消息。
2、使用SQL表达式的方式进行过滤
通过putUserProperty函数来增加多个自定义的属性,通过自定义数据过过滤。
SQL表达式方式的过滤需要Broker先读出消息里的属性内容,然后做SQL计算,增加磁盘压力。
3、Filter Server方式过滤
Filter Server是一种比SQL表达式更灵活的过滤方式,允许用户自定义Java函数,根据Java函数的逻辑对消息进行过滤。
首先需要在启动Broker之前,配置文件中增加filterServerNums。在Broker启动的时候,本地会启动相应数量的Filter Server进程。
Filter Server相当于一个Consumer进程,它从本机Broker获取消息,然后根据用户的Java函数进行过滤,过滤后的消息在传给Consumer。
提高Consumer处理能力
1、提高消费并行度
同一个ConsumerGroup中(Clustering方式),可以通过增加Consumer实例的数量来提高并行度,通过启动多个Consumer进程增加Consumer实例数。总的Consumer数量不要超过Topic下Read Queu数量。
提高单个Consumer实例中并行处理的线程数,修改consumeThreadMin和consumerThreadMax。
2、以批量方式进行消费
设置consumer的consumeMessageBatchMaxSize,实现批量获取消息的长度
3、检测延时情况,跳过非重要信息
判断队列中消息堆积情况,就直接丢弃。
Consumer的负载均衡
1、DefaultMQPushConsumer的负载均衡
同一个ConsumerGroup里加入新的DefaultMQPushConsumer时,各个Consumer都会触发doRebalance动作。
默认的负载均衡算法是AllocateMessageQueueAveragely。负载均衡的结果与Topic的Message Queue数量,及ComsumerGroup里的Consumer的数量有关。负载均衡的分配粒度只能到Message Queue。
如果创建Topic的时候,把Message Queue数设为3,当Consumer数量为2的时候,有一个Consumer需要处理Topic三分之二的消息,另一个处理三分之一的消息;当Consumer数量为4的时候,有一个Consumer无法收到消息,其他3个Consumer各处理Topic三分之一的消息。可见Message Queue数量设置过小不利于做负载均衡,通常情况下,应把一个Topic的Message Queue数设置为16。
其他的负载均衡还有AllocateMessageQueueAveragelyByCircle、AllocateMessageQueueByConfig、AllocateMessageQueueByMachineRoom、AllocateMessageQueueConsistentHash。
2、DefaultMQPullConsumer的负载均衡
Pull Consumer可以看到所有的Message Queu,而且从哪个Message Queue读取消息,读消息时的Offset都由使用者控制,使用者可以实现任何特殊方式的负载均衡。
使用registerMessageQueueListener函数,在新的Consumer加入或退出时被触发。
使用MQPullConsumerScheduleService类
MQPullConsumerScheduleService源码
提高Producer的发送速度
1、Producer方面的调优
发送一条消息需要三步:一是客户端发送请求到服务器;二是服务器处理请求;三是服务器向客户端返回应答。
在一些对速度要求高,但是可靠性要求不高的场景下,比如日志收集类应用,可以采用Oneway方式发送,Oneway方式只发送请求不等待应答,即将数据写入客户端的Socket缓冲区就返回,不等待对方返回结果。
另外可以增加Producer的并发量,使用多个Producer同时发送。RocketMQ引入了一个并发窗口,在窗口内消息可以并发的写入DirectMem中,然后异步地将连续一段的数据刷入文件系统中。
2、Linux操作系统层的调优
推荐使用EXT4文件系统,IO调度算法使用deadline算法。
deadline算法:实现四个队列,其中两个处理正常的read和write操作,另外两个处理超时的read和write操作。正常的read和write队列中,元素按扇区号排序,进行正常的IO合并处理以提高吞吐量。超时的read和write队列中,元素按请求创建时间排序,如果有超时的请求出现,就放进这两个队列,调度算法保证超时的队列中的IO请求会优先被处理。
领取专属 10元无门槛券
私享最新 技术干货