我有一个包含数百万行的静态DataFrame
,如下所示。
静态DataFrame
:
--------------
id|time_stamp|
--------------
|1|1540527851|
|2|1540525602|
|3|1530529187|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------
现在,在每一批中,都会形成一个流DataFrame
,其中包含id和更新后的time_stamp,如下所示。
第一批:
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
--------------
现在,在每个批处理中,我都希望使用流数据访问的更新值更新静态DataFrame,如下所示。怎么做?
第一批之后的静态DF:
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------
我已经尝试过,除了()、union()或'left_anti‘连接。但结构化流媒体似乎不支持这样的操作,。
发布于 2018-12-26 14:24:16
因此,我通过Spark2.4.0 AddBatch方法解决了这个问题,该方法将流数据转化为小型批处理数据。但是对于<2.4.0版本来说,这仍然是一个令人头痛的问题。
发布于 2020-07-19 03:02:26
我也有类似的问题。下面是我申请更新静态数据的foreachBatch。我想知道如何返回用foreachBatch完成的更新的df。
def update_reference_df(df, static_df):
query: StreamingQuery = df \
.writeStream \
.outputMode("append") \
.format("memory") \
.foreachBatch(lambda batch_df, batchId: update_static_df(batch_df, static_df)) \
.start()
return query
def update_static_df(batch_df, static_df):
df1: DataFrame = static_df.union(batch_df.join(static_df,
(batch_df.SITE == static_df.SITE)
"left_anti"))
return df1
https://stackoverflow.com/questions/53004818
复制相似问题