首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka消费组信息采集异常(hang住)排查

kafka消费组信息采集异常(hang住)排查

原创
作者头像
皮皮熊
修改2022-02-28 20:19:48
2.4K0
修改2022-02-28 20:19:48
举报

一、问题描述

小组同学在使用kafka官方工具kafka-consumer-groups.sh批量导集群消费组详情时,发现某一个集群基于broker的某些消费组会出现异常,主要表现:

  • 结果不全: 只有部分分区的信息
  • 进程会阻塞: 不会像导他消费组时那样,执行完自动退出
image.png
image.png

二、问题分析

1、信息梳理

兵马未动,粮草先行。在分析问题前我们需要具体梳理一下潜在的线索:

  • 集群的版本是0.9.0.1的bug?(古老的版本,最近半年会裁撤掉)但目前0.9.0.1版本集群中只有这个出现这样的问题
  • 集群部署不规范,broker和zk端口不统一的问题?但之前也有类似部署不规范的集群,没有出现过这样的问题
  • 集群机器有异常?
  • broker消费特殊场景下的bug?此次异常的消费组大多同时消费2个topic: 一个是日常三副本的topic,一个是离线补录的二副本的topic,确实存在bug的可能性。
  • kafka-consumer-groups.sh特殊场景下的Bug?

2、机器问题排查

1) strace相关进程,发现进程确实阻塞住了

2) 查看/proc/pid/stack看则有下面的堆栈输出

$ cat /proc/12097/stack

[<ffffffff81097b6b>] futex_wait_queue_me+0xdb/0x140
[<ffffffff81097e46>] futex_wait+0x166/0x250
[<ffffffff81099f1e>] do_futex+0xde/0x570
[<ffffffff8109a421>] SyS_futex+0x71/0x150
[<ffffffff81b2a202>] system_call_fastpath+0x16/0x1b
[<ffffffffffffffff>] 0xffffffffffffffff

3) 查看机器版本

与现网其他机器相比,没有太大的差异,机器层面相关性可能不是很大。

3、__consumer_offsetstopic排查

基于broker消费的消费组,其偏移量的元数据信息是存储在__consumer_offsets这个topic下的。笔者之前在《kafka部分group无法正常消费数据排查》一文中曾介绍过因__consumer_offsets问题导致group异常的情况,所以便查看了一下__consumer_offsets的情况,发现一切正常:

image.png
image.png

4、进程阻塞原因排查

几经周折,没有发现什么进展,还是决定回到kafka-consumer-groups.sh本身,从查看进程堵塞原因出发。此时就需要我们jstack查看一下进程内诸线程的情况,我们发现:

image.png
image.png

进程阻塞在获取某个分区的HW(HighWatermark)上(注意:LEO对消费者是不可见的,所以这里虽然调用的方法是getLogEndOffset,但实际上是获取HW),这时我们就要从源码中进行深入的分析。

三、源码分析

kafka-consumer-groups.sh获取基于broker消费組信息,即调用kafka.admin.ConsumerGroupCommandKafkaConsumerGroupService.describeGroup。相关实现如下:

1、KafkaConsumerGroupService

class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {

  private val adminClient = createAdminClient()

  // `consumer` is only needed for `describe`, so we instantiate it lazily
  private var consumer: KafkaConsumer[String, String] = null

  def list() {
    adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
  }

  protected def describeGroup(group: String) {
    val consumerSummaries = adminClient.describeConsumerGroup(group)
    if (consumerSummaries.isEmpty)
      println(s"Consumer group `${group}` does not exist or is rebalancing.")
    else {
      val consumer = getConsumer()
	  // 打印描述头
      printDescribeHeader()
      consumerSummaries.foreach { consumerSummary =>
        val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
        val partitionOffsets = topicPartitions.flatMap { topicPartition =>
          Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
            topicPartition -> offsetAndMetadata.offset
          }
        }.toMap
        describeTopicPartition(group, topicPartitions, partitionOffsets.get,
          _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
      }
    }
  }

  // 获取HW的值
  protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
    val consumer = getConsumer()
    val topicPartition = new TopicPartition(topic, partition)
    consumer.assign(List(topicPartition).asJava)
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)
    LogEndOffsetResult.LogEndOffset(logEndOffset)
  }

 //省略中间一部分不重要的代码 
 
  private def createNewConsumer(): KafkaConsumer[String, String] = {
    val properties = new Properties()
    val deserializer = (new StringDeserializer).getClass.getName
    val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
   // 不自动提交offset很重要,否则会影响消费组正常的消费(丢数据) 
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))

    new KafkaConsumer(properties)
  }
}

2、CURRENT OFFSETLOG END OFFSET计算规则

其中关键部分:

  • CURRENT OFFSET计算规则:
consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
  topicPartition -> offsetAndMetadata.offset
}
  • HW(脚本显示的LOG END OFFSET)计算规则
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)

从jstack分析来看,是这一步卡住了。

3、验证分析

参考HW(脚本显示的LOG END OFFSET)计算规则,实现了一个简单的HW采集程序,分别采集异常消费组下2个topic的情况,来看看具体是哪一步卡住?卡住前后是否有相关日志或输出?

我们发现:

  • 该消费组下日常的topic是正常获取HW
image.png
image.png
  • 而离线补录的topic无法正常获取HW值,可能异常
image.png
image.png

进而发现补录的topic存在leader为-1的情况。

image.png
image.png

推测:因为离线补录的topic大部分是不会在线上生产数据,只会在某些特点场景下由平台侧往里面的一次性导入数据,所有这个古老的集群当时下掉若干个节点时并没有迁移这些一次性的topic,从而在使用kafka-consumer-groups.sh获取消费组产生异常。

四、总结

1、 这次问题分析走了一些弯路,但还是加强了对kafka-consumer-groups.sh实现原理的理解

2、topic leader为-1会造成各种各样奇怪的问题,哪怕是一些不重要的topic。

目前所有高版本的集群针对这类场景有完善的监控,而0.9.0.1这种古老集群还相对不完善,等最近裁撤迁移到新集群后会有很大改善。

更多内容可以关注我的公众号~

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题描述
  • 二、问题分析
    • 1、信息梳理
      • 2、机器问题排查
        • 3、__consumer_offsetstopic排查
          • 4、进程阻塞原因排查
          • 三、源码分析
            • 1、KafkaConsumerGroupService
              • 2、CURRENT OFFSET和LOG END OFFSET计算规则
                • 3、验证分析
                • 四、总结
                相关产品与服务
                消息队列 CKafka 版
                消息队列 CKafka 版(TDMQ for CKafka)是一个分布式、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API 2.4、2.8、3.2 版本。CKafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。CKafka 具有高可用、数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合、流式数据集成等场景。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档