如何从KafkaStream获取主题名称和分区id。对于任何其他的Kafka用户,我们可以获得主题名称和partitionId,如下所示:
ConsumerRecords<String, String> records = consumer.pollConsumerRecord<String, String> record : records) {System.out.printf("consumed: key = %s, value = %s, part
我有一个小组,有两个主题。我试图将其中一个偏移量重置为1,但得到的消息是偏移量(1)低于主题分区的最早偏移量。) is lower than earliest offset for topic partition mytopic-0.NEW-OFFSET当我尝试不同的分区时,我会得到相同的消息:
➜ local-kafka_2
, io.confluent.kafka.serializers.KafkaAvroDeserializer.class);
因此,我不需要获取字节,然后将它们反序列化为有效负载等等。key/value for partition ri00-q-log-et-final-0 at offset 36833.所以我决定从话题的结尾开始听,我检查了文档,它建议实现ConsumerSeekAware和它的子接口ConsumerSeekAware.ConsumerSe