在使用Spring时,我有时会收到以下错误消息:.I至少实现了一次语义,如代码片段所示
(1)我怀疑我是否错过了消费带来的任何信息?
2)我是否需要处理这个错误-- .As -- seekToCurrentErrorHandler()
没有报告这个错误
无法完成org.apache.kafka.clients.consumer.CommitFailedException:偏移提交,因为使用者不是用于自动分区分配的活动组的一部分;很可能使用者被踢出了组。
我的春季卡夫卡消费者代码片段
public class KafkaConsumerConfig implements KafkaListenerConfigurer
@Bean
public SeekToCurrentErrorHandler seekToCurrentErrorHandler() {
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((record, e) -> {
System.out.println("RECORD from topic " + record.topic() + " at partition " + record.partition()
+ " at offset " + record.offset() + " did not process correctly due to a " + e.getCause());
}, new FixedBackOff(500L, 3L));
return seekToCurrentErrorHandler;
}
@Bean
public ConsumerFactory<String, ValidatedConsumerClass> consumerFactory() {
ErrorHandlingDeserializer<ValidatedConsumerClass> errorHandlingDeserializer;
errorHandlingDeserializer = new ErrorHandlingDeserializer<>( new JsonDeserializer<>(ValidatedConsumerClass.class));
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "grpid-098");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
errorHandlingDeserializer);
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, ValidatedConsumerClass>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ValidatedConsumerClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setErrorHandler(seekToCurrentErrorHandler());
return factory;
}
消费者阅读信息
@Service
public class KafKaConsumerService extends AbstractConsumerSeekAware {
@KafkaListener(id = "foo", topics = "mytopic-5", concurrency = "5", groupId = "mytopic-1-groupid")
public void consumeFromTopic1(@Payload @Valid ValidatedConsumerClass message, ConsumerRecordMetadata c) {
databaseService.save(message);
System.out.println( "-- Consumer End -- " + c.partition() + " ---consumer thread-- " + Thread.currentThread().getName());
}
发布于 2021-09-01 16:22:05
在这种情况下,异常是在记录处理之外引起的(在处理完成之后)。因为提交由于重新平衡而失败,所以STCEH不需要重新启动(而且它无论如何也不能,因为记录不再可用)。它只是重新抛出了异常。
一切都如期而至。
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.properties.max.poll.interval.ms=5000
@SpringBootApplication
public class So69016372Application {
public static void main(String[] args) {
SpringApplication.run(So69016372Application.class, args);
}
@KafkaListener(id = "so69016372", topics = "so69016372")
public void listen(String in, @Header(KafkaHeaders.OFFSET) long offset) throws InterruptedException {
System.out.println(in + " @" + offset);
Thread.sleep(6000);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so69016372").partitions(1).replicas(1).build();
}
}
结果
2021-09-01 13:47:26.963 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:31.991 INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-f02f8d74-c2b8-47d9-92d3-bf68e5c81a8f sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:32.989 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:32.994 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
... 3 common frames omitted
2021-09-01 13:47:32.994 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:32.994 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:32.995 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:32.995 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:33.102 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions assigned: [so69016372-0]
foo @0
2021-09-01 13:47:38.141 INFO 13195 --- [ad | so69016372] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Member consumer-so69016372-1-e6ec685a-d9aa-43d3-b526-b04418095f09 sending LeaveGroup request to coordinator localhost:9092 (id: 2147483647 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2021-09-01 13:47:39.108 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Failing OffsetCommit request since the consumer is not part of an active group
2021-09-01 13:47:39.109 ERROR 13195 --- [o69016372-0-C-1] essageListenerContainer$ListenerConsumer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:200) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:112) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1602) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1210) ~[spring-kafka-2.7.6.jar:2.7.6]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1139) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1004) ~[kafka-clients-2.7.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1495) ~[kafka-clients-2.7.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2710) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2705) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2691) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2489) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.7.6.jar:2.7.6]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.6.jar:2.7.6]
... 3 common frames omitted
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-so69016372-1, groupId=so69016372] Lost previously assigned partitions so69016372-0
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions lost: [so69016372-0]
2021-09-01 13:47:39.109 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions revoked: [so69016372-0]
...
2021-09-01 13:47:39.217 INFO 13195 --- [o69016372-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so69016372: partitions assigned: [so69016372-0]
foo @0
它将无限期地重试。
https://stackoverflow.com/questions/69016372
复制相似问题