我的代码看起来像这样。
def message_reader(consumer):
consumed_message = consumer.consume_batch()
if consumed_message:
#do something
def run_reader():
process_consumer = get_consumer() #gets a SimpleConsumer()
message_reader(process_consumer)
process_consumer.commit()
process_consumer.close()所以,我的问题是,假设主题中没有消息,也没有消息被消费- commit()会增加偏移量吗?
另外,生产者是否在生成消息之前检查最新的偏移量?
发布于 2020-03-06 00:56:50
不是python客户端的专家,但如果java客户端在提交调用之间没有实际消耗任何东西,它会重新提交相同的位置。
然而,我敢肯定,所有的客户端都会做同样的事情(提交相同的位置),否则会导致您跳过记录。还有一些完整的Kafka监控系统已经被编写为依赖于这种行为-例如burrow。
https://stackoverflow.com/questions/60538777
复制相似问题