首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >具有周期性更新静态数据集的结构化流

具有周期性更新静态数据集的结构化流
EN

Stack Overflow用户
提问于 2017-12-13 13:13:13
回答 1查看 2.1K关注 0票数 9

流和静态数据集的合并是结构化流的一个很好的特点。但是每一批数据集都会从数据源中刷新。由于这些源并不总是动态的,因此在指定的时间段(或批数)缓存静态数据集将是一种性能增益。在指定的批处理期间/批数之后,数据集将从源重新加载,否则将从缓存中检索。

在星火流中,我使用缓存的数据集来管理它,并在指定数量的批处理运行后取消它的持久化,但是由于某种原因,这不再适用于结构化流。

有什么建议可以用结构化流来实现吗?

EN

回答 1

Stack Overflow用户

发布于 2021-03-04 08:41:58

我为另一个问题Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically开发了一个解决方案,它也可能有助于解决您的问题:

您可以通过使用结构化流提供的流调度功能来做到这一点。

通过创建定期刷新静态数据集的人工“速率”流,可以触发静态数据集的刷新(未持久化->加载->持久化)。其想法是:

  1. 最初加载staticDataframe并保持为var
  2. 定义一个刷新静态数据数据的方法。
  3. 使用在所需时间间隔(例如1小时)触发的“速率”流
  4. 读取实际流数据并使用静态Dataframe执行连接操作
  5. 在该速率流中,有一个调用刷新方法的foreachBatch接收器

以下代码在Spark3.0.1、Scala2.12.10和Delta0.7.0中运行良好。

代码语言:javascript
运行
复制
  // 1. Load the staticDataframe initially and keep as `var`
  var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()

  //  2. Define a method that refreshes the static Dataframe
  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }

  // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]

  // 4. Read actual streaming data and perform join operation with static Dataframe
  // As an example I used Kafka as a streaming source
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

  val joinDf = streamingDf.join(staticDf, "id")

  val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()

  // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()

为了获得完整的示例,创建了delta表,如下所示:

代码语言:javascript
运行
复制
  val deltaPath = "file:///tmp/delta/table"

  import spark.implicits._
  val df = Seq(
    (1L, "static1"),
    (2L, "static2")
  ).toDF("id", "deltaField")

  df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47793917

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档