Joining streams with static datasets is a great feature of Structured Streaming, but on every micro-batch the static dataset is refreshed from its source. Since these sources are not always dynamic, caching the static dataset for a specified period of time (or number of batches) would be a performance gain: after that period/number of batches the dataset is reloaded from the source, otherwise it is served from the cache.
In Spark Streaming I managed this with a cached dataset that I unpersisted after a specified number of batch runs, but for some reason this no longer works with Structured Streaming.
Any suggestions how to do this with Structured Streaming?
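For clarity, here is a minimal sketch of the DStream-era pattern the question refers to; the socket source, the static path, and the refresh interval are illustrative, not from the original:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch: with the old DStream API the static data could be
// re-cached every N micro-batches from inside foreachRDD (driver-side code).
val spark = SparkSession.builder().appName("dstream-refresh").getOrCreate()
val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

def loadStaticDf(): DataFrame =
  spark.read.parquet("/path/to/static")  // hypothetical static source

var staticDf = loadStaticDf()
staticDf.persist()
var batches = 0L

ssc.socketTextStream("localhost", 9999).foreachRDD { rdd =>
  batches += 1
  if (batches % 10 == 0) { // e.g. refresh every 10 batches
    staticDf.unpersist()
    staticDf = loadStaticDf()
    staticDf.persist()
  }
  // ... join rdd with staticDf and process ...
}

ssc.start()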
Posted on 2021-03-04 08:41:58
I developed a solution for another question, Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically, which may also help with your problem:
You can do this by making use of the streaming scheduling capabilities that Structured Streaming provides.
You can trigger the refresh (unpersist -> load -> persist) of the static dataset by creating an artificial "rate" stream that fires periodically. The idea is to:
1. Load the static Dataframe initially and keep it as a `var`
2. Define a method that refreshes the static Dataframe
3. Use a "rate" stream that gets triggered at the required interval (e.g. 1 hour)
4. Read the actual streaming data and join it with the static Dataframe
5. Within that rate stream, use a `foreachBatch` sink that calls the refresher method defined in step 2
The following code runs fine with Spark 3.0.1, Scala 2.12.10, and Delta 0.7.0.
// Needed imports (assuming an active SparkSession named `spark`)
import java.util.Calendar
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

// 1. Load the static Dataframe initially and keep it as a `var`
var staticDf = spark.read.format("delta").load(deltaPath)
staticDf.persist()

// 2. Define a method that refreshes the static Dataframe.
// The batch content is ignored; the rate stream below serves purely as a timer.
def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
  staticDf.unpersist()
  staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()
  println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
}
// 3. Use a "rate" stream that gets triggered at the required interval (e.g. 1 hour)
val staticRefreshStream = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .option("numPartitions", 1)
  .load()
  .selectExpr("CAST(value as LONG) as trigger")
  .as[Long]
// 4. Read the actual streaming data and join it with the static Dataframe
// As an example, Kafka is used as the streaming source
val streamingDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

val joinDf = streamingDf.join(staticDf, "id")

val query = joinDf.writeStream
  .format("console")
  .option("truncate", false)
  .option("checkpointLocation", "/path/to/sparkCheckpoint")
  .start()
// 5. Within that rate stream, use a `foreachBatch` sink that calls the refresher method
staticRefreshStream.writeStream
  .outputMode("append")
  .foreachBatch(foreachBatchMethod[Long] _)
  .queryName("RefreshStream")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()
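Not part of the original answer, but since two streaming queries are started, a common way to keep the application alive is to block on the query manager:
// Keep the application running until one of the streaming queries stops
spark.streams.awaitAnyTermination()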
For the complete example, the delta table was created like this:
import org.apache.spark.sql.SaveMode
import spark.implicits._

val deltaPath = "file:///tmp/delta/table"

val df = Seq(
  (1L, "static1"),
  (2L, "static2")
).toDF("id", "deltaField")

df.write
  .mode(SaveMode.Overwrite)
  .format("delta")
  .save(deltaPath)
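To see the refresh take effect, one can overwrite the Delta table with changed rows while both streams are running; after the next trigger of the rate stream, the join should pick up the new values (a sketch under the same assumptions as above):
// Simulate a change to the static data while the streams are running
Seq(
  (1L, "static1_updated"),
  (3L, "static3")
).toDF("id", "deltaField")
  .write
  .mode(SaveMode.Overwrite)
  .format("delta")
  .save(deltaPath)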
Source: https://stackoverflow.com/questions/47793917