首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >当使用Spark结构化流时,如何才能获得当前批处理的聚合结果,如Spark流?

当使用Spark结构化流时,如何才能获得当前批处理的聚合结果,如Spark流?
EN

Stack Overflow用户
提问于 2018-01-23 15:12:02
回答 3查看 1.6K关注 0票数 3

星火结构流( SSS )和星火流(SS)的一个很大的区别是SSS可以利用状态。它可以存储以前批的聚合结果,并将当前结果与先前的结果一起应用。因此,它可以从输入流的一开始就得到真正的聚合结果。

但有一种情况是,我们不希望得到与以前的statestore值合并的最终结果。我们只想得到(输出)当前批处理的聚合结果。在平台和框架方面,我们不能回到党卫军。

因此,我的问题是,在SSS中,是否仍然可以像SS一样,获得当前批产品的受害结果?

以word计数应用为例,在火花结构流指南:https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html中给出了该应用程序。

当有一个"cat cat“在一个批,我的预期输出是cat|2

当下一批"cat“出现时,我的预期输出是cat|1

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-01-23 15:26:17

在SSS中还能像SS一样获得当前批次的得失结果吗?

实现目标的一种方法是使用mapGroupsWithState控制状态存储,并将其作为一种实际上什么都不做的退化存储。例如:

代码语言:javascript
运行
复制
val spark =
  SparkSession.builder().appName("bla").master("local[*]").getOrCreate()

import spark.implicits._

val socketDF = spark.readStream
  .format("socket")
  .option("host", "127.0.0.1")
  .option("port", 9999)
  .load()

socketDF
  .as[String]
  .map { str =>
    val Array(key, value) = str.split(';')
    (key, value)
  }
  .groupByKey { case (key, _) => key }
  .mapGroupsWithState((str: String,
                       tuples: Iterator[(String, String)],
                       value: GroupState[Int]) => {
    (str, tuples.size)
  })
  .writeStream
  .outputMode(OutputMode.Update())
  .format("console")
  .start()
  .awaitTermination()

假设我有一个key;value格式的值流,这将只使用mapGroupsWithState作为一个通过存储,而不是实际积累任何结果。这样,对于每个批处理,您将得到一个干净的状态,没有以前聚合过的数据。

票数 2
EN

Stack Overflow用户

发布于 2019-01-22 14:55:19

在Spark2.4中,似乎有一种更容易实现这一目标的方法,即使用

foreachBatch

操作,因为您可以在火花文档中阅读。

不过,我使用的是2.3版本的星火,并没有设法解决这个问题。

票数 1
EN

Stack Overflow用户

发布于 2018-01-24 21:27:47

如何使用附加输出模式?

追加模式-只有添加到结果表中的新行,因为最后一个触发器将写入外部存储。这仅适用于预期结果表中的现有行不会更改的查询。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48404960

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档