当我在对一个老话题做一些测试时,我发现了一些奇怪的行为。读了一下卡夫卡的日志,我注意到这条“删除了8个过期的偏移量”的信息:
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)
我试图在每次调用消费者时重置消费者offset,这样当我多次调用消费者时,它仍然可以读取生产者发送的记录。我设置了props.put("auto.offset.reset","earliest");并调用了consumer.seekToBeginning(consumer.assignment());,但是当我第二次调用消费者时,它将不会收到任何记录。我该如何解决这个问题呢?
public ConsumerRecords<String, byte[]> consumer(){
Properties props = new Propert
我用scala编写了kafka consumer。当我运行consumer时,控制台上显示为空白。我使用了以下代码:
val topicProducer = "testOutput"
val props = new Properties()
props.put("bootstrap.servers","host:9092,host:9092")
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeseria