我想要在火花流的foreach批处理中创建和更新一个数据帧,并在下面的foreach批处理迭代器之外访问它,这就是我在火花结构化流中试图做的事情。是否可以从外部访问为火花结构化流中的每一批创建或更新的数据帧?
// assign a empty data frame
var df1: Option[DataFrame] = None: Option[DataFrame]
validatedFinalDf.writeStream
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
println("I am here printing batchDF")
batchDF.withColumn("extra", lit("batch-df")).show()
// un presist the data frame if it has data
if (df1 != None) {
df1.get.unpersist()
}
// assign data to data frame
df1 = Some(batchDF.withColumn("extra", lit("batch-df-dim")))
}.start()
// access data frame outside foreach not working stale data ....
if (df1 != None) {
df1.get.show()
}
spark.streams.awaitAnyTermination()
我甚至不能访问临时表,这些表是从外部为每个批创建的。即使是在foreach批内更新的数据框架也显示来自外部foreach批的陈旧数据。
谢谢斯里
发布于 2021-03-29 18:44:01
一个小的工作围绕着这个技巧,将批处理数据帧转换成内存流中的数据帧,而内存流被外部的foreach批处理访问。
case class StreamData(
account_id: String,
run_dt: String,
trxn_dt: String,
trxn_amt: String)
import spark.implicits._
implicit val ctx = spark.sqlContext
val streamDataSource = MemoryStream[StreamData]
source.writeStream
.foreachBatch { (batchDf: DataFrame, batchId: Long) =>
val batchDs = batchDf.as[StreamData]
val obj = batchDs
.map(x => StreamData(x.account_id,x.run_dt,x.trxn_dt, x.trxn_amt))
.collect()
streamDataSource.addData(obj)
}
.start()
val datasetStreaming: Dataset[StreamData] = streamDataSource.toDS()
println("This is the streaming dataset:")
datasetStreaming
.writeStream
.format("console")
.outputMode("append")
.start()
spark.streams.awaitAnyTermination()
发布于 2021-03-26 08:55:28
集合上的foreachBatch iterates
,如果我没有弄错的话,希望使用effectful
操作(如写、打印等)。然而,您在体内所做的是将一个临时结果分配给外部var
。
所以问题是:
last
的var
数据。start
操作,如doc 这里中所示。DF是不变的。如果您想在使用DataFrame函数(例如,withColumn
)或其他转换API时更改您的mapping
,并返回新的DF。
当您对结果感到满意时,才使用foreach / foreachBatch调用持久化
https://stackoverflow.com/questions/66801505
复制相似问题