星火结构流( SSS )和星火流(SS)的一个很大的区别是SSS可以利用状态。它可以存储以前批的聚合结果,并将当前结果与先前的结果一起应用。因此,它可以从输入流的一开始就得到真正的聚合结果。
但有一种情况是,我们不希望得到与以前的statestore值合并的最终结果。我们只想得到(输出)当前批处理的聚合结果。在平台和框架方面,我们不能回到党卫军。
因此,我的问题是,在SSS中,是否仍然可以像SS一样,获得当前批产品的受害结果?
以word计数应用为例,在火花结构流指南:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html中给出了该应用程序。
当有一个"cat cat
“在一个批,我的预期输出是cat|2
。
当下一批"cat
“出现时,我的预期输出是cat|1
发布于 2018-01-23 15:26:17
在SSS中还能像SS一样获得当前批次的得失结果吗?
实现目标的一种方法是使用mapGroupsWithState
控制状态存储,并将其作为一种实际上什么都不做的退化存储。例如:
val spark =
SparkSession.builder().appName("bla").master("local[*]").getOrCreate()
import spark.implicits._
val socketDF = spark.readStream
.format("socket")
.option("host", "127.0.0.1")
.option("port", 9999)
.load()
socketDF
.as[String]
.map { str =>
val Array(key, value) = str.split(';')
(key, value)
}
.groupByKey { case (key, _) => key }
.mapGroupsWithState((str: String,
tuples: Iterator[(String, String)],
value: GroupState[Int]) => {
(str, tuples.size)
})
.writeStream
.outputMode(OutputMode.Update())
.format("console")
.start()
.awaitTermination()
假设我有一个key;value
格式的值流,这将只使用mapGroupsWithState
作为一个通过存储,而不是实际积累任何结果。这样,对于每个批处理,您将得到一个干净的状态,没有以前聚合过的数据。
发布于 2019-01-22 14:55:19
在Spark2.4中,似乎有一种更容易实现这一目标的方法,即使用
foreachBatch
操作,因为您可以在火花文档中阅读。
不过,我使用的是2.3版本的星火,并没有设法解决这个问题。
发布于 2018-01-24 21:27:47
如何使用附加输出模式?
追加模式-只有添加到结果表中的新行,因为最后一个触发器将写入外部存储。这仅适用于预期结果表中的现有行不会更改的查询。
https://stackoverflow.com/questions/48404960
复制相似问题