我正在遵循雪花模式。我有一个事实表和一个红色仓库的9维表。我的ETL工作流程是用was胶水完成的。我将问题简化为两个表(一个维度表和一个事实)。
我遇到的问题是,每次运行ETL时,都会得到不同的映射结果,并认为这与我将physical_attribute表加入到animal表的方式有关。
我从名为cat的数据目录中的源表中提取数据。
cat_df = glueContext.create_dynamic_frame.from_catalog(database="animal_parquet", table_name="cat").toDF()接下来,我设置了映射函数--我做了两种方法。下面是在dict中映射的数据:
animal_color_map = {
"r": "red",
"w": "white",
"bl": "black",
"br": "brown",
}F.create_map,如果我能解决其他问题,这就是我要做的事情。animal_color_mapper = F.create_map(
[F.lit(x) for x in chain(*animal_color_map.items())])F.udf@F.udf
def animal_color_mapper2(x):
return animal_color_map.get(x, "NOT SPECIFIED")现在开始执行数据帧的创建,将数据从维度表连接回事实数据表,并进行评估。
**我相信这个问题来自于一个非确定性的类似于pyspark的函数,但是在文档中找不到将它们转化为确定性函数的答案**
physical_attribute_df = (
cat_df.select(
F.coalesce(
animal_color_mapper[cat_df.fur_color],
F.lit("NOT SPECIFIED")
).cast('string').alias('color'), # color
F.coalesce(
animal_color_mapper2(cat_df.fur_color),
F.lit("NOT SPECIFIED")
).cast('string').alias('color2'), # color2
cat_df.zoo_id.alias('_zoo_id'),
).groupBy(
'color',
'color2',
).agg(
F.collect_list('_zoo_id').alias('_zoo_ids')
).coalesce(1).withColumn(
'id', F.monotonically_increasing_id() # id
).withColumn(
'created_date', F.current_timestamp() # create_date
).withColumn(
'last_updated', F.current_timestamp() # last_updated
)
)此时的这张表如下所示:
+------------+-------------+--------------------+---+--------------------+--------------------+
| color| color2| _app_ids| id| created_date| last_updated|
+------------+-------------+--------------------+---+--------------------+--------------------+
| black| black|[10447643, 104525...| 0|2021-11-11 18:38:...|2021-11-11 18:38:...|
| brown| brown|[10450650, 104551...| 1|2021-11-11 18:38:...|2021-11-11 18:38:...|
| white| white|[10445953, 104470...| 2|2021-11-11 18:38:...|2021-11-11 18:38:...|
| red| red|[10453690, 104547...| 3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+--------------------+---+--------------------+--------------------+现在请注意groupby到agg,因为它用于在数据爆炸后将数据连接到事实表。
_physical_attributes_df = physical_attribute_df.select(
F.explode(physical_attribute_df._zoo_ids).alias('zoo_id'),
physical_attribute_df.id.alias('physical_attribute_id'),
)此时的这张表如下所示:
+--------+---------------------+
| zoo_id|physical_attribute_id|
+--------+---------------------+
|10447643| 0|
|10452584| 0|
|10453651| 0|
|10448127| 0|
|10444833| 0|
+--------+---------------------+既然有了这个查找表,就会删除_zoo_ids列。
physical_attribute_df = physical_attribute_df.drop('_zoo_ids')此时的这张表如下所示:
+------------+-------------+---+--------------------+--------------------+
| color| color2| id| created_date| last_updated|
+------------+-------------+---+--------------------+--------------------+
| black| black| 0|2021-11-11 18:38:...|2021-11-11 18:38:...|
| brown| brown| 1|2021-11-11 18:38:...|2021-11-11 18:38:...|
| white| white| 2|2021-11-11 18:38:...|2021-11-11 18:38:...|
| red| red| 3|2021-11-11 18:38:...|2021-11-11 18:38:...|
+------------+-------------+---+--------------------+--------------------+这是我最后一步。将表连接到事实表
animal_df = (
cat_df.join(
_physical_attributes_ids_df,
_physical_attributes_ids_df.zoo_id == cat_df.zoo_id,
'left'
).select(
F.current_timestamp().alias('created_date'), # create_date
F.current_timestamp().alias('last_updated'), # last_updated
cat_df.zoo_id.alias('zoo_id'), # zoo_id
_physical_attributes_ids_df.physical_attributes_id.alias('physical_attributes_id'), # physical_attributes_id
).coalesce(1).withColumn(
'id', F.monotonically_increasing_id() # id
)
)当我验证时,对于我在映射中所做的每个ETL运行,数据都是不同的。
发布于 2021-11-11 21:05:48
问题是,'id', F.monotonically_increasing_id() # id在coalesce之后是不确定的(谁知道),因为coalesce导致分区合并而没有排序保证。
在这种情况下,显式排序将解决这个问题。
我过去用来解决这个问题的是我的表的标识字段--在本例中是“F.row_number().over(Window.orderBy(physical_attribute_df.color))中的颜色”
physical_attribute_df = (
cat_df.select(
F.coalesce(
animal_color_mapper[cat_df.fur_color],
F.lit("NOT SPECIFIED")
).cast('string').alias('color'), # color
cat_df.zoo_id.alias('_zoo_id'),
).groupBy(
'color',
'color2',
).agg(
F.collect_list('_zoo_id').alias('_zoo_ids')
)
physical_attribute_df = (
physical_attribute_df.coalesce(1).withColumn(
'id',
F.row_number().over(Window.orderBy(physical_attribute_df.color))
).withColumn(
'created_date', F.current_timestamp() # create_date
).withColumn(
'last_updated', F.current_timestamp() # last_updated
)
)
_physical_attributes_df = physical_attribute_df.select(
F.explode(physical_attribute_df._zoo_ids).alias('zoo_id'),
physical_attribute_df.id.alias('physical_attribute_id'),
)
physical_attribute_df = physical_attribute_df.drop('_zoo_ids')
animal_df = (
cat_df.join(
_physical_attributes_ids_df,
_physical_attributes_ids_df.zoo_id == cat_df.zoo_id,
'left'
).select(
F.current_timestamp().alias('created_date'), # create_date
F.current_timestamp().alias('last_updated'), # last_updated
cat_df.zoo_id.alias('zoo_id'), # zoo_id
_physical_attributes_ids_df.physical_attributes_id.alias('physical_attributes_id'), # physical_attributes_id
)
)
animal_df = (
animal_df.coalesce(1).withColumn(
'id',
F.row_number().over(Window.orderBy(animal_df.zoo_id))
)
)https://stackoverflow.com/questions/69933804
复制相似问题