首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >重启Spring应用程序后,KafkaListener无法接收消息

重启Spring应用程序后,KafkaListener无法接收消息
EN

Stack Overflow用户
提问于 2022-05-31 02:01:21
回答 1查看 1K关注 0票数 0

我有一个带有Kafka应用程序的Springboot,我的Kafka集群从docker文件开始。但是,如果Kafka集群保持启动,但是Spring应用程序重新启动,则无法接收任何消息(函数正常工作)。

属性文件:

代码语言:javascript
运行
复制
server.port=18080
spring.kafka.bootstrap-servers=localhost:29092,localhost:39092
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.main.banner-mode=off

config.kafka.topic1=quickstart-events
config.kafka.retry.delay=10000
config.kafka.processing.interval=10000
config.kafka.producer.interval=10000
config.kafka.consumer.groupId=quickstart-events-group-id

KafkaConfiguration.java

代码语言:javascript
运行
复制
@Slf4j
@EnableKafka
@Configuration
public class KafkaConfiguration {

    @Bean
    public Serializer<Student> defaultJsonSerializer(ObjectMapper objectMapper) {
        return new JsonSerializer<>(objectMapper);
    }

    @Bean
    public NewTopic topicExample(@Value("${config.kafka.topic1}") String topic, @Value("${spring.kafka.bootstrap-servers}") String servers) {
        int length = servers.split(",").length;
        return TopicBuilder.name(topic)
                .partitions(length)
                .replicas(1)
                .build();
    }

}

StudentListener.java

代码语言:javascript
运行
复制
@Service
@Slf4j
public class StudentListener extends AbstractBaseKafkaListener implements ApplicationListener<ApplicationReadyEvent> {

    private final String listenerId = "kafka_consumerListener";

    private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Autowired
    public StudentListener(List<MessageHandler> messageHandlers, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
        super(messageHandlers);
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
    }

    @KafkaListener(topics = {"${config.kafka.topic1}"}, groupId = "${config.kafka.consumer.groupId}", autoStartup = "true")
    public void onMessage(String message, MessageHeaders headers, Acknowledgment ack) {
        super.onMessage(message, headers, ack);
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        log.info("Application ready, start KafkaListener {}", listenerId);
        //kafkaListenerEndpointRegistry.getListenerContainer(listenerId).start();
    }
}

码头的一部分-Compose.yml

代码语言:javascript
运行
复制
  kafka-2:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    ports:
      - 39092:39092
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    networks:
      zk-kafka-network:
        ipv4_address: 10.5.0.6
EN

回答 1

Stack Overflow用户

发布于 2022-06-04 13:30:12

通过设置这些属性,并且不手动提交任何偏移量,该应用程序将在每次启动时都寻求主题的末尾。

代码语言:javascript
运行
复制
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false

在主题结束时,将不会使用任何数据,直到使用者在生产者发送数据时运行为止。因此,如果消费者没有按照您预期的方式工作,那么首先查看生产者应用程序。

如果生产者正在工作,那么您可以使用GetOffsetShell工具来验证主题中的偏移量正在增加。

如果偏移量正在增加,并且您希望捕获所有事件,而不仅仅是最新的事件集

代码语言:javascript
运行
复制
spring.kafka.consumer.auto-offset-reset=earliest

然后把信息

代码语言:javascript
运行
复制
@KafkaListener(topics = {"${config.kafka.topic1}"}, groupId = "${config.kafka.consumer.groupId}", autoStartup = "true")
public void onMessage(String message, MessageHeaders headers, Acknowledgment ack) {
    super.onMessage(message, headers, ack);
    acknowledgment.acknowledge(); 
}

如果消费者重新启动,消费群体在主题的末尾,和抵消没有增加,那么就没有数据可供消费,但应用程序将继续轮询代理。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/72441523

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档