在我的应用程序中,我将对我的Kafka集群执行某种健康检查。
目前,我创建了一个TopicMetadataRequest来检测死掉的代理:
Future {
// this will fail if Kafka is unavailable
consumer.send(new TopicMetadataRequest(Seq("health-check-topic"), 1))
}
不幸的是,由于集群拓扑/设置的原因,此调用会产生巨大的网络流量。
有没有更好的方法来检查kafka经纪人?我需要的是像true/false指示器这样简单的东西。
这是我的Kafka流代码,它使用滑动窗口对时间窗口中的所有整数数据进行求和。
public class KafkaWindowingLIS {
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
config.put(StreamsConfig.APPLICATION_ID_CON
我正在尝试计算基于事件时间的来自Kafka主题的每分钟传入事件的速率。我使用的是1分钟的TumblingEventTimeWindows。下面给出了代码片段。我观察到,如果我没有接收到特定窗口的任何事件,例如从2.34到2.35,则上一个2.33到2.34的窗口不会接近。我理解2.33到2.34窗口丢失数据的风险(可能是由于系统故障,更大的Kafka延迟等),但我不能无限期地等待。我需要在等待一段时间后关闭此窗口,后续窗口可以在系统恢复后继续。我如何才能做到这一点? 我正在尝试下面的代码,它给出了连续事件流每分钟的事件计数。 StreamExecutionEnvironment ex
我有一些(非常大的)元组列表,它们来自包含id、start_time和end_time的数据库。
我也有一个定期间隔和排序的时间列表(这些都是datetime对象)。
我基本上需要循环这些时间,并找到所有元组的时间在他们的范围内。
我想知道最有效的方法是什么。想到的第一个想法是这样的(伪代码):
for time in times:
for tuple in tuples:
if tuple.start_time <= time <= tuple.end_time:
# add tuple to some_other_list
偶尔,我的kafka streams应用程序会因以下错误而死亡:
[-StreamThread-4] o.a.k.s.p.i.AssignedStreamsTasks : Failed to commit stream task 0_9 due to the
following error:
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before successfully
committing offsets {my-topic-9=OffsetAndMetadata{offset=5
我有一个spark流上下文从kafka读取事件数据的时间间隔为10秒。我想用postgres表中的现有数据来补充这个事件数据。
我可以用如下内容加载postgres表:
val sqlContext = new SQLContext(sc)
val data = sqlContext.load("jdbc", Map(
"url" -> url,
"dbtable" -> query))
..。
val broadcasted = sc.broadcast(data.collect())
然后我可以像这样跨过它:
val db