前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >你说通过Kafka AdminClient获取Lag会有性能问题?尊嘟假嘟0.o

你说通过Kafka AdminClient获取Lag会有性能问题?尊嘟假嘟0.o

作者头像
泊浮目
发布2024-09-25 08:20:09
850
发布2024-09-25 08:20:09
举报
文章被收录于专栏:狗哥的专栏

版本

日期

备注

1.0

2024.8.25

文章首发

本文内容已用一种抽象的方式做成了视频,喜欢看视频的同学可以在B站、抖音、微信视频号 上搜索“泊浮目”观看相应的内容。

0.前言

前阵子团队里出了个大故障,本质是因为其他语言实现的client有问题,非常频繁的请求大量元数据,而Kafka服务端这边也没有做什么限制,导致Kafka Broker宕了。

在相关的复盘报告中,复盘方提到了我这边的监控程序(用于观察线上实时作业的堆压)会频繁的去获取一些元数据,也是在间接的增加Kafka集群的压力,建议修改成消费__consumer_offsets的方式。(我这边用的是AdminClient#listConsumerGroupOffsets和AdminClient.listOffsets来获取commit和end的offset)

有点像stackoverflow.com/questions/6…Adán Escobar提供的答案。

这个事老哥之前有和我沟通过几次,那时我问他:你这边有什么根据吗?他没有正面回答我——听说这老哥之前在别的地方维护过很大的Kafka集群,对此我半信半疑的在网上搜索过一阵子,但是并没有找到对应的答案。

直到这次,我这边的监控程序被要求整改。对此我觉得莫名其妙,于是有了这篇文章——我们来扒一扒源码。

本文的代码基于Kafka 3.9。

消费__consumer_offsets本质上来说就是Consumer顺序读Broker上的日志,消费过程这块网上源码解析非常多,总体来说代价也不大,就不再赘述了。我们直接来看AdminClient上的实现。

1.AdminClient相关源码分析

1.1 AdminClient#listConsumerGroupOffsets

代码语言:javascript
复制
|--ListConsumerGroupOffsetsHandler

|--ApiKeys.OFFSET_FETCH

    \--handleOffsetFetchRequest

    \--handleOffsetFetchRequestFromCoordinator

    \--handleOffsetFetchRequestFromZookeeper

在早期版本中,kafka的元数据是保存在的zk里的。为了更全面的带大家阅读代码,我们把两个实现都读一遍。

From KRaft

代码语言:javascript
复制
    \--fetchOffsetsForGroup

|--GroupCoordinatorAdapter

    \-- fetchOffsets

     \--handleFetchOffset

|--GroupCoordinator

    \--handleFetchOffsets

|--GroupmetadataManager

    \--getOffsets

那么从getOffsets的实现为:

代码语言:javascript
复制
  def getOffsets(groupId: String, requireStable: Boolean, topicPartitionsOpt: Option[Seq[TopicPartition]]): Map[TopicPartition, PartitionData] = {
    trace("Getting offsets of %s for group %s.".format(topicPartitionsOpt.getOrElse("all partitions"), groupId))
    val group = groupMetadataCache.get(groupId)
    if (group == null) {
      topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
        val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
          Optional.empty(), "", Errors.NONE)
        topicPartition -> partitionData
      }.toMap
    } else {
      group.inLock {
        if (group.is(Dead)) {
          topicPartitionsOpt.getOrElse(Seq.empty[TopicPartition]).map { topicPartition =>
            val partitionData = new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
              Optional.empty(), "", Errors.NONE)
            topicPartition -> partitionData
          }.toMap
        } else {
          val topicPartitions = topicPartitionsOpt.getOrElse(group.allOffsets.keySet)

          topicPartitions.map { topicPartition =>
            if (requireStable && group.hasPendingOffsetCommitsForTopicPartition(topicPartition)) {
              topicPartition -> new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                Optional.empty(), "", Errors.UNSTABLE_OFFSET_COMMIT)
            } else {
              val partitionData = group.offset(topicPartition) match {
                case None =>
                  new PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                    Optional.empty(), "", Errors.NONE)
                case Some(offsetAndMetadata) =>
                  new PartitionData(offsetAndMetadata.offset,
                    offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
              }
              topicPartition -> partitionData
            }
          }.toMap
        }
      }
    }
  }

在这里我们可以看到,相关的信息其实从groupMetadataCache这个内存缓存中获取的, 并不是一个很重的操作。而缓存的load方法是loadGroupsAndOffsets,因为篇幅原因,不再展开,有兴趣的同学可以自行阅读。

From Zookeeper

逻辑非常简单,直接粘代码:

代码语言:javascript
复制
  private def handleOffsetFetchRequestFromZookeeper(request: RequestChannel.Request): CompletableFuture[Unit] = {
    val header = request.header
    val offsetFetchRequest = request.body[OffsetFetchRequest]

    def createResponse(requestThrottleMs: Int): AbstractResponse = {
      val offsetFetchResponse =
        // reject the request if not authorized to the group
        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId))
          offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
        else {
          val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch requests"))
          val (authorizedPartitions, unauthorizedPartitions) = partitionByAuthorized(
            offsetFetchRequest.partitions.asScala, request.context)

          // version 0 reads offsets from ZK
          val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
            try {
              if (!metadataCache.contains(topicPartition))
                (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
              else {
                val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
                payloadOpt match {
                  case Some(payload) =>
                    (topicPartition, new OffsetFetchResponse.PartitionData(payload,
                      Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.NONE))
                  case None =>
                    (topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
                }
              }
            } catch {
              case e: Throwable =>
                (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
                  Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
            }
          }.toMap

          val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
        }
      trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
      offsetFetchResponse
    }
    requestHelper.sendResponseMaybeThrottle(request, createResponse)
    CompletableFuture.completedFuture[Unit](())
  }

首先检查用户是否被授权访问指定的组,如果没有授权,则返回授权失败的错误响应。然后根据请求中的分区信息,将分区分为授权和未授权的分区。对于授权的分区,尝试从ZooKeeper中获取消费偏移量,并根据结果生成相应的分区数据。如果出现异常,则生成一个包含无效偏移量的分区数据。最后将授权和未授权的分区数据合并,并将响应发送回客户端。

1.2 AdminClient#listOffsets

代码语言:javascript
复制
|--ListOffsetsHandler

|--ApiKeys.LIST_OFFSETS

  \--handleListOffsetRequest

    \--handleListOffsetRequestV0

    \--handleListOffsetRequestV1AndAbove

这里也分成了两个版本,引入KRaft之前是handleListOffsetRequestV0,之后则是handleListOffsetRequestV1AndAbove。除了在部分功能支持的差异和错误处理更加细致外,核心调用的replicaManager.fetchOffsetForTimestamp并无变化。而这个函数的底层实现本质是调用Kafka Log,即去Broker的Log上查询相关的信息。

2.小结

listConsumerGroupOffsets这个命令在KRaft之前的实现是读取Zookeeper,但由于ZK存储的特性,小量点查的代价并不大。如果在启用KRaft的情况下,并不是什么性能瓶颈。

listOffsets则是通过Kafka Broker读取对应Topic Partition中的Log实现的,相比Consumer消费__consumer_offsets来说,性能在其之下——如果进行大频次的读,本质上来说是在做随机IO读,是比不上消费__consumer_offsets的顺序读的。如果高频次的做读取操作,是一定会引起IO压力的。

2.1 其他答疑

以下问题来自于一些视频号底下的提问,这边统一回答。

Q1:Kafka百万吞吐,几个查询接口就查挂了?

A:

  1. 高吞吐基于顺序读写与PageCache等特性。seek多个topic parition的end offset是没法利用以上特性的,和高吞吐毫无相关。
  2. 并没有说因为调admin client API导致KAFKA挂了。但这里面的确是有可优化的点。

Q2:不就是查这么点信息吗?能消耗多少资源?我管过的集群多了,定时任务半小时查一次,从来没见查挂过。

A:文中提到了高频次的读取操作是分钟级的。实际上我们的Kafka也不小,正是因为故障影响面大,所以我这边也有幸参与了复盘。

Q3:获取元数据会导致集群压力,认真的嘛?我怎么记得Kafka发消息前都会检查一次当前topic的元数据

A:我们这里的获取元数据特指seek到kafka log的对应位置去获取end offset。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-09-25,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 0.前言
  • 1.AdminClient相关源码分析
    • 1.1 AdminClient#listConsumerGroupOffsets
      • From KRaft
      • From Zookeeper
    • 1.2 AdminClient#listOffsets
    • 2.小结
      • 2.1 其他答疑
        • Q1:Kafka百万吞吐,几个查询接口就查挂了?
        • Q2:不就是查这么点信息吗?能消耗多少资源?我管过的集群多了,定时任务半小时查一次,从来没见查挂过。
        • Q3:获取元数据会导致集群压力,认真的嘛?我怎么记得Kafka发消息前都会检查一次当前topic的元数据
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档