使用kafka-python消费kafka,seek方法重置的本地偏移量会提交给kafka吗?我正在研究获取双中心机房Kafka集群的rpo索引的解决方案。使用kafka-python获取Kafka集群的最大时间戳,取两个机房的Kafka集群的最大时间戳之差。
使用seek()将偏移量重置为分区的最大偏移量-1,然后使用poll()获取最新的消息,但该消息无法在循环中获取,检查当前消费组的偏移量,发现堆叠的消息为0
#reset offset to (max_offset-1)
for tp,offset in offsets_dict.items():
offset
前几天我们确实升级了我们的Kafka集群(滚动升级到2.3),我们注意到我们的一个消费者(kafka-console)出现了大量的以下错误消息: [2019-09-26 13:02:25,115] ERROR [Consumer clientId=consumer-1, groupId=console-consumer-96306] Offset commit failed on partition TOPICNAME at offset 18834877: The coordinator is loading and hence can't process requests. (o
我使用kafka节点(kafka的节点客户端),使用使用者检索有关主题的消息。不幸的是,我收到了一个" offsetOutOfRange“条件(调用offsetOutOfRange回调)。我的应用程序运行良好,直到消费者明显落后于生产者,在最早的和最新的抵消之间留下了很大的差距。此时,我(可能是错误的)假定使用者将能够继续接收消息(并希望赶上生产者)。
我的kafka客户端代码如下:
:
:
var kafka = require('kafka-node');
var zookeeper = "10.0.1.201:2181";
var id =
我想用NodeJS和Kafka构建一个API,它可以将一个偏移量和一个主题作为输入,并输出从偏移量开始的前10条消息。我在和中尝试了这种方法。
它们提供的使用者API允许使用来自特定偏移量的消息。我想停止消费的信息,一旦我已经阅读了大约10条消息。但是两个API调用都将继续获取消息,直到最后一条消息。我怎么能停止那样做呢?
这是我的编辑的完整代码
var Kafka = require('no-kafka');
var express = require("express");
var app = express();
var producer = new K
我是kafka的新手,正在尝试理解是否有一种方法可以从上次消费的偏移量读取消息,而不是从开始读取。
我正在写一个示例案例,这样我的意图就不会偏离。
Eg:
1) I produced 5 messages at 7:00 PM and console consumer consumed those.
2) I stopped consumer at 7:10 PM
3) I produced 10 message at 7:20 PM. No consumer had read those messages.
4) Now, i have started console consumer at
我有一个定制的Kafka使用者,我用它向REST发送一些请求。根据API的响应,我要么提交偏移量,要么跳过消息而不提交。
最小示例:
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
// Sending a POST request and retrieving the answer
// ...
目前,我的Kafka Consumer流媒体应用程序正在手动将偏移量提交到Kafka中,并将enable.auto.commit设置为false。当我尝试重新启动应用程序时,它失败了,抛出以下异常: org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions:{partition-12=155555555} 假设上面的错误是由于消息不存在/由于保留期而删除分区,我尝试了以下方法: 我禁用了手动提交
public void run() {
// find the meta data about the topic and partition we are interested in
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
if (metadata == null) {
System.out.println("Can't find metadata for Topic and Partition. Exit
统计topic消息后,我的Kafka消费者死机了
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='127.0.0.1:9092')
for _ in range(100):
producer.send('foobar', b'some_message_bytes')
from kafka import KafkaConsumer
consumer = KafkaConsumer('foobar')
for msg in