
Spring Boot KafkaListener stops consuming messages after running for some time

Stack Overflow user
Asked on 2018-06-05 03:48:04
Answers: 1 · Views: 7.6K · Followers: 0 · Votes: 2

I have a Spring Boot project that runs several Kafka consumers (@KafkaListener) on a Confluent Kafka topic with 8 partitions. The concurrency of each consumer is set to 1. The topic is loaded with about one million messages from a file, and the consumers consume them in batches to validate, process, and update a database.

The consumer factory has the following settings: max.poll.records=10000, fetch.min.bytes=100000, fetch.max.wait.ms=1000, session.timeout.ms=240000.

Update 06/04: here are the consumer factory settings. Spring-Kafka 1.3.1.RELEASE, Confluent Kafka broker version

Code language: java
@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);

    return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
        new JsonDeserializer<>(ListingMessage.class));
}

@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public ConcurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
    ConsumerFactory<String, ListingMessage> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(1);
    factory.setAutoStartup(false);
    factory.setBatchListener(true);
    return factory;
}

Note: the container factory has autoStartup set to false. This is so that the consumer can be started/stopped manually while loading the large file.
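For context, manually starting and stopping a listener registered with autoStartup=false is normally done through the KafkaListenerEndpointRegistry. A minimal sketch, assuming the @KafkaListener is given the hypothetical id "listingListener":

Code language: java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class ListingConsumerLifecycle {

    // Registry holding the containers created for @KafkaListener methods
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /** Start the listener before loading a large file. */
    public void startListener() {
        // "listingListener" is a hypothetical @KafkaListener(id = "...") value
        MessageListenerContainer container = registry.getListenerContainer("listingListener");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }

    /** Stop the listener once the file has been processed. */
    public void stopListener() {
        MessageListenerContainer container = registry.getListenerContainer("listingListener");
        if (container != null && container.isRunning()) {
            container.stop();
        }
    }
}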

After running for about an hour (the exact time varies), the consumers stop consuming messages from their topic even though the topic has plenty of messages available. There is a log statement in the consume method, and it stops appearing in the logs.

I track the consumer's status with the ./kafka-consumer-groups command, and after a while there are no consumers left in this group.

Code language: shell
$ ./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group  group_name

There are no errors in the logs for this failed consumer. The consumer method is wrapped in a try-catch block, so it would catch any exception thrown while processing the messages.
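Worth noting: a try-catch inside the listener method only covers exceptions thrown while processing the batch; errors raised by the container itself (such as the commit failure shown in the accepted answer below) go to the container's error handler instead. A minimal sketch of a batch error handler that makes such errors visible, assuming spring-kafka 1.3.x and the wiring shown in the comment:

Code language: java
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.listener.BatchErrorHandler;

/**
 * Hypothetical container-level handler: exceptions thrown outside the listener
 * method (for example offset-commit failures) never reach the listener's
 * try-catch and are only visible here or via the default LoggingErrorHandler.
 *
 * Assumed wiring in the container factory:
 *   factory.getContainerProperties().setGenericErrorHandler(new ListingBatchErrorHandler());
 */
public class ListingBatchErrorHandler implements BatchErrorHandler {

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> data) {
        int count = (data == null) ? 0 : data.count();
        System.err.println("Container-level error while handling a batch of "
                + count + " records: " + thrownException);
    }
}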

How can we design the Spring-Kafka consumer so that it restarts the consumer when it stops consuming? Is there a listener that can log the exact point at which the consumer stops? Is this caused by setting the concurrency to 1? The reason I have to keep the concurrency at 1 is that other consumers slow down if this consumer is given more concurrency.
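One way to get both a timestamp for the stall and an automatic restart, sketched below under the assumption that publishing idle events fits this setup and that the setter is available in the spring-kafka version in use: have the container factory emit ListenerContainerIdleEvent (for example via factory.getContainerProperties().setIdleEventInterval(60000L)) and react to it in an application event listener. The "listingListener" id and the 5-minute threshold are illustrative, not taken from the question.

Code language: java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.event.ListenerContainerIdleEvent;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class IdleConsumerMonitor {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    // Fired only if the container publishes idle events
    // (factory.getContainerProperties().setIdleEventInterval(60000L) in this sketch).
    @EventListener
    public void onIdle(ListenerContainerIdleEvent event) {
        // Records the exact point at which the listener went quiet
        System.out.println("Idle listener detected: " + event);

        // Illustrative recovery: restart the container after 5 idle minutes.
        // Done on a separate thread because the event is published on the
        // consumer thread, and stopping the container from that thread can block.
        if (event.getIdleTime() > 5 * 60 * 1000L) {
            new Thread(() -> {
                MessageListenerContainer container =
                        registry.getListenerContainer("listingListener");
                if (container != null) {
                    container.stop();
                    container.start();
                }
            }).start();
        }
    }
}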


1 Answer

Stack Overflow user

Accepted answer

Answered on 2018-06-05 06:47:34

I just ran a test with a 30-second max.poll.interval.ms=30000, suspended the listener, and resumed it after 30 seconds; I saw this in the logs...

Code language: text
2018-06-04 18:35:59.361  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
foo

2018-06-04 18:37:07.347 ERROR 4191 --- [      foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]

2018-06-04 18:37:07.350  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: [so50687794-0]
2018-06-04 18:37:07.351  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
2018-06-04 18:37:10.400  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
2018-06-04 18:37:10.401  INFO 4191 --- [      foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
2018-06-04 18:37:10.445  INFO 4191 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so50687794-0]
foo

You can see that after the rebalance the consumer was re-added to the group and the same messages were redelivered; that is exactly what I would expect.

I got the same results; even with 1.3.1.
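If the batch really does take longer than the poll interval to process, the two knobs that the CommitFailedException message points at can be adjusted in the consumerFactory() shown in the question; a minimal sketch with illustrative values, slotting into the same props map:

Code language: java
// Illustrative values only: allow up to 10 minutes between poll() calls...
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
// ...or hand the batch listener fewer records so each poll cycle finishes sooner
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);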

Votes: 4
The original content of this page was provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/50687794