我有一个包含下一步信息的Dataframe df:
id json_data
1 {a: "1", b: "2"}
1 {a: "1", b: "3"}
1 {a: "1", b: "4"}
2 {a: "1", b: "2"}
2 {a: "1", b: "6"}
我需要下一个最终结果:
id json_data
1 [{a: "1", b: "2"},{a: "1", b: "3"},{a: "1", b: "4"}]
2 [{a: "1", b: "2"},{a: "1", b: "6"}]
我尝试了两种不同的方法,分别使用窗口函数和groupBy。使用这两种方法,我都得到了想要的结果。
1:方法:
var user_window = Window.partitionBy("id").orderBy("id")
val df2 = df.withColumn("json_data",
collect_list($"json_data").over(user_window))
.withColumn("rank", row_number().over(user_window))
.where("rank = 1")
2:方法:
val df2 = df.groupBy(df("id")).agg(collect_list($"json_data").as("json_data"))
使用这两种方法,我获得了相同的性能。但是阅读有关Spark的文档,似乎这两种方法都效率不高,因为具有相同键的行需要在集群中移动(随机排列)才能在一起。我展示了一个小示例,因为在生产中我有大量的数据。进行分组或使用窗口函数需要很长时间。
为了做到这一点,有什么替代方案吗?
发布于 2018-06-05 03:39:12
我的建议是使用reduceByKey。
这样,如果键是id
,值(在开始时)是列表中的json_data
,那么对不同的json_data
包装列表执行reduceByKey以及连接函数将会带来更好的性能。
简而言之,使用reduceByKey首先在分区内执行"groupBy“,然后才开始数据的混洗。
要了解groupByKey和reduceByKey的性能差异,一个很好的地方是here (部分6b
)。
在pyspark中,它看起来像这样:
rdd = df.rdd
rdd = rdd.map(lambda row: (row['id'], [row['json_data']]))
rdd = rdd.reduceByKey(lambda a, b: a + b)
https://stackoverflow.com/questions/50686225
复制相似问题