我使用的是KafkaConsumer 0.10Java api。我想从一个特定的分区和特定的偏移量消费。我抬头一看,发现有一个seek方法,但它抛出了一个异常。有没有人有类似的用例或解决方案?
代码:
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerProps);
consumer.seek(new TopicPartition("mytopic", 1), 4);
异常
java.lang.IllegalStateException: No current ass
我正在用scala编写一个与kafka进行火花流连接的程序,我得到了以下错误:
18/02/19 12:31:39 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 39)
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {prensa4-0=744}
at org.apache.kafka.clients.
每次从一开始就使用相同的信息。
def consume_from_kafka():
client = KafkaClient(hosts=IP)
topic = client.topics[TOPIC]
consumer = topic.get_simple_consumer(consumer_group="mygroup",reset_offset_on_start=True)
consumer.commit_offsets()
for message in consumer:
if message is not Non