Talking about monitoring Kafka consumer offset lag

This article mainly discusses how to monitor Kafka consumer offset lag.

Approaches

  • Use the official tool classes ConsumerOffsetChecker and ConsumerGroupCommand
  • Use the JMX metrics Kafka itself exposes

ConsumerOffsetChecker

In version 0.8.2.2 it lives at kafka_2.10-0.8.2.2-sources.jar!/kafka/tools/ConsumerOffsetChecker.scala:

object ConsumerOffsetChecker extends Logging {

  private val consumerMap: mutable.Map[Int, Option[SimpleConsumer]] = mutable.Map()
  private val offsetMap: mutable.Map[TopicAndPartition, Long] = mutable.Map()
  private var topicPidMap: immutable.Map[String, Seq[Int]] = immutable.Map()

  private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
    //...
  }

  private def processPartition(zkClient: ZkClient,
                               group: String, topic: String, pid: Int) {
    //...
  }

  private def processTopic(zkClient: ZkClient, group: String, topic: String) {
    topicPidMap.get(topic) match {
      case Some(pids) =>
        pids.sorted.foreach {
          pid => processPartition(zkClient, group, topic, pid)
        }
      case None => // ignore
    }
  }

  private def printBrokerInfo() {
    println("BROKER INFO")
    for ((bid, consumerOpt) <- consumerMap)
      consumerOpt match {
        case Some(consumer) =>
          println("%s -> %s:%d".format(bid, consumer.host, consumer.port))
        case None => // ignore
      }
  }

  def main(args: Array[String]) {
    //...
    try {
      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

      val topicList = topics match {
        case Some(x) => x.split(",").view.toList
        case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir +  "/owners").toList
      }

      topicPidMap = immutable.Map(ZkUtils.getPartitionsForTopics(zkClient, topicList).toSeq:_*)
      val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
      val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

      debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
      channel.send(OffsetFetchRequest(group, topicPartitions))
      val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
      debug("Received offset fetch response %s.".format(offsetFetchResponse))

      offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
        if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
          val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
          // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
          // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
          try {
            val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
            offsetMap.put(topicAndPartition, offset)
          } catch {
            case z: ZkNoNodeException =>
              if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
                offsetMap.put(topicAndPartition,-1)
              else
                throw z
          }
        }
        else if (offsetAndMetadata.error == ErrorMapping.NoError)
          offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
        else {
          println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
        }
      }
      channel.disconnect()

      println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format("Group", "Topic", "Pid", "Offset", "logSize", "Lag", "Owner"))
      topicList.sorted.foreach {
        topic => processTopic(zkClient, group, topic)
      }

      if (options.has("broker-info"))
        printBrokerInfo()

      for ((_, consumerOpt) <- consumerMap)
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
    }
    catch {
      case t: Throwable =>
        println("Exiting due to: %s.".format(t.getMessage))
    }
    finally {
      for (consumerOpt <- consumerMap.values) {
        consumerOpt match {
          case Some(consumer) => consumer.close()
          case None => // ignore
        }
      }
      if (zkClient != null)
        zkClient.close()

      if (channel != null)
        channel.disconnect()
    }
  }
}

The drawback is that this class is designed to be invoked from the command line: every run creates a fresh ZkClient, which is not well suited to a monitoring loop. To reuse it you need to refactor the logic and pull the ZkClient out so that a single instance is shared, as in the sketch below.
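
For example, a minimal sketch (against the 0.8.2.2 API) of such a refactor: a long-lived class holding one ZkClient. The LagChecker name and the consumerFor helper are hypothetical; the ZkUtils and SimpleConsumer calls mirror the ones ConsumerOffsetChecker itself relies on.

import kafka.api.{OffsetRequest, Request}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer
import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

// Hypothetical reusable checker: one ZkClient for the whole lifetime,
// instead of a new one per invocation as in the command-line tool.
class LagChecker(zkConnect: String) {
  private val zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)

  // locate the partition leader and open a SimpleConsumer to it
  // (a real implementation would cache these consumers like the tool does)
  private def consumerFor(tp: TopicAndPartition): SimpleConsumer = {
    val leaderId = ZkUtils.getLeaderForPartition(zkClient, tp.topic, tp.partition).get
    val broker = ZkUtils.getBrokerInfo(zkClient, leaderId).get
    new SimpleConsumer(broker.host, broker.port, 10000, 100000, "lagChecker")
  }

  // lag = latest offset on the leader (logSize) - offset committed in ZooKeeper
  def lagFor(group: String, topic: String): Map[TopicAndPartition, Long] = {
    val dirs = new ZKGroupTopicDirs(group, topic)
    val pids = ZkUtils.getPartitionsForTopics(zkClient, Seq(topic))(topic)
    pids.map { pid =>
      val tp = TopicAndPartition(topic, pid)
      val committed = ZkUtils.readData(zkClient, dirs.consumerOffsetDir + "/" + pid)._1.toLong
      val consumer = consumerFor(tp)
      try {
        val logSize = consumer.earliestOrLatestOffset(tp, OffsetRequest.LatestTime, Request.OrdinaryConsumerId)
        tp -> (logSize - committed)
      } finally consumer.close()
    }.toMap
  }

  def close(): Unit = zkClient.close()
}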

ConsumerGroupCommand

In versions after 0.8.2.2, ConsumerGroupCommand replaces ConsumerOffsetChecker; see kafka_2.11-0.10.2.1-sources.jar!/kafka/admin/ConsumerGroupCommand.scala:

object ConsumerGroupCommand extends Logging {
  //...
  def main(args: Array[String]) {
    val opts = new ConsumerGroupCommandOptions(args)

    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(opts.parser, "List all consumer groups, describe a consumer group, or delete consumer group info.")

    // should have exactly one action
    val actions = Seq(opts.listOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
    if (actions != 1)
      CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --delete")

    opts.checkArgs()

    val consumerGroupService = {
      if (opts.useOldConsumer) {
        System.err.println("Note: This will only show information about consumers that use ZooKeeper (not those using the Java consumer API).\n")
        new ZkConsumerGroupService(opts)
      } else {
        System.err.println("Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).\n")
        new KafkaConsumerGroupService(opts)
      }
    }

    try {
      if (opts.options.has(opts.listOpt))
        consumerGroupService.listGroups().foreach(println(_))
      else if (opts.options.has(opts.describeOpt)) {
        val (state, assignments) = consumerGroupService.describeGroup()
        val groupId = opts.options.valuesOf(opts.groupOpt).asScala.head
        assignments match {
          case None =>
            // applies to both old and new consumer
            printError(s"The consumer group '$groupId' does not exist.")
          case Some(assignments) =>
            if (opts.useOldConsumer)
              printAssignment(assignments, false)
            else
              state match {
                case Some("Dead") =>
                  printError(s"Consumer group '$groupId' does not exist.")
                case Some("Empty") =>
                  System.err.println(s"Consumer group '$groupId' has no active members.")
                  printAssignment(assignments, true)
                case Some("PreparingRebalance") | Some("AwaitingSync") =>
                  System.err.println(s"Warning: Consumer group '$groupId' is rebalancing.")
                  printAssignment(assignments, true)
                case Some("Stable") =>
                  printAssignment(assignments, true)
                case other =>
                  // the control should never reach here
                  throw new KafkaException(s"Expected a valid consumer group state, but found '${other.getOrElse("NONE")}'.")
              }
        }
      }
      else if (opts.options.has(opts.deleteOpt)) {
        consumerGroupService match {
          case service: ZkConsumerGroupService => service.deleteGroups()
          case _ => throw new IllegalStateException(s"delete is not supported for $consumerGroupService.")
        }
      }
    } catch {
      case e: Throwable =>
        printError(s"Executing consumer group command failed due to ${e.getMessage}", Some(e))
    } finally {
      consumerGroupService.close()
    }
  }
}

This one, too, is designed around the command line, but its service classes can be driven programmatically, as sketched below.
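
A monitoring process can reuse the command's service classes directly instead of shelling out to the CLI. A minimal sketch against 0.10.2.x: the group name and bootstrap server are placeholders, and the fields follow the PartitionAssignmentState case class in that version (not stable public API, so treat this strictly as a sketch).

import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService}

object GroupLagProbe {
  def main(args: Array[String]): Unit = {
    // same flags the CLI would receive; the values here are placeholders
    val opts = new ConsumerGroupCommandOptions(Array(
      "--bootstrap-server", "localhost:9092",
      "--group", "my-group",
      "--describe"))
    val service = new KafkaConsumerGroupService(opts)
    try {
      val (_, assignments) = service.describeGroup()
      assignments.foreach(_.foreach { a =>
        // every field is an Option; lag is logEndOffset - committed offset
        println(s"${a.topic.getOrElse("-")}:${a.partition.getOrElse(-1)} lag=${a.lag.getOrElse(-1L)}")
      })
    } finally {
      service.close()
    }
  }
}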

JMX

This approach reads the metrics data that Kafka itself publishes over JMX, so there is no need to open your own connections and fetch offsets the way ConsumerOffsetChecker does. For example:

import java.lang.management.ManagementFactory;
import java.util.Set;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// query every MBean under the kafka.producer domain on the local JVM
MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName oName = new ObjectName("kafka.producer:*");
Set<ObjectName> metricsBeans = mBeanServer.queryNames(oName, null);
for (ObjectName mBeanName : metricsBeans) {
    MBeanInfo metricsBean = mBeanServer.getMBeanInfo(mBeanName);
    MBeanAttributeInfo[] metricsAttrs = metricsBean.getAttributes();
    for (MBeanAttributeInfo metricsAttr : metricsAttrs) {
        // get value
        Object value = mBeanServer.getAttribute(mBeanName, metricsAttr.getName());
        // process ...
    }
}
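
Note that consumer-side lag metrics live in the consumer's own JVM rather than in the broker, so a monitoring process usually reads them over remote JMX. A minimal sketch, assuming the target is a new (Java) consumer, which reports records-lag-max under consumer-fetch-manager-metrics, and that its JVM was started with remote JMX enabled; the host and port below are placeholders.

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}
import scala.collection.JavaConverters._

object ConsumerLagJmx {
  def main(args: Array[String]): Unit = {
    // the target must run with -Dcom.sun.management.jmxremote.port=9999 etc.
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://consumer-host:9999/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)
    try {
      val conn = connector.getMBeanServerConnection
      // the new (Java) consumer reports records-lag-max per client-id
      val pattern = new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*")
      conn.queryNames(pattern, null).asScala.foreach { name =>
        val lagMax = conn.getAttribute(name, "records-lag-max")
        println(s"$name records-lag-max=$lagMax")
      }
    } finally {
      connector.close()
    }
  }
}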

Summary

You can adapt ConsumerOffsetChecker or ConsumerGroupCommand yourself and report the resulting lag to statsd or Prometheus. Of course, where JMX already exposes what you need, that is the least work. A Prometheus-flavored sketch follows.
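
As an illustration, a minimal Prometheus-style exporter sketch, assuming the io.prometheus simpleclient and simpleclient_httpserver dependencies; the metric and label names, port, and refresh interval are all our own choices, and it plugs in the hypothetical LagChecker from earlier.

import io.prometheus.client.Gauge
import io.prometheus.client.exporter.HTTPServer

object LagExporter {
  // gauge keyed by group/topic/partition; names are our own convention
  private val lagGauge = Gauge.build()
    .name("kafka_consumer_group_lag")
    .help("Consumer group lag per partition")
    .labelNames("group", "topic", "partition")
    .register()

  def main(args: Array[String]): Unit = {
    new HTTPServer(9400) // endpoint Prometheus scrapes; port is arbitrary
    val checker = new LagChecker("localhost:2181") // sketch from earlier
    while (true) {
      checker.lagFor("my-group", "my-topic").foreach { case (tp, lag) =>
        lagGauge.labels("my-group", tp.topic, tp.partition.toString).set(lag.toDouble)
      }
      Thread.sleep(30000) // refresh interval
    }
  }
}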

doc

  • Kafka official documentation: JMX + Reporters

Originally published on the WeChat public account 码匠的流水账 (geek_luandun) on 2017-12-28.
