首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用主题模式和大型消息时,Kafka使用者不会获取新记录。

使用主题模式和大型消息时,Kafka使用者不会获取新记录。
EN

Stack Overflow用户
提问于 2021-01-16 14:26:33
回答 1查看 1.8K关注 0票数 1

我希望你们中有人能帮我。

我使用的是弹簧引导2.3.4spring 2.5.6。我最近不得不重置一个偏移,并看到了一些奇怪的行为。我们使用了这些消息,但是在每一个X(变量)消息之后,在消费继续之前,我们有10秒的超时时间。

这是我的配置:

代码语言:javascript
运行
复制
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      heartbeat-interval: 1000
      max-poll-records: 50
      group-id: kafka-fetch-demo
      fetch-max-wait: 10000
    listener:
      type: single
      concurrency: 1
      poll-timeout: 1000
      no-poll-threshold: 2
      monitor-interval: 10
      ack-mode: manual
    producer: 
      acks: all
      batch-size: 0
      retries: 0

这是一个示例侦听器代码:

代码语言:javascript
运行
复制
  @KafkaListener(id = LISTENER_ID, idIsGroup = false, topicPattern = "#{demoProperties.getTopicPattern()}")
  public void onEvent(Acknowledgment acknowledgment, ConsumerRecord<byte[], String> record) {
    log.info("Received record on topic {}, partition {} and offset {}",
            record.topic(),
            record.partition(),
            record.offset());

    acknowledgment.acknowledge();
  }

分析

我发现10秒超时来自fetch.max.wait.ms属性。然而,我不知道为什么这个属性适用。

据我所知,fetch-max-wait属性只确定代理在向使用者提供新记录之前等待的最长时间,即使没有超出fetch.min.bytes。此外,我还分析了这个问题只适用于使用主题模式和“较大”消息时。

Reproduction

我将一个演示应用程序上传到Github上,以再现这个问题:https://github.com/kraennix/kafka-fetch-demo

我是如何复制的:

  1. 我在一个卡夫卡主题上放置了一千条消息,每条消息17,1 KB。
  2. --我启动了我的消费应用程序,该应用程序监听每个主题的模式。然后您可以看到这种停止行为。

注意:如果我对“小”消息(89字节)做同样的操作,它就会像预期的那样工作。

日志

在日志中,您可以看到成功的提交,但是它表示跳过了提取

代码语言:javascript
运行
复制
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Sending OffsetCommit request with {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}} to coordinator localhost:9092 (id: 2147483647 rack: null)
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Using older server API v7 to send OFFSET_COMMIT {group_id=kafka-fetch-demo,generation_id=4,member_id=consumer-kafka-fetch-demo-1-cf8e747f-531d-457a-aca8-18960c518ef9,group_instance_id=null,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,committed_offset=488,committed_leader_epoch=-1,committed_metadata=}]}]} with correlation id 62 to node 2147483647
2021-01-16 15:04:40.778 TRACE 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 62, received {throttle_time_ms=0,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,error_code=0}]}]}
2021-01-16 15:04:40.779 DEBUG 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Committed offset 488 for partition publish.LargeTopic.2.test-0
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
EN

回答 1

Stack Overflow用户

发布于 2021-01-18 02:45:28

当消息的大小发生变化时,您可能需要更改以下两个道具心跳间隔: 1000最大民意测验记录: 50。

您的心跳间隔为1秒,最大轮询等待时间为10秒。如果消息的大小很大,并且您正在同一个线程中处理所消耗的消息,那么到下一次拉动触发时,心跳检查将失败。确保执行程序使用可调用的方式处理消息。

将心跳间隔增加到5到10秒,当消息大小较高时,将最大Poll记录减少到15。霍普,这能帮上忙

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65750740

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档