我在Kafka NodeJS中使用了一个简单的消费者/生产者。我的生产者正在发送消息,我很容易在消费者中获得这些消息。生产者和消费者的代码如下。在消费者中,我期望offset.fetch()给我今天的所有偏移量it,但它没有。KIndly指导我如何从这个方法中获得结果,并且还提到了一个方法,它可以直接给出任何分区主题中的最后一个偏移量。我还想知道如何过滤即将到来的流中的偏移量。例如:如果我只想在我的消费者中获得最后20条消息?
我的制片人是:
var kafka = require('kafka-node');
var Producer = kafka.Producer;
va
我想写一个函数,一旦主题中的最后一条消息被读取,它就会调用回调。
function getCurrentMessages(kafka, topic, cb_done){
// Start consuming from the beginning
var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true});
consumer.on('message', function(msg){
// Do something w
KafkaProperties java文档:
/**
* What to do when there is no initial offset in Kafka or if the current offset
* does not exist any more on the server.
*/
private String autoOffsetReset;
我有hello掌声,其中包含application.properties
spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset
我在跟踪这个github回购
https://github.com/hannesstockner/kafka-connect-elasticsearch/
我正在尝试将数据从文件源读取到弹性搜索。
当我运行standalone.sh脚本时会收到一个错误
Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left ({ProducerRecord(topic=recipes, partitio
我刚开始接触加缪。我计划每小时运行一次camus作业。我们每小时会收到大约80000000条消息(平均大小约为4KB)。
如何设置以下属性:
# max historical time that will be pulled from each partition based on event timestamp
kafka.max.pull.hrs=1
# events with a timestamp older than this will be discarded.
kafka.max.historical.days=3
我不能清楚地看清这些配置。是否应该将天作为1,将小时属性作为2?
这是一个基于Armando Ballaci提供的答案的"Where do zookeeper store Kafka cluster and related information?“的后续问题。 现在很明显,消费者偏移量存储在Kafka集群中的一个名为__consumer_offsets的特殊主题中。很好,我只是想知道这些偏移量的检索是如何工作的。 主题不像RDBS,我们可以在RDBS上根据某个谓词查询任意数据。例如,如果数据存储在RDBMS中,下面这样的查询可能会获得某个消费者组的特定消费者的特定主题分区的消费者偏移量。 select consumer_offset__read,
我引用了,并尝试使用重置kafka streams应用程序。但即使我做了所有的前提条件和必要的步骤,当我检查重置主题offset by /opt/cloudera/parcels/KAFKA/lib/kafka/bin/kafka-consumer-groups.sh时,CURRENT-OFFSET仍然不是0。
有人能帮上忙吗?会非常感谢的。