我已经使用kafka-python
库编写了一个python脚本,它将消息写入和读取到kafka
中。我编写消息没有任何问题;我可以使用kafka
控制台工具检索它们。但是我不能使用我的python脚本来读取它们。我在我的消费者上有一个for,它在迭代的第一行冻结,永远不会返回。下面是我的代码:
from kafka import KafkaConsumer
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8")
)
for msg in consumer:
print(type(msg))
消费者已完全创建和订阅;我可以看到my-topic
列在其_client
属性的topic列表中。
有什么想法吗?
发布于 2020-03-01 20:32:03
默认情况下,kafka python从上一个偏移量开始,即只读取新的消息。一种方法是从头开始读取,或者另一种方法是将轮询主题保持在无限循环中,如以下代码所示:
while True:
try:
records = consumer.poll(60 * 1000) # timeout in millis , here set to 1 min
record_list = []
for tp, consumer_records in records.items():
for consumer_record in consumer_records:
record_list.append(consumer_record.value)
print(record_list) # record_list will be list of dictionaries
编辑
要从头开始读取,我们需要在创建消费者对象时添加auto_offset_reset=earliest
早些时候
consumer = KafkaConsumer(
"my-topic",
bootstrap_servers="localhost:9092"),
value_deserializer=lambda v: json.dumps(v).encode("utf-8"),
auto_offset_reset='earliest')
如果这有帮助,请告诉我!
https://stackoverflow.com/questions/60479348
复制