首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >【kafka】__consumer_offsets部分分区异常导致消费不到数据问题排查

【kafka】__consumer_offsets部分分区异常导致消费不到数据问题排查

原创
作者头像
皮皮熊
修改2020-06-02 20:10:34
4.8K0
修改2020-06-02 20:10:34
举报

记一次kafka消费异常问题的排查 https://github.com/pierre94/kafka-notes

一、问题描述

问题描述

部分消费组无法通过broker(new-consumer)正常消费数据,更改消费组名后恢复正常。

group名(可能涉及业务信息,group名非真实名):

  • group1-打马赛克
  • group2-打马赛克

kafka版本: 0.9.0.1

二、简单分析

1、describe对应消费组

describe对应消费组时抛如下异常:

Error while executing consumer group command The group coordinator is not available.
org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException: The group coordinator is not available.

2、问题搜索

搜索到业界有类似问题,不过都没有解释清楚为什么出现这种问题,以及如何彻底解决(重启不算)!

  • http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html

三、深入分析

日志是程序员的第一手分析资料。Kafka服务端因为现网有大量服务在运营,不适合开启debug日志,所以我们只能从客户端入手。

1、开启客户端debug日志

将客户端日志等级开成debug级别,发现持续循环地滚动如下日志:

19:52:41.785 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Issuing group metadata request to broker 43
19:52:41.788 TKD [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Group metadata response ClientResponse(receivedTimeMs=1587642761788, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1b68ddbd, request=RequestSend(header={api_key=10,api_version=0,correlation_id=30,client_id=consumer-1}, body={group_id=30cab231-05ed-43ef-96aa-a3ca1564baa3}), createdTimeMs=1587642761785, sendTimeMs=1587642761785), responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})
19:52:41.875 TKD [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=31,client_id=consumer-1}, body={topics=[topic打马赛克]}), isInitiatedByNetworkClient, createdTimeMs=1587642761875, sendTimeMs=0) to node 43

我们大致可以看出循环在做着几件事情(先后不一定准确):

  • 从某个broker Issuing group metadata request
  • 获取Group metadata
  • 发起metadata request

我们聚焦到获取Group metadata的error关键字responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}},大致得出是kafka服务端没有给出coordinator的node结点信息。

2、服务端如何响应请求

请求对应的入口函数

首先我们需要查看api_key=10请求对应的服务端源码:

需要从kafka.server.KafkaApis中寻找对应的api接口函数

  def handle(request: RequestChannel.Request) {
  ……
        case RequestKeys.GroupCoordinatorKey => handleGroupCoordinatorRequest(request)
  ……
    }
handleGroupCoordinatorRequest逻辑
  def handleGroupCoordinatorRequest(request: RequestChannel.Request) {
    val groupCoordinatorRequest = request.body.asInstanceOf[GroupCoordinatorRequest]
    val responseHeader = new ResponseHeader(request.header.correlationId)

    if (!authorize(request.session, Describe, new Resource(Group, groupCoordinatorRequest.groupId))) {
      val responseBody = new GroupCoordinatorResponse(Errors.GROUP_AUTHORIZATION_FAILED.code, Node.noNode)
      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
    } else {
      val partition = coordinator.partitionFor(groupCoordinatorRequest.groupId)

      // get metadata (and create the topic if necessary)
      val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)
        // 第一个可能存在的问题:offsetsTopicMetadata的errCode不为空
      val responseBody = if (offsetsTopicMetadata.errorCode != Errors.NONE.code) {
        new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
      } else {
        val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata
          .find(_.partitionId == partition)
          .flatMap {
            partitionMetadata => partitionMetadata.leader
          }
        // 第二个可能存在的问题:coordinatorEndpoint为空
        coordinatorEndpoint match {
          case Some(endpoint) =>
            new GroupCoordinatorResponse(Errors.NONE.code, new Node(endpoint.id, endpoint.host, endpoint.port))
          case _ =>
            new GroupCoordinatorResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code, Node.noNode)
        }
      }

      trace("Sending consumer metadata %s for correlation id %d to client %s."
        .format(responseBody, request.header.correlationId, request.header.clientId))
      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
    }
  }

其中error_code=15对应的是Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code

从源码不难看出,导致Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code可能点有二:

  • 疑似问题点一:offsetsTopicMetadata的errCode不为空 offsetsTopicMetadata.errorCode != Errors.NONE.code offsetsTopicMetadata的errCode不为空,意味着整个__consumer_offsets元数据获取都有问题。但是现场只是部分group有问题,这里出问题的可能性不大。
  • 疑似问题点二:coordinatorEndpoint为空 val coordinatorEndpoint = offsetsTopicMetadata.partitionsMetadata .find(_.partitionId == partition) .flatMap { partitionMetadata => partitionMetadata.leader } 从offsetsTopicMetadata获取到的元数据,过滤出coordinator.partitionFor(groupCoordinatorRequest.groupId)分区的leader。而coordinator.partitionFor(groupCoordinatorRequest.groupId)正是与group名相关!这里出问题的可能性极大!
partitionFor相关的逻辑:
  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount

即取group名的正hashCode模groupMetadataTopicPartitionCount(即__consumer_offsets对应的分区数)。

注:可能涉及业务信息,group名非真实名。而结果是正式group名算出的结果。

scala> "group1-打马赛克".hashCode % 50
res2: Int = 43

scala> "group2-打马赛克".hashCode % 50
res3: Int = 43

我们发现2个异常的消费组,其partitionFor后的值均为43,我们初步判断分区可能与__consumer_offsets的43分区相关! 接下来我们就要看下offsetsTopicMetadata相关的逻辑,来确认异常。

offsetsTopicMetadata的逻辑
val offsetsTopicMetadata = getOrCreateGroupMetadataTopic(request.securityProtocol)

getOrCreateGroupMetadataTopic -> metadataCache.getTopicMetadata -> getPartitionMetadata

  private def getPartitionMetadata(topic: String, protocol: SecurityProtocol): Option[Iterable[PartitionMetadata]] = {
    cache.get(topic).map { partitions =>
      partitions.map { case (partitionId, partitionState) =>
        val topicPartition = TopicAndPartition(topic, partitionId)

        val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
        val maybeLeader = aliveBrokers.get(leaderAndIsr.leader)

        val replicas = partitionState.allReplicas
        val replicaInfo = getAliveEndpoints(replicas, protocol)

        maybeLeader match {
          case None =>
            debug("Error while fetching metadata for %s: leader not available".format(topicPartition))
            new PartitionMetadata(partitionId, None, replicaInfo, Seq.empty[BrokerEndPoint],
              Errors.LEADER_NOT_AVAILABLE.code)

          case Some(leader) =>
            val isr = leaderAndIsr.isr
            val isrInfo = getAliveEndpoints(isr, protocol)

            if (replicaInfo.size < replicas.size) {
              debug("Error while fetching metadata for %s: replica information not available for following brokers %s"

  }

offsetsTopicMetadata即对于topic下所有leader、replicaInfo、isr正常分区的元数据信息,所以我们判断__consumer_offsets 43分区leader、replicaInfo、isr等可能存在异常,导致find(_.partitionId == partition)时找不到根据hashCode取模后对应的分区。

四、回到现网

1、__consumer_offsets分区信息验证

43分区果然存在leader异常的情况

2、问题复现

我们使用UUID批量生成消费组名,使其hashCode取模后为异常分区的分区号,再使用其进行消费时均出现消费异常的问题。

3、问题思考

  • 为什么__consumer_offsets部分分区会产生leader、replicaInfo、isr异常? 与网络抖动和一些集群操作可能有关,需要具体问题具体分析
  • 如何将__consumer_offsets异常分区恢复正常? 这里不详细介绍可以参考http://blog.itpub.net/31543630/viewspace-2212467/ 。

五、参考资料

  • Kafka new-consumer设计文档 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design
  • Kafka无法消费?!我的分布式消息服务Kafka却稳如泰山!http://blog.itpub.net/31543630/viewspace-2212467/
  • Problem with Kafka 0.9 Client http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Problem-with-Kafka-0-9-Client-td4975.html
  • ErrorMapping https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一、问题描述
    • 问题描述
    • 二、简单分析
      • 1、describe对应消费组
        • 2、问题搜索
        • 三、深入分析
          • 1、开启客户端debug日志
            • 2、服务端如何响应请求
              • 请求对应的入口函数
              • handleGroupCoordinatorRequest逻辑
              • partitionFor相关的逻辑:
              • offsetsTopicMetadata的逻辑
          • 四、回到现网
            • 1、__consumer_offsets分区信息验证
              • 2、问题复现
                • 3、问题思考
                • 五、参考资料
                相关产品与服务
                消息队列 CKafka 版
                消息队列 CKafka 版(TDMQ for CKafka)是一个分布式、高吞吐量、高可扩展性的消息系统,100%兼容开源 Kafka API 2.4、2.8、3.2 版本。CKafka 基于发布/订阅模式,通过消息解耦,使生产者和消费者异步交互,无需彼此等待。CKafka 具有高可用、数据压缩、同时支持离线和实时数据处理等优点,适用于日志压缩收集、监控数据聚合、流式数据集成等场景。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档