前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >kafka offset-check工具失效的问题

kafka offset-check工具失效的问题

作者头像
sanmutongzi
发布2020-03-04 14:43:45
5240
发布2020-03-04 14:43:45
举报
文章被收录于专栏:stream processstream process

转载请注明原创地址http://www.cnblogs.com/dongxiao-yang/p/5414077.html

由于平时业务预警等需求,针对现在公司的kafka系统部署了几套监控系统,包括调用kafka-consumer-offset-checker.sh脚本写的lag监控,kafkaoffsetmonitor开源监控以及kafka-manager管理系统。最近发现kafka-consumer-offset-checker.sh脚本在原本运行正常的情况下一直出现"Exiting due to:null"的错误,这个问题会导致脚本直接退出无法获取完整的partition的lag信息导致报警失效。尝试把监控程序部署到其他机器又发现脚本可以正常运行。

为了搞明白问题,直接把kafka-consumer-offset-checker.sh脚本调用的kafka类ConsumerOffsetChecker拿出来进行研究,发现最后输出lag结果的方法如下

  private def processPartition(zkUtils: ZkUtils,
                               group: String, topic: String, pid: Int) {
    val topicPartition = TopicAndPartition(topic, pid)
    val offsetOpt = offsetMap.get(topicPartition)
    val groupDirs = new ZKGroupTopicDirs(group, topic)
    val owner = zkUtils.readDataMaybeNull(groupDirs.consumerOwnerDir + "/%s".format(pid))._1
    zkUtils.getLeaderForPartition(topic, pid) match {
      case Some(bid) =>
        val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
        consumerOpt match {
          case Some(consumer) =>
            val topicAndPartition = TopicAndPartition(topic, pid)
            val request =
              OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
            val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

            val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
            println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
                                                                   owner match {case Some(ownerStr) => ownerStr case None => "none"}))
          case None => // ignore
        }
      case None =>
        println("No broker for partition %s - %s".format(topic, pid))
    }
  }

其中函数processPartition通过传入的group,topic,pid三个参数唯一确定需要计算的lag。

val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head 获取logSize

val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString) 用logSize减去offsetOpt这个map里对应的partition的offset得到lag。

把kafka这个类的源码搞到intellij idea在本地进行单步调试发现同样出现了Exiting due to:null的问题,并且永远是运行到某一特定分区后就问出题,调试到

val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head这个代码报错,尝试加入try catch并打印对应bid

val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkUtils, bid))
println(" brokerid ",bid)
.............
............

try {

              //val

              val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

              val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
              println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
                owner match { case Some(ownerStr) => ownerStr case None => "none" }))
            }
            catch {
              case ex: Exception => //ignore
            }

研究发现对于不同的topic,出现问题的分区对应的broker id都是一样的,至此怀疑是代码环境与broker服务器之间的连通性出现问题,查了下本机以及监控环境的host配置的都是不全的,把host补全后问题解决。

后续发现kafkaoffsetmonitor以及kafka-manager出现的lag查询页面出现的分区显示不全或者数据为空的情况都通过补全host解决了。

吐槽一下kafka对于host的强依赖。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2016-04-20 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档