我从Kafka主题中获取数据,并以Deltalake(拼花)格式存储它们。我希望找到在特定的日子中获取的消息的数量。
My thought :我想使用spark读取存储数据的目录,并在特定的一天使用".parquet“的文件进行计数。这会返回一个计数,但我不确定这是否正确。
这条路对吗?是否还有其他方法来计算某一天(或某段时间)从卡夫卡主题中获取的信息数量?
我正在使用Kafka-Net nuget软件包为卡夫卡的生产和消费做一个基本的POC。
然而,我遇到的问题是,它发布给主题的消息似乎没有任何时间戳(在Kafka工具最新版本中查看)。这是因为Kafka-Net软件包还没有更新以支持在新版本的Kafka中处理时间戳的方式吗?我需要转换到使用融合卡夫卡吗?
消息以正确的偏移量和有效负载附加到主题中,它们只有一个空白的时间戳。
这是我的密码
using System;
using System.Collections.Generic;
using System.Configuration;
using KafkaNet;
using KafkaNet
我正在尝试编写一个Pyflink应用程序来测量延迟和吞吐量。我的数据是来自kafka主题的json对象,并使用SimpleStringSchema-class加载到DataStream中进行反序列化。在这篇文章的答案()之后,我让Kafka的制作人在事件中添加了时间戳,但我现在很难理解我如何才能访问这些时间戳。我知道前面提到的这篇文章提供了一个解决这个问题的方法,但我很难把这个例子移植到python上,因为它的文档/例子很少。
另一篇文章()建议我应该定义一个ProcessFunction。然而,在这里我也不确定语法。我可能不得不做这样的事情(摘自:)
class MyProcessFunct
在Kafka(0.11.0.1)流中,一个演示应用程序
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message value
我们可以为kafka ProducerRecord构造函数指定时间戳。
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
这个时间放大器的目的是什么?它和卡夫卡经纪人的信息一起传递吗?