I have a Spring Boot project that runs several Kafka consumers (@KafkaListener) on a Confluent Kafka topic with 8 partitions. Each consumer's concurrency is set to 1. The topic is loaded with about one million messages from a file, and the consumers consume them in batches to validate, process, and update a database.
The consumer factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.
Update 06/04: Here are the consumer factory settings. Spring-Kafka 1.3.1.RELEASE, Confluent Kafka broker version.
@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
            new JsonDeserializer<>(ListingMessage.class));
}
@Bean(KAFKA_LISTENER_CONTAINER_FACTORY)
@Autowired
public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
        ConsumerFactory<String, ListingMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(1);
    factory.setAutoStartup(false);
    factory.setBatchListener(true);
    return factory;
}
Note: the container factory has auto-startup set to false. This is so the consumers can be started and stopped manually when a large file is being loaded.
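The manual start/stop mentioned above can be done through Spring Kafka's KafkaListenerEndpointRegistry. A minimal sketch, assuming the @KafkaListener is given an id (the id "listingListener" and the class name here are hypothetical):

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class ListingConsumerLifecycle {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Start the listener container before loading a large file.
    public void startListener() {
        MessageListenerContainer container = registry.getListenerContainer("listingListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }

    // Stop it again once the file has been fully processed.
    public void stopListener() {
        MessageListenerContainer container = registry.getListenerContainer("listingListener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}
```

This keeps autoStartup=false in the factory and moves lifecycle control into application code.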
After running for about an hour (the time varies), the consumers stop consuming messages from their topics even though many messages are still available. A log statement in the consume method stops appearing in the logs.
I track the consumers' status with the "./kafka-consumer-groups" command, and after a while I find there are no consumers left in this group.
$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name
There are no errors in the logs when this consumer fails. The consumer method is wrapped in a try-catch block, so it would catch any exception thrown while processing messages.
How can we design the Spring-Kafka consumer so that it restarts itself when it stops consuming? Is there a listener that can log the exact point in time when a consumer stops? Is this caused by setting concurrency to 1? The reason I have to keep concurrency at 1 is that other consumers slow down if this consumer is given more concurrency.
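On the "is there a listener" question, one option (a hedged sketch, not from the original post) is to enable idle events on the container and listen for them; Spring Kafka 1.3+ publishes a ListenerContainerIdleEvent when no records arrive within a configured interval (enabled via factory.getContainerProperties().setIdleEventInterval(...)):

```java
import org.springframework.context.event.EventListener;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.stereotype.Component;

@Component
public class ConsumerIdleMonitor {

    // Fired each time the container receives no records for the configured
    // idleEventInterval; gives a timestamped signal that consumption stalled.
    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        System.out.println("Consumer idle for " + event.getIdleTime()
                + " ms on partitions " + event.getTopicPartitions());
        // From here one could alert, or stop/start the container.
    }
}
```

An idle event alone does not distinguish "no messages available" from "consumer kicked out of the group", but combined with lag from kafka-consumer-groups it pinpoints when consumption stopped.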
Posted on 2018-06-05 06:47:34
I just ran a test with max.poll.interval.ms=30000 (30 seconds), suspended the listener, and resumed it after 30 seconds; I saw this in the logs...
2018-06-04 18:35:59.361 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo
2018-06-04 18:37:07.347 ERROR 4191 --- [ foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
2018-06-04 18:37:07.350 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [so50687794-0]
2018-06-04 18:37:07.351 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
2018-06-04 18:37:10.400 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
2018-06-04 18:37:10.401 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
2018-06-04 18:37:10.445 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo
You can see that after the rebalance the consumer was re-added and the same message was redelivered; that is exactly what I would expect.
I got the same results, even with 1.3.1.
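The CommitFailedException above points at max.poll.interval.ms: with max.poll.records=10000, the whole batch must be validated, processed, and written to the database within that interval or the broker evicts the consumer. A back-of-the-envelope sizing check (the per-record time and headroom factor are illustrative assumptions, not measured values):

```java
public class PollIntervalEstimate {

    // Rough lower bound for max.poll.interval.ms: worst-case time to process
    // one full batch, multiplied by a safety headroom factor.
    static long requiredPollIntervalMs(int maxPollRecords, long perRecordMs, double headroom) {
        return (long) (maxPollRecords * perRecordMs * headroom);
    }

    public static void main(String[] args) {
        // With max.poll.records=10000 and e.g. 30 ms of validation/DB work per
        // record, a batch already takes 300 s, i.e. the entire 5-minute default
        // interval; with 1.5x headroom you would need about 450 s.
        long needed = requiredPollIntervalMs(10_000, 30, 1.5);
        System.out.println(needed); // prints 450000
    }
}
```

So either max.poll.interval.ms must be raised well above the batch processing time, or max.poll.records reduced so each poll's batch finishes comfortably within the interval.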
https://stackoverflow.com/questions/50687794