首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >外部批处理的火花访问数据帧(结构化流)

外部批处理的火花访问数据帧(结构化流)
EN

Stack Overflow用户
提问于 2021-03-25 14:22:48
回答 2查看 1.3K关注 0票数 0

我想要在火花流的foreach批处理中创建和更新一个数据帧,并在下面的foreach批处理迭代器之外访问它,这就是我在火花结构化流中试图做的事情。是否可以从外部访问为火花结构化流中的每一批创建或更新的数据帧?

代码语言:javascript
运行
复制
// assign a empty data frame
var df1: Option[DataFrame] = None: Option[DataFrame]

validatedFinalDf.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    println("I am here printing batchDF")
    batchDF.withColumn("extra", lit("batch-df")).show()

    // un presist the data frame if it has data
    if (df1 != None) {
      df1.get.unpersist()
    }

    // assign data to data frame
    df1 = Some(batchDF.withColumn("extra", lit("batch-df-dim")))
}.start()

// access data frame outside foreach not working stale data ....
if (df1 != None) {
      df1.get.show()
}

spark.streams.awaitAnyTermination()

我甚至不能访问临时表,这些表是从外部为每个批创建的。即使是在foreach批内更新的数据框架也显示来自外部foreach批的陈旧数据。

谢谢斯里

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-03-29 18:44:01

一个小的工作围绕着这个技巧,将批处理数据帧转换成内存流中的数据帧,而内存流被外部的foreach批处理访问。

代码语言:javascript
运行
复制
case class StreamData(
                      account_id: String,
                      run_dt: String,
                      trxn_dt: String,
                      trxn_amt: String)

    import spark.implicits._
    implicit val ctx = spark.sqlContext
    val streamDataSource = MemoryStream[StreamData]

    source.writeStream
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        val batchDs = batchDf.as[StreamData]
        val obj = batchDs
          .map(x => StreamData(x.account_id,x.run_dt,x.trxn_dt, x.trxn_amt))
          .collect()

        streamDataSource.addData(obj)
      }
      .start()

    val datasetStreaming: Dataset[StreamData] = streamDataSource.toDS()

    println("This is the streaming dataset:")
    datasetStreaming
      .writeStream
      .format("console")
      .outputMode("append")
      .start()

spark.streams.awaitAnyTermination()
票数 1
EN

Stack Overflow用户

发布于 2021-03-26 08:55:28

集合上的foreachBatch iterates,如果我没有弄错的话,希望使用effectful操作(如写、打印等)。然而,您在体内所做的是将一个临时结果分配给外部var

所以问题是:

  • 从概念上讲,这是错误的,因为即使它工作得很好,最终也只会得到分配给您的lastvar数据。
  • 我认为您需要start操作,如doc 这里中所示。

DF是不变的。如果您想在使用DataFrame函数(例如,withColumn)或其他转换API时更改您的mapping,并返回新的DF。

当您对结果感到满意时,才使用foreach / foreachBatch调用持久化

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66801505

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档