我目前正在处理一个用例,我正在使用kafka中的记录,并对这些记录进行处理。但问题是,有时我们从生产者端收到字段为空的记录,或者有时甚至kafka记录合同正在被破坏。我想记录这些记录,并决定在生产者端可能出现问题的地方。 因此,为了完成任务,我需要在使用它之前添加验证。记录只是一个字节数组。private Long restaurantId;
@JsonProperty("cart
我有一个java Kafka消费者,在其中我正在批量获取ConsumerRecords进行处理。system which can take random time for different requests or timeout in 5 seconds.我遇到的问题是,如果产生了较晚的记录,但前一条记录仍未超时,则如何提交或提交哪个偏移量。假设我在一批中获得了2条记录,第一条消息的外部调用仍在等待,而第二次调用已完成。如果我等待5秒等待外部响应,Kaf
是在使用者接收消息之后还是在实现KafkaListener的整个方法完成之后?KafkaConsumer.java public ResponseEntity如果卡夫卡消费者在3秒前崩溃,那么在这3 seconds?Between期间处理的任何记录都不会提交偏移量,auto-commit-interval: 3000 K