前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >pulsar-8:40个分区的topic消息严重不均衡下个别partition无法被consumer消费

pulsar-8:40个分区的topic消息严重不均衡下个别partition无法被consumer消费

作者头像
千里行走
发布2022-04-06 11:20:18
1.2K0
发布2022-04-06 11:20:18
举报
文章被收录于专栏:千里行走

目录:

(1).pulsar版本&细节&架构

(2).问题&现象与使用陈述&脱敏代码

1.问题与现象

2.排查过程 (3).最终原因与解决

(1).pulsar版本&细节&架构

pulsar版本是2.8.0,部署在openjdk11上,具体版本号是:11.0.12。 在aws海外部署,使用机型是c5a.2xlarge(8c16g),一共是3台,每台部署一个broker、bookie、zk。启动命令的参数没有修改都是默认值。

部署详情与细节:

pulsar-7:aws上部署生产级别的5节点pulsar集群

(2).问题&现象与使用陈述&脱敏代码

1.问题与现象

40个分区的topic消息严重不均衡下个别partition无法被consumer消费。最近一次是有两个分区各自堆积到30万左右(backlog值)。

这个分区topic消息发送平均大小:

pulsar_rate_in:

pulsar_rate_out:spike的时间是我们发现不消费的时间,但有可能之前就已经有问题了,这么高是重启了consumer。

2.脱敏代码

代码语言:javascript
复制
package test;


import com.google.common.collect.Lists;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.redisson.client.codec.StringCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;


import org.apache.pulsar.client.api.Producer;


@Service
public class PushService implements IPushService {


        private final static Logger logger = LoggerFactory.getLogger(PushService.class);


        @Resource
        private IMqBackoffService mqBackoffService;


        @Resource(name = "XXXStatusProducer")
        private ProducerYYYStatusUUUProducer;


        @Override
        public void pushZZZPPPRRRStatus(Long userId, ListRRRList) {
                String content = GsonUtil.beanToJsonString(RRRList);
                try {
                        //使用的是异步,且批量的发送方式,按照key做sharding发送到topic不同的partition.
                        YYYStatusUUUProducer.newMessage().key(String.valueOf(userId))
                                        .value(content.getBytes(StandardCharsets.UTF_8)).sendAsync().exceptionally((e -> {
                                                logger.error("send sync ZZZ PPP RRR status error,content:{}", content, e);
                                                // 如果发送失败,会将异步发送失败的消息存到aws的aurora数据库,使用数据库的本地事务(shardingjdbc4.1.1),事务使用的注解方式.
                                                mqBackoffService.saveYYYStatus(RRRList);
                                                return null;
                                        }));


                } catch (Exception e) {
                        logger.error("send ZZZ PPP RRR status error,content:{}", content, e);
                        mqBackoffService.saveYYYStatus(RRRList);
                }
        }


}
代码语言:javascript
复制
consumer使用的是key-sharding方式消费,脱敏代码:
代码语言:javascript
复制
package test;


import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.List;


import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;


@Component
public class XXXResultMessageListener implements MessageListener{


        private static final Logger log = LoggerFactory.getLogger(XXXResultMessageListener.class);


        private final IoooService oooService;


        public XXXResultMessageListener(IoooService oooService) {
                this.oooService = oooService;
        }


        @SneakyThrows
        @LogTid
        @Override
        public void received(Consumerconsumer, Messagemsg) {
                String body = new String(msg.getValue());
                try {
                        if (StrUtil.isBlank(body)) {
                                log.warn("YYY result received msg value is null messageId:{}", msg.getMessageId());
                                consumer.acknowledge(msg.getMessageId());
                                return;
                        }
                        log.info("YYY result,topicName:{},message:{}", msg.getTopicName(), body);
                        ListTTTUUUStatusRespList = JSONUtil.toList(body, TTTUUUStatusResp.class);
                        if (CollUtil.isNotEmpty(TTTUUUStatusRespList)) {
                                TTTUUUStatusResp TTTUUUStatusResp = TTTUUUStatusRespList.get(0);
                                if (UserTypeEnum.PPP.getCode().equals(TTTUUUStatusResp.getUserType())) {
                                        // 只是设置标志,是否打印日志.
                                        ThreadFilter.setPrintFlag(false);
                                }
                                // 同步处理,等处理完成后再获取消息,涉及到的aurora数据库的本地事务,使用的是shardingjdbc4.1.1,事务都是用的注解方式.
                                oooService.vvvBatchReceived(TTTUUUStatusRespList);
                        }
                        consumer.acknowledge(msg.getMessageId());
                } catch (Exception e) {
                        if (e instanceof BaseRuntimeException) {
                                // BaseRuntimeException是业务异常,业务规定不用处理,直接返回ack.
                                consumer.acknowledge(msg.getMessageId());
                        } else {
                                log.error("YYY result Failed to process message error,data is {},e:{}", body, e.getStackTrace(), e);
                                // 如果业务处理失败,告知pulsar重新消费.
                                consumer.negativeAcknowledge(msg);
                        }
                        throw e;
                } finally {
                        ThreadFilter.removePrintFlag();
                }
        }
}

3.问题排查

producer使用的是批量发送(平均8条左右是一批,最多40条是一批),并且是异步发送,使用key-sharding的方式发送到这个topic的不同分区。

这个40个分区的topic,现在是有2个分区啊严重堆积,各堆积了30万不消费。但是重启broker后会消费,但是当消费万1~2万消息后又不消费了,再次重启又消费一点然后停止,反复如此。而且我们一直给的压力都很大,压极限,之前也经常出现处理能力不足造成的几十万消息堆积,但是处理能力上来后都可以平滑的消费万,但是11.12号突然出现消费不了的情况。

在问题分区上查看了pulsar-broker的所有线程,没有死锁。

经过不断查、重启,现在连个分区还各余不到10万堆积。

arthas看堆积的broker节点,也没有啥异常:

我查阅了2.8.1release的fix list,注意到了2.8.1的这个修复: [broker] Fix issue where Key_Shared consumers could get stuck #10920 #10920

然后我同样看了下问题分区的stats,和上述issue中的stats做对比,不是同一个现象,但确实有些问题: a1.consumers竟然是空的,也就是没有消费者,那肯定没有办法消费了吧?在issue#10920中consumers是有消费者的。 a2.subscriptions下的msgBacklog和msgBacklogNoDelayed有值且相等且正确,但是backlogSize确实0,这个貌似不对。

顺着思路我查看了这个topic其他正常的分区,正常分区的stats是: 1.consumers有消费者实例,正确。 2.subscriptions下的msgBacklog和msgBacklogNoDelayed,backlogSize有值且相等且正确,都是0。

也就是说,在这种场景下,somehow的未知原因会导致严重堆积/不均衡的分区会丢失自己的consumer么。

接着查,发现是因为消费者的jvm假死了,消费者是两台机器(8c16g),都假死了,但是奇怪的是第一台机器的所关联的分区还在(topics stats可以看到),第二台机器所关联的分区不在了(topics stats看到的consuemrs为空)。

假死原因是因为垃圾回收用的是G1,之前用ZGC的时候,即使积压了几十万,从来没死过,过会就缓过来了。

现在我把consumer重启(垃圾回收还是G1),问题分区的topic stats:然后跑一跑又停了(消费者hang住了),但是topic stats中的consuemrs还在。

果然如所猜测的一致,当消费者由于GC假死后,分区的topic stats中的consumers为空,pulsar-broker应该是通过心跳判断consuemr已经断开了。

目前从重现&各种现象下的分析结果是:消费者如果使用的是G1(已经做调优),并且在海量消息段时间涌入时,会出现假死,然后从pulsar-broker上断开。导致无法消费。

再切ZGC试试,发现ZGC也不行,经查是因为每批发送40个消息,之前用ZGC的时候是每批发送10个消息。

如下图,是consumer刚恢复时pulsar给consumer吐出的速率,但是一个消费者的TPS是600多(业务重),肯定是无法消费过来的,但还接受海量的消息,会不会把内存中的queue撑爆了?

我的问题是:为啥我的消费能力只有600多,但是pulsar却给我这么海量的消息,我并没有取这么多消息啊。

目前阶段定位是: pulsar-consumer默认使用的是push方式,大量积压后,消费者重启时,pulsar-broker会推送海量消息到consumer,直接把consuemr内存打爆。

(3).最终原因与解决

key_shard模式下,pulsar-client会用一个receiverQueue不断接收pulsar-broker推送过来的消息,receiverQueueSize只能限制这个queue的大小,但这个限制没用,只能让服务慢点死。因为每个key_shard都在client分配了一个singleThreadPool来处理,而这个singleThreadPool是一个无界队列,receiverQueue不断接收到消息后转发到key_shard的threadPool中的无界队列后,receiverQueue继续从pulsar-broker接收推送来的消息,直到把所有的key_shard的threadpool的无界队列打到最大极限,把服务吃死。

这个问题只有两种解决方式: 1.增加consumer的消费节点,和每个消费节点的消费能力。但是我们的业务场景是不可能的,因为处理速度是恒定的,我不可能无限加节点去解决这个问题。如果还要用这个方式,当出现这个问题后,只能临时加节点结合不断重启来解决,这样做也很奇葩。 2.改用同步pull的方式去消费,即当本地线程池处理完消息后再到receiverQueue中拿消息,这样就不会把本地线程池的queue打满。肯定没有问题。对于我们的业务来说,这个是正确选择。本质上还是对pulsar的使用不当造成的,用其他的消息队列用push的话也会这样大量堆积。

最后再次简述原因:

pulsar-client会用一个receiverQueue不断接收pulsar-broker推送过来的消息,结果本地处理这些消息的线程池/本地队列也是异步的去从receiverQueue中拿消息,且本地队列没有限制队列长度,然后直接打满本地队列。

我提交的相关issue地址:

https://github.com/apache/pulsar/issues/12800

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-02-17,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 千里行走 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
消息队列 CMQ
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档