首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >CommitFailedException由春季卡夫卡消费者

CommitFailedException由春季卡夫卡消费者
EN

Stack Overflow用户
提问于 2021-09-01 15:07:34
回答 1查看 4K关注 0票数 0

在使用Spring时,我有时会收到以下错误消息:.I至少实现了一次语义,如代码片段所示

(1)我怀疑我是否错过了消费带来的任何信息?

2)我是否需要处理这个错误-- .As -- seekToCurrentErrorHandler()没有报告这个错误

无法完成org.apache.kafka.clients.consumer.CommitFailedException:偏移提交,因为使用者不是用于自动分区分配的活动组的一部分;很可能使用者被踢出了组。

我的春季卡夫卡消费者代码片段

代码语言:javascript
运行
复制
  public class KafkaConsumerConfig implements KafkaListenerConfigurer 
    @Bean
        public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
            SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((record, e) -> {
                System.out.println("RECORD from topic " + record.topic() + " at partition " + record.partition()
                        + " at offset " + record.offset() + " did not process correctly due to a " + e.getCause());
            }, new FixedBackOff(500L, 3L));
            return seekToCurrentErrorHandler;
        }
    
        @Bean 
        public ConsumerFactory<String, ValidatedConsumerClass> consumerFactory() {
      
      ErrorHandlingDeserializer<ValidatedConsumerClass> errorHandlingDeserializer;
      errorHandlingDeserializer = new ErrorHandlingDeserializer<>( new JsonDeserializer<>(ValidatedConsumerClass.class));
      
      Map<String, Object> props = new HashMap<>();
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-098");
      props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
      
      
      return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
      errorHandlingDeserializer);
      
      }
    
        @Bean
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ValidatedConsumerClass>> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setAckMode(AckMode.RECORD);
            factory.setErrorHandler(seekToCurrentErrorHandler());
            return factory;
        }

消费者阅读信息

代码语言:javascript
运行
复制
@Service
public class KafKaConsumerService extends  AbstractConsumerSeekAware {

@KafkaListener(id = "foo", topics = "mytopic-5", concurrency = "5", groupId = "mytopic-1-groupid")
    public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {


        
    databaseService.save(message);
        
        System.out.println( "-- Consumer End -- "   + c.partition() + " ---consumer thread-- " + Thread.currentThread().getName());


    }
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-09-01 16:22:05

  1. 不,你没有遗漏任何东西。
  2. 不,你不需要处理它,STCEH已经处理了它,记录将在下一次投票中重发。

在这种情况下,异常是在记录处理之外引起的(在处理完成之后)。因为提交由于重新平衡而失败,所以STCEH不需要重新启动(而且它无论如何也不能,因为记录不再可用)。它只是重新抛出了异常。

一切都如期而至。

代码语言:javascript
运行
复制
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=5000
代码语言:javascript
运行
复制
@SpringBootApplication
public class So69016372Application {

    public static void main(String[] args) {
        SpringApplication.run(So69016372Application.class, args);
    }

    @KafkaListener(id = "so69016372", topics = "so69016372")
    public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
        System.out.println(in + " @" + offset);
        Thread.sleep(6000);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so69016372").partitions(1).replicas(1).build();
    }

}

结果

代码语言:javascript
运行
复制
2021-09-01 13:47:26.963  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:31.991  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-f02f8d74-c2b8-47d9-92d3-bf68e5c81a8f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:32.989  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:32.994 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 3 common frames omitted

2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:32.994  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:32.995  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:33.102  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:38.141  INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-e6ec685a-d9aa-43d3-b526-b04418095f09 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:39.108  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:39.109 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
    ... 3 common frames omitted

2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:39.109  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:39.217  INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so69016372: partitions assigned: [so69016372-0]
foo @0

它将无限期地重试。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69016372

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档