使用Spark (1.6),我有一个文件来读取批量大小为2s的查找数据,但是文件每小时只被拷贝到目录中。
一旦有了一个新文件,它的内容就会被流读取,这就是我想要缓存到内存中的内容,并一直保存到读取新文件为止。
有另一个流,我想加入这个数据集,因此我想缓存。
这是用于星火流的批量查找数据的后续问题.
答案在updateStateByKey
中确实很好,但是我不知道如何处理KV对从查找文件中删除的情况,因为updateStateByKey
中的值序列一直在增长。此外,任何提示,如何使用mapWithState
将是很好的。
到目前为止,我尝试过这样做,但这些数据似乎并没有持久化:
val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD{x =>
if (!x.partitions.isEmpty) {
x.unpersist(true)
x.persist()
}
}
发布于 2016-05-31 17:08:42
DStreams
可以直接使用persist
方法持久化,该方法将流中的每个RDD持久化:
dictionaryStream.persist
根据正式文件,这是自动申请的
基于窗口的操作(如
reduceByWindow
和reduceByKeyAndWindow
)和基于状态的操作(如updateStateByKey
)
因此,在您的情况下,不应该需要显式缓存。此外,也不需要手动不持久化。再次引用医生们的话:
默认情况下,由DStream转换生成的所有输入数据和持久化RDDs都会自动清除。
并根据流水线中使用的转换自动调整保留期。
关于mapWithState
,您必须提供一个StateSpec
。一个最小的例子需要一个函数,该函数采用key
、当前value
的Option
和以前的状态。假设您有DStream[(String, Long)]
,并且希望记录到目前为止的最大值:
val state = StateSpec.function(
(key: String, current: Option[Double], state: State[Double]) => {
val max = Math.max(
current.getOrElse(Double.MinValue),
state.getOption.getOrElse(Double.MinValue)
)
state.update(max)
(key, max)
}
)
val inputStream: DStream[(String, Double)] = ???
inputStream.mapWithState(state).print()
还可以提供初始状态、超时间隔和捕获当前批处理时间。最后两种方法可以用来实现对有一段时间没有更新的密钥的删除策略。
https://stackoverflow.com/questions/37550054
复制相似问题