我有一个带有Kafka应用程序的Springboot,我的Kafka集群从docker文件开始。但是,如果Kafka集群保持启动,但是Spring应用程序重新启动,则无法接收任何消息(函数正常工作)。
属性文件:
server.port=18080
spring.kafka.bootstrap-servers=localhost:29092,localhost:39092
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
spring.main.banner-mode=off
config.kafka.topic1=quickstart-events
config.kafka.retry.delay=10000
config.kafka.processing.interval=10000
config.kafka.producer.interval=10000
config.kafka.consumer.groupId=quickstart-events-group-id
KafkaConfiguration.java
@Slf4j
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Bean
public Serializer<Student> defaultJsonSerializer(ObjectMapper objectMapper) {
return new JsonSerializer<>(objectMapper);
}
@Bean
public NewTopic topicExample(@Value("${config.kafka.topic1}") String topic, @Value("${spring.kafka.bootstrap-servers}") String servers) {
int length = servers.split(",").length;
return TopicBuilder.name(topic)
.partitions(length)
.replicas(1)
.build();
}
}
StudentListener.java
@Service
@Slf4j
public class StudentListener extends AbstractBaseKafkaListener implements ApplicationListener<ApplicationReadyEvent> {
private final String listenerId = "kafka_consumerListener";
private final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
public StudentListener(List<MessageHandler> messageHandlers, KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry) {
super(messageHandlers);
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
}
@KafkaListener(topics = {"${config.kafka.topic1}"}, groupId = "${config.kafka.consumer.groupId}", autoStartup = "true")
public void onMessage(String message, MessageHeaders headers, Acknowledgment ack) {
super.onMessage(message, headers, ack);
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("Application ready, start KafkaListener {}", listenerId);
//kafkaListenerEndpointRegistry.getListenerContainer(listenerId).start();
}
}
码头的一部分-Compose.yml
kafka-2:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
ports:
- 39092:39092
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
zk-kafka-network:
ipv4_address: 10.5.0.6
发布于 2022-06-04 13:30:12
通过设置这些属性,并且不手动提交任何偏移量,该应用程序将在每次启动时都寻求主题的末尾。
spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.enable-auto-commit=false
在主题结束时,将不会使用任何数据,直到使用者在生产者发送数据时运行为止。因此,如果消费者没有按照您预期的方式工作,那么首先查看生产者应用程序。
如果生产者正在工作,那么您可以使用GetOffsetShell工具来验证主题中的偏移量正在增加。
如果偏移量正在增加,并且您希望捕获所有事件,而不仅仅是最新的事件集
spring.kafka.consumer.auto-offset-reset=earliest
然后把信息
@KafkaListener(topics = {"${config.kafka.topic1}"}, groupId = "${config.kafka.consumer.groupId}", autoStartup = "true")
public void onMessage(String message, MessageHeaders headers, Acknowledgment ack) {
super.onMessage(message, headers, ack);
acknowledgment.acknowledge();
}
如果消费者重新启动,消费群体在主题的末尾,和抵消没有增加,那么就没有数据可供消费,但应用程序将继续轮询代理。
https://stackoverflow.com/questions/72441523
复制相似问题