首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >火花流:跨批缓存DStream结果

火花流:跨批缓存DStream结果
EN

Stack Overflow用户
提问于 2016-05-31 15:38:31
回答 1查看 1.7K关注 0票数 2

使用Spark (1.6),我有一个文件来读取批量大小为2s的查找数据,但是文件每小时只被拷贝到目录中。

一旦有了一个新文件,它的内容就会被流读取,这就是我想要缓存到内存中的内容,并一直保存到读取新文件为止。

有另一个流,我想加入这个数据集,因此我想缓存。

这是用于星火流的批量查找数据的后续问题.

答案在updateStateByKey中确实很好,但是我不知道如何处理KV对从查找文件中删除的情况,因为updateStateByKey中的值序列一直在增长。此外,任何提示,如何使用mapWithState将是很好的。

到目前为止,我尝试过这样做,但这些数据似乎并没有持久化:

代码语言:javascript
运行
复制
val dictionaryStream = ssc.textFileStream("/my/dir")
dictionaryStream.foreachRDD{x => 
  if (!x.partitions.isEmpty) {
    x.unpersist(true)
    x.persist()
  }
}
EN

回答 1

Stack Overflow用户

发布于 2016-05-31 17:08:42

DStreams可以直接使用persist方法持久化,该方法将流中的每个RDD持久化:

代码语言:javascript
运行
复制
dictionaryStream.persist

根据正式文件,这是自动申请的

基于窗口的操作(如reduceByWindowreduceByKeyAndWindow )和基于状态的操作(如updateStateByKey )

因此,在您的情况下,不应该需要显式缓存。此外,也不需要手动不持久化。再次引用医生们的话:

默认情况下,由DStream转换生成的所有输入数据和持久化RDDs都会自动清除。

并根据流水线中使用的转换自动调整保留期。

关于mapWithState,您必须提供一个StateSpec。一个最小的例子需要一个函数,该函数采用key、当前valueOption和以前的状态。假设您有DStream[(String, Long)],并且希望记录到目前为止的最大值:

代码语言:javascript
运行
复制
val state = StateSpec.function(
  (key: String, current: Option[Double], state: State[Double]) => {
    val max  = Math.max(
      current.getOrElse(Double.MinValue),
      state.getOption.getOrElse(Double.MinValue)
    )
    state.update(max)
    (key, max)
  }
)

val inputStream: DStream[(String, Double)] = ??? 
inputStream.mapWithState(state).print()

还可以提供初始状态、超时间隔和捕获当前批处理时间。最后两种方法可以用来实现对有一段时间没有更新的密钥的删除策略。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/37550054

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档