首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >不知道如何解决火花放电的非确定性函数问题

不知道如何解决火花放电的非确定性函数问题
EN

Stack Overflow用户
提问于 2021-11-11 19:22:46
回答 1查看 565关注 0票数 0

我正在遵循雪花模式。我有一个事实表和一个红色仓库的9维表。我的ETL工作流程是用was胶水完成的。我将问题简化为两个表(一个维度表和一个事实)。

我遇到的问题是,每次运行ETL时,都会得到不同的映射结果,并认为这与我将physical_attribute表加入到animal表的方式有关。

我从名为cat的数据目录中的源表中提取数据。

代码语言:javascript
运行
复制
cat_df = glueContext.create_dynamic_frame.from_catalog(database="animal_parquet", table_name="cat").toDF()

接下来,我设置了映射函数--我做了两种方法。下面是在dict中映射的数据:

代码语言:javascript
运行
复制
animal_color_map = {
    "r": "red",
    "w": "white",
    "bl": "black",
    "br": "brown",
}

  1. F.create_map,如果我能解决其他问题,这就是我要做的事情。

代码语言:javascript
运行
复制
animal_color_mapper = F.create_map(
    [F.lit(x) for x in chain(*animal_color_map.items())])

  1. F.udf

代码语言:javascript
运行
复制
@F.udf
def animal_color_mapper2(x):
    return animal_color_map.get(x, "NOT SPECIFIED")

现在开始执行数据帧的创建,将数据从维度表连接回事实数据表,并进行评估。

**我相信这个问题来自于一个非确定性的类似于pyspark的函数,但是在文档中找不到将它们转化为确定性函数的答案**

代码语言:javascript
运行
复制
physical_attribute_df = (
    cat_df.select(

        F.coalesce(
            animal_color_mapper[cat_df.fur_color],
            F.lit("NOT SPECIFIED")
        ).cast('string').alias('color'),  # color
        
        F.coalesce(
            animal_color_mapper2(cat_df.fur_color),
            F.lit("NOT SPECIFIED")
        ).cast('string').alias('color2'),  # color2
            
        cat_df.zoo_id.alias('_zoo_id'),

    ).groupBy(
        'color',
        'color2',
    ).agg(
        F.collect_list('_zoo_id').alias('_zoo_ids')
    ).coalesce(1).withColumn(
        'id', F.monotonically_increasing_id() # id
    ).withColumn(
        'created_date', F.current_timestamp() # create_date
    ).withColumn(
        'last_updated', F.current_timestamp() # last_updated
    )

)

此时的这张表如下所示:

代码语言:javascript
运行
复制
+------------+-------------+--------------------+---+--------------------+--------------------+
|       color|       color2|            _app_ids| id|        created_date|        last_updated|
+------------+-------------+--------------------+---+--------------------+--------------------+
|       black|        black|[10447643, 104525...|  0|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       brown|        brown|[10450650, 104551...|  1|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       white|        white|[10445953, 104470...|  2|2021-11-11 18:38:...|2021-11-11 18:38:...|
|         red|          red|[10453690, 104547...|  3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+--------------------+---+--------------------+--------------------+

现在请注意groupbyagg,因为它用于在数据爆炸后将数据连接到事实表。

代码语言:javascript
运行
复制
_physical_attributes_df = physical_attribute_df.select(
    F.explode(physical_attribute_df._zoo_ids).alias('zoo_id'),
    physical_attribute_df.id.alias('physical_attribute_id'),
)

此时的这张表如下所示:

代码语言:javascript
运行
复制
+--------+---------------------+
|  zoo_id|physical_attribute_id|
+--------+---------------------+
|10447643|                    0|
|10452584|                    0|
|10453651|                    0|
|10448127|                    0|
|10444833|                    0|
+--------+---------------------+

既然有了这个查找表,就会删除_zoo_ids列。

代码语言:javascript
运行
复制
physical_attribute_df = physical_attribute_df.drop('_zoo_ids')

此时的这张表如下所示:

代码语言:javascript
运行
复制
+------------+-------------+---+--------------------+--------------------+
|       color|       color2| id|        created_date|        last_updated|
+------------+-------------+---+--------------------+--------------------+
|       black|        black|  0|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       brown|        brown|  1|2021-11-11 18:38:...|2021-11-11 18:38:...|
|       white|        white|  2|2021-11-11 18:38:...|2021-11-11 18:38:...|
|         red|          red|  3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+---+--------------------+--------------------+

这是我最后一步。将表连接到事实表

代码语言:javascript
运行
复制
animal_df = (

    cat_df.join(
        _physical_attributes_ids_df,
        _physical_attributes_ids_df.zoo_id == cat_df.zoo_id,
        'left'
    ).select(

        F.current_timestamp().alias('created_date'), # create_date
        F.current_timestamp().alias('last_updated'), # last_updated

        cat_df.zoo_id.alias('zoo_id'),  # zoo_id
   
        _physical_attributes_ids_df.physical_attributes_id.alias('physical_attributes_id'), # physical_attributes_id
  
    ).coalesce(1).withColumn(
        'id', F.monotonically_increasing_id() # id
    )
)

当我验证时,对于我在映射中所做的每个ETL运行,数据都是不同的。

EN

回答 1

Stack Overflow用户

发布于 2021-11-11 21:05:48

问题是,'id', F.monotonically_increasing_id() # idcoalesce之后是不确定的(谁知道),因为coalesce导致分区合并而没有排序保证。

在这种情况下,显式排序将解决这个问题。

我过去用来解决这个问题的是我的表的标识字段--在本例中是“F.row_number().over(Window.orderBy(physical_attribute_df.color))中的颜色”

代码语言:javascript
运行
复制
physical_attribute_df = (
    cat_df.select(

        F.coalesce(
            animal_color_mapper[cat_df.fur_color],
            F.lit("NOT SPECIFIED")
        ).cast('string').alias('color'),  # color
       
        cat_df.zoo_id.alias('_zoo_id'),

    ).groupBy(
        'color',
        'color2',
    ).agg(
        F.collect_list('_zoo_id').alias('_zoo_ids')
    )

physical_attribute_df = (
    physical_attribute_df.coalesce(1).withColumn(
        'id',
        F.row_number().over(Window.orderBy(physical_attribute_df.color))
    ).withColumn(
        'created_date', F.current_timestamp() # create_date
    ).withColumn(
        'last_updated', F.current_timestamp() # last_updated
    )
)

_physical_attributes_df = physical_attribute_df.select(
    F.explode(physical_attribute_df._zoo_ids).alias('zoo_id'),
    physical_attribute_df.id.alias('physical_attribute_id'),
)


physical_attribute_df = physical_attribute_df.drop('_zoo_ids')

animal_df = (

    cat_df.join(
        _physical_attributes_ids_df,
        _physical_attributes_ids_df.zoo_id == cat_df.zoo_id,
        'left'
    ).select(

        F.current_timestamp().alias('created_date'), # create_date
        F.current_timestamp().alias('last_updated'), # last_updated

        cat_df.zoo_id.alias('zoo_id'),  # zoo_id
   
        _physical_attributes_ids_df.physical_attributes_id.alias('physical_attributes_id'), # physical_attributes_id

    )
)


animal_df = (

    animal_df.coalesce(1).withColumn(
        'id',
         F.row_number().over(Window.orderBy(animal_df.zoo_id))
    )
)
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69933804

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档