首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在Spark结构化流中使用流数据更新静态数据

如何在Spark结构化流中使用流数据更新静态数据
EN

Stack Overflow用户
提问于 2018-10-26 08:41:56
回答 2查看 2.4K关注 0票数 5

我有一个包含数百万行的静态DataFrame,如下所示。

静态DataFrame

代码语言:javascript
运行
复制
--------------
id|time_stamp|
--------------
|1|1540527851|
|2|1540525602|
|3|1530529187|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------

现在,在每一批中,都会形成一个流DataFrame,其中包含id和更新后的time_stamp,如下所示。

第一批:

代码语言:javascript
运行
复制
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
--------------

现在,在每个批处理中,我都希望使用流数据访问的更新值更新静态DataFrame,如下所示。怎么做?

第一批之后的静态DF:

代码语言:javascript
运行
复制
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------

我已经尝试过,除了()、union()或'left_anti‘连接。但结构化流媒体似乎不支持这样的操作,

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-12-26 14:24:16

因此,我通过Spark2.4.0 AddBatch方法解决了这个问题,该方法将流数据转化为小型批处理数据。但是对于<2.4.0版本来说,这仍然是一个令人头痛的问题。

票数 1
EN

Stack Overflow用户

发布于 2020-07-19 03:02:26

我也有类似的问题。下面是我申请更新静态数据的foreachBatch。我想知道如何返回用foreachBatch完成的更新的df。

代码语言:javascript
运行
复制
def update_reference_df(df, static_df):
    query: StreamingQuery = df \
        .writeStream \
        .outputMode("append") \
        .format("memory") \
        .foreachBatch(lambda batch_df, batchId: update_static_df(batch_df, static_df)) \
        .start()
    return query

def update_static_df(batch_df, static_df):
    df1: DataFrame = static_df.union(batch_df.join(static_df,
                                                 (batch_df.SITE == static_df.SITE)
                                                 "left_anti"))

    return df1
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53004818

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档