首页
学习
活动
专区
圈层
工具
发布
44 篇文章
1
[PyUserInput]模拟鼠标和键盘模拟
2
银行排队模拟(离散事件模拟)
3
Linux网络模拟
4
Linux TC(Traffic Control)作为损伤仪的基础配置和使用
5
深入学习Docker网络(看这篇就完全够了)
6
【鸿蒙 HarmonyOS】鸿蒙手机模拟器 ( 鸿蒙远程模拟器 | 鸿蒙远程模拟器运行手机应用 )
7
探索嵌入式应用框架(EAF)
8
多 OS 混合部署框架
9
嵌入式系统架构浅谈:编程设计模式 (一)---访问硬件的设计模式
10
事件驱动和消息驱动
11
原来 8 张图,就能学废 Reactor 和 Proactor
12
Linux df -h 命令hang住没有反应
13
kafka消费组信息采集异常(hang住)排查
14
ext4 io hung模拟脚本
15
解决 umount 命令卸载磁盘时busy/卡死的问题
16
程序卡死在void HardFault_Handler的解决办法
17
执行sed命令卡死CPU消耗100%一例分析
18
记一次因Redis使用不当导致应用卡死过程
19
字节对齐不慎引发的挂死问题
20
解引用NULL为什么会导致程序挂死?
21
记64位地址截断引发的挂死问题
22
websocket 在线工具_websocket添加请求头
23
【嵌入式Linux应用开发】SquareLine Studio与LVGL模拟器
24
详解Handler机制中消息队列的出队逻辑
25
Android UpdateEngine模块流程(含序列图)
26
物联网时代的嵌入式开发平台
27
400+条实用C/C++框架、库、工具整理 ,你能想到的都在这里了
28
ESP32芯片IO解读
29
M5Stack在ubuntu上进行开发编译
30
【抽象那些事】不完整的抽象&多方面抽象&未用的抽象&重复的抽象
31
H264,你不知道的小技巧
32
linux 创建虚拟块设备,制作文件系统并挂载,用于测试lustre
33
基于linux开发uvc摄像头_uvc协议扩展
34
清晰讲解LSB、MSB和大小端模式及网络字节序
35
在树莓派中使用 MicroPython 接入 MQTT
36
MicroPython 玩转硬件系列1:环境搭建
37
嵌入式系统架构浅谈:编程设计模式 (二)---嵌入并发和资源管理的设计模式
38
嵌入式软件架构设计之分层设计
39
IC之路(一)Proteus-Arduino仿真环境搭建
40
图像处理基础(六)-libjpeg常用算法
41
OpenCV双目标定
42
L-K光流推导及OpenCV代码实现
43
NDI Webcam Input工具,那些你不知道的知识!
44
使用QEMU chroot进行固件本地调试
清单首页其它文章详情

kafka消费组信息采集异常(hang住)排查

一、问题描述

小组同学在使用kafka官方工具kafka-consumer-groups.sh批量导集群消费组详情时,发现某一个集群基于broker的某些消费组会出现异常,主要表现:

  • 结果不全: 只有部分分区的信息
  • 进程会阻塞: 不会像导他消费组时那样,执行完自动退出
image.png

二、问题分析

1、信息梳理

兵马未动,粮草先行。在分析问题前我们需要具体梳理一下潜在的线索:

  • 集群的版本是0.9.0.1的bug?(古老的版本,最近半年会裁撤掉)但目前0.9.0.1版本集群中只有这个出现这样的问题
  • 集群部署不规范,broker和zk端口不统一的问题?但之前也有类似部署不规范的集群,没有出现过这样的问题
  • 集群机器有异常?
  • broker消费特殊场景下的bug?此次异常的消费组大多同时消费2个topic: 一个是日常三副本的topic,一个是离线补录的二副本的topic,确实存在bug的可能性。
  • kafka-consumer-groups.sh特殊场景下的Bug?

2、机器问题排查

1) strace相关进程,发现进程确实阻塞住了

2) 查看/proc/pid/stack看则有下面的堆栈输出

代码语言:txt
复制
$ cat /proc/12097/stack

[<ffffffff81097b6b>] futex_wait_queue_me+0xdb/0x140
[<ffffffff81097e46>] futex_wait+0x166/0x250
[<ffffffff81099f1e>] do_futex+0xde/0x570
[<ffffffff8109a421>] SyS_futex+0x71/0x150
[<ffffffff81b2a202>] system_call_fastpath+0x16/0x1b
[<ffffffffffffffff>] 0xffffffffffffffff

3) 查看机器版本

与现网其他机器相比,没有太大的差异,机器层面相关性可能不是很大。

3、__consumer_offsetstopic排查

基于broker消费的消费组,其偏移量的元数据信息是存储在__consumer_offsets这个topic下的。笔者之前在《kafka部分group无法正常消费数据排查》一文中曾介绍过因__consumer_offsets问题导致group异常的情况,所以便查看了一下__consumer_offsets的情况,发现一切正常:

image.png

4、进程阻塞原因排查

几经周折,没有发现什么进展,还是决定回到kafka-consumer-groups.sh本身,从查看进程堵塞原因出发。此时就需要我们jstack查看一下进程内诸线程的情况,我们发现:

image.png

进程阻塞在获取某个分区的HW(HighWatermark)上(注意:LEO对消费者是不可见的,所以这里虽然调用的方法是getLogEndOffset,但实际上是获取HW),这时我们就要从源码中进行深入的分析。

三、源码分析

kafka-consumer-groups.sh获取基于broker消费組信息,即调用kafka.admin.ConsumerGroupCommandKafkaConsumerGroupService.describeGroup。相关实现如下:

1、KafkaConsumerGroupService

代码语言:txt
复制
class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {

  private val adminClient = createAdminClient()

  // `consumer` is only needed for `describe`, so we instantiate it lazily
  private var consumer: KafkaConsumer[String, String] = null

  def list() {
    adminClient.listAllConsumerGroupsFlattened().foreach(x => println(x.groupId))
  }

  protected def describeGroup(group: String) {
    val consumerSummaries = adminClient.describeConsumerGroup(group)
    if (consumerSummaries.isEmpty)
      println(s"Consumer group `${group}` does not exist or is rebalancing.")
    else {
      val consumer = getConsumer()
	  // 打印描述头
      printDescribeHeader()
      consumerSummaries.foreach { consumerSummary =>
        val topicPartitions = consumerSummary.assignment.map(tp => TopicAndPartition(tp.topic, tp.partition))
        val partitionOffsets = topicPartitions.flatMap { topicPartition =>
          Option(consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
            topicPartition -> offsetAndMetadata.offset
          }
        }.toMap
        describeTopicPartition(group, topicPartitions, partitionOffsets.get,
          _ => Some(s"${consumerSummary.clientId}_${consumerSummary.clientHost}"))
      }
    }
  }

  // 获取HW的值
  protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
    val consumer = getConsumer()
    val topicPartition = new TopicPartition(topic, partition)
    consumer.assign(List(topicPartition).asJava)
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)
    LogEndOffsetResult.LogEndOffset(logEndOffset)
  }

 //省略中间一部分不重要的代码 
 
  private def createNewConsumer(): KafkaConsumer[String, String] = {
    val properties = new Properties()
    val deserializer = (new StringDeserializer).getClass.getName
    val brokerUrl = opts.options.valueOf(opts.bootstrapServerOpt)
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, opts.options.valueOf(opts.groupOpt))
   // 不自动提交offset很重要,否则会影响消费组正常的消费(丢数据) 
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
    if (opts.options.has(opts.commandConfigOpt)) properties.putAll(Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)))

    new KafkaConsumer(properties)
  }
}

2、CURRENT OFFSETLOG END OFFSET计算规则

其中关键部分:

  • CURRENT OFFSET计算规则:
代码语言:txt
复制
consumer.committed(new TopicPartition(topicPartition.topic, topicPartition.partition))).map { offsetAndMetadata =>
  topicPartition -> offsetAndMetadata.offset
}
  • HW(脚本显示的LOG END OFFSET)计算规则
代码语言:txt
复制
    consumer.seekToEnd(topicPartition)
    val logEndOffset = consumer.position(topicPartition)

从jstack分析来看,是这一步卡住了。

3、验证分析

参考HW(脚本显示的LOG END OFFSET)计算规则,实现了一个简单的HW采集程序,分别采集异常消费组下2个topic的情况,来看看具体是哪一步卡住?卡住前后是否有相关日志或输出?

我们发现:

  • 该消费组下日常的topic是正常获取HW
image.png
  • 而离线补录的topic无法正常获取HW值,可能异常
image.png

进而发现补录的topic存在leader为-1的情况。

image.png

推测:因为离线补录的topic大部分是不会在线上生产数据,只会在某些特点场景下由平台侧往里面的一次性导入数据,所有这个古老的集群当时下掉若干个节点时并没有迁移这些一次性的topic,从而在使用kafka-consumer-groups.sh获取消费组产生异常。

四、总结

1、 这次问题分析走了一些弯路,但还是加强了对kafka-consumer-groups.sh实现原理的理解

2、topic leader为-1会造成各种各样奇怪的问题,哪怕是一些不重要的topic。

目前所有高版本的集群针对这类场景有完善的监控,而0.9.0.1这种古老集群还相对不完善,等最近裁撤迁移到新集群后会有很大改善。

更多内容可以关注我的公众号~

下一篇
举报
领券