前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Kafka源码系列之实现自己的kafka监控

Kafka源码系列之实现自己的kafka监控

作者头像
Spark学习技巧
发布2018-01-30 18:00:55
1.8K0
发布2018-01-30 18:00:55
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

一,基本思路介绍

Kafka作为一个好用的且应用很广泛的消息队列,在大数据处理系统中基本是必不可少的。当然,作为缓存消息的消息队列,我们对其进行流量监控及消费滞后告警就显得异常重要了。

读过前面的文章,<Kafka源码系列之源码解析SimpleConsumer的消费过程>和<Kafka源码系列之Consumer高级API性能分析>这两篇文章的兄弟姐妹应该看本篇文章会很简单。实际就是利用SimpleConsumer获取Partition最新的offset,用Zookeeper的工具获取消费者组的各个分区的消费偏移,两者做差就是lagSize。

但是实际kafka的消费者组的消费偏移存储,kafka支持两个版本的:

1,基于Zookeeper。OffsetFetchRequest.CurrentVersion为0。

2,基于kafka自身。OffsetFetchRequest.CurrentVersion为1(默认)。

那么要实现一个消费者消费滞后预警,就要兼容两种方式,那么我们就详细的来介绍这两种方式的实现。

二,重要工具类

1,ConsumerOffsetChecker

Kafka提供的检查消费者消费偏移,LogEndSize和lagsize的工具。我们实现自己的监控均可以模仿该类实现。本文也仅限于基于该类将实现过程。

2,ZkUtils

Kafka提供的操作Zookeeper的工具类。

3,SimpleConsumer

Kafka消费者实现类。Kafka的副本同步,低级消费者,高级消费者都是基于该类实现从kafka消费消息的。

4,OffsetRequest

消费者去获取分区数据偏移的请求类,对应的请求key是:RequestKeys.OffsetsKey。在kafka的服务端kafkaApis的处理函数是:handleOffsetRequest(request)

5,OffsetFetchRequest

这个是请求某个topic的某个消费组的消费偏移,对应的请求key:RequestKeys.OffsetFetchKey。在kafka的服务端kafkaApis的处理函数是:handleOffsetFetchRequest(request)

6,OffsetManager

偏移管理器。内部维护了一个Scheduler,会定时执行compact,进行偏移的合并。

三,源代码实现

1,首先是获得消费者的消费偏移

ConsumerOffsetChecker当main方法中首先是获得topic列表

val topicList = topics match {
 case Some(x) => x.split(",").view.toList
 case None => ZkUtils.getChildren(zkClient, groupDirs.consumerGroupDir +  "/owners").toList
}

接着是建立到Broker链接,然后从kafka获取消费者偏移

val topicPartitions = topicPidMap.flatMap { case(topic, partitionSeq) => partitionSeq.map(TopicAndPartition(topic, _)) }.toSeq
val channel = ClientUtils.channelToOffsetManager(group, zkClient, channelSocketTimeoutMs, channelRetryBackoffMs)

debug("Sending offset fetch request to coordinator %s:%d.".format(channel.host, channel.port))
channel.send(OffsetFetchRequest(group, topicPartitions))
val offsetFetchResponse = OffsetFetchResponse.readFrom(channel.receive().buffer)
debug("Received offset fetch response %s.".format(offsetFetchResponse))

offsetFetchResponse.requestInfo.foreach { case (topicAndPartition, offsetAndMetadata) =>
 if (offsetAndMetadata == OffsetMetadataAndError.NoOffset) {
 val topicDirs = new ZKGroupTopicDirs(group, topicAndPartition.topic)
 // this group may not have migrated off zookeeper for offsets storage (we don't expose the dual-commit option in this tool
    // (meaning the lag may be off until all the consumers in the group have the same setting for offsets storage)
 try {
 val offset = ZkUtils.readData(zkClient, topicDirs.consumerOffsetDir + "/%d".format(topicAndPartition.partition))._1.toLong
 offsetMap.put(topicAndPartition, offset)
    } catch {
 case z: ZkNoNodeException =>
 if(ZkUtils.pathExists(zkClient,topicDirs.consumerOffsetDir))
 offsetMap.put(topicAndPartition,-1)
 else
          throw z
    }
  }
 else if (offsetAndMetadata.error == ErrorMapping.NoError)
 offsetMap.put(topicAndPartition, offsetAndMetadata.offset)
 else {
 println("Could not fetch offset for %s due to %s.".format(topicAndPartition, ErrorMapping.exceptionFor(offsetAndMetadata.error)))
  }
}

假如,获得的偏移信息为空,那么就从Zookeeper获取消费者偏移。

解决获取topic的分区的最大偏移,实际思路是构建simpleConsumer,然后由其 去请求偏移,再跟获取的消费者偏移做差就得到消费者最大偏移。

topicList.sorted.foreach {
  topic => processTopic(zkClient, group, topic)
}
topicPidMap.get(topic) match {
 case Some(pids) =>
    pids.sorted.foreach {
      pid => processPartition(zkClient, group, topic, pid)
    }
 case None => // ignore
}

在processPartition中

val offsetOpt = offsetMap.get(topicPartition)
val groupDirs = new ZKGroupTopicDirs(group, topic)
val owner = ZkUtils.readDataMaybeNull(zkClient, groupDirs.consumerOwnerDir + "/%s".format(pid))._1
ZkUtils.getLeaderForPartition(zkClient, topic, pid) match {
 case Some(bid) =>
 val consumerOpt = consumerMap.getOrElseUpdate(bid, getConsumer(zkClient, bid))
    consumerOpt match {
 case Some(consumer) =>
 val topicAndPartition = TopicAndPartition(topic, pid)
 val request =
 OffsetRequest(immutable.Map(topicAndPartition -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
 val logSize = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets.head

然后做差得到LagSize

val lagString = offsetOpt.map(o => if (o == -1) "unknown" else (logSize - o).toString)
println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offsetOpt.getOrElse("unknown"), logSize, lagString.getOrElse("unknown"),
 owner match {case Some(ownerStr) => ownerStr case None => "none"}))

getConsumer方法中

private def getConsumer(zkClient: ZkClient, bid: Int): Option[SimpleConsumer] = {
 try {
    ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1 match {
 case Some(brokerInfoString) =>
        Json.parseFull(brokerInfoString) match {
 case Some(m) =>
 val brokerInfo = m.asInstanceOf[Map[String, Any]]
 val host = brokerInfo.get("host").get.asInstanceOf[String]
 val port = brokerInfo.get("port").get.asInstanceOf[Int]
 Some(new SimpleConsumer(host, port, 10000, 100000, "ConsumerOffsetChecker"))
 case None =>
 throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
        }
 case None =>
 throw new BrokerNotAvailableException("Broker id %d does not exist".format(bid))
    }
  } catch {
 case t: Throwable =>
 println("Could not parse broker info due to " + t.getCause)
      None
  }
}

四,总结

该工具类的使用

bin/kafka-consumer-offset-checker.sh --group yourgroup -topic yourtopic --zookeeper localhost:2181

输出结果

Offset是消费者消费到的偏移,logsize是kafka数据的最大偏移,Lag是二者的差。也即

LagSize = LogSize - Offset

得到我们消费组的滞后情况后,我们就可以根据需求(比如,设定滞后多少消息后给出告警),给出相应的告警。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2017-07-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档