我试图用kafka-python构建一个应用程序,在这个应用程序中,使用者从一系列主题中读取数据。非常重要的是,消费者不会两次阅读同一条消息,但也不会错过一条消息。
一切似乎都很好,除了当我关闭消费者(例如失败),并试图开始阅读从抵消。我只能读取主题中的所有消息(这会创建双读),或者只侦听新消息,只监听(并且错过了在故障期间发出的消息)。当暂停消费者时,我不会遇到这个问题。
为了解决这个问题,我创建了一个孤立的模拟。
在这里,通用生产者:
from time import sleep
from json import dumps
from kafka import KafkaProducer
我使用手动kafka commit,将属性enable.auto.commit设置为false,同时初始化Kafka消费者,并在接收和处理消息后手动调用kafka commit。
然而,因为在我的消费者中处理消息是很耗时的,所以我得到了Exception with message "error": "Broker: Group rebalance in progress"
原因是重新平衡超时后提交被拒绝,并出现此错误。现在,恢复操作是退出并重新实例化进程,这将再次触发重新平衡和分区分配。另一种方法是捕获此异常,然后照常继续,只有在poll()调用被阻塞直到重新平
目前,我的Kafka Consumer流媒体应用程序正在手动将偏移量提交到Kafka中,并将enable.auto.commit设置为false。当我尝试重新启动应用程序时,它失败了,抛出以下异常: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555} 假设上面的错误是由于消息不存在/由于保留期而删除分区,我尝试了以下方法: 我禁用了手动提交
我对如何在pycharm中创建一个kafka生产者和消费者感到困惑。我已经创建了一个produce.py
from time import sleep
from json import dumps
from kafka import KafkaProducer
producer = KafkaProducer(
value_serializer = lambda x:dumps(x).encode('utf-8'),
bootstrap_servers = ["localhost:9092"]
)
for i in range(1,100):
我试图在每次调用消费者时重置消费者offset,这样当我多次调用消费者时,它仍然可以读取生产者发送的记录。我设置了props.put("auto.offset.reset","earliest");并调用了consumer.seekToBeginning(consumer.assignment());,但是当我第二次调用消费者时,它将不会收到任何记录。我该如何解决这个问题呢?
public ConsumerRecords<String, byte[]> consumer(){
Properties props = new Propert
当我在对一个老话题做一些测试时,我发现了一些奇怪的行为。读了一下卡夫卡的日志,我注意到这条“删除了8个过期的偏移量”的信息:
[GroupCoordinator 1001]: Stabilized group GROUP_NAME generation 37 (kafka.coordinator.GroupCoordinator)
[GroupCoordinator 1001]: Assignment received from leader for group GROUP_NAME for generation 37 (kafka.coordinator.GroupCoordinator)