我有30TB的数据按日期和时间分区,每小时拆分为300个文件。我进行了一些数据转换,然后希望按排序的顺序对数据进行排序并保存,以便于C++程序接收。我知道在序列化时,只有文件中的顺序是正确的。我希望通过更好地对数据进行分区来规避这个问题。
我想同时按sessionID和时间戳排序。我不想在不同的文件中拆分sessionID。如果我在SessionID上分区,我会有太多,所以我做了一个模N来生成N个存储桶,目标是获得大约100-200MB的1个存储桶数据:
df = df.withColumn("bucket", F.abs(F.col("sessionId")) % F.lit(50))
然后我按日期、小时和桶进行遣返,然后进行排序
df = df.repartition(50,"dt","hr","bucket")
df = df.sortWithinPartitions("sessionId","timestamp")
df.write.option("compression","gzip").partitionBy("dt","hr","bucket").parquet(SAVE_PATH)
这会将数据保存到dt/hr/ bucket,每个存储桶中有一个文件,但排序会丢失。如果我不创建存储桶并重新分区,那么我最终会得到200个文件,数据是有序的,但sessionIds被拆分到多个文件中。
编辑:问题似乎出在使用partitionBy("dt","hr","bucket")
保存时,它会随机重新分区数据,因此不再对其进行排序。如果我在不使用partitionBy
的情况下保存,那么我会得到我所期望的结果--N个存储桶/分区的N个文件,sessionIds跨越一个文件,所有这些都是正确排序的。因此,我有一个非spark hack手动迭代所有日期+小时目录
如果您按列进行分区,排序,然后使用partitionBy对同一列进行写入,那么您可能会直接转储已排序的分区,而不是对数据进行随机的重新洗牌。
发布于 2021-06-21 01:36:50
将分区列放在已排序列列表中可能会起到作用。
完整描述请点击此处- https://stackoverflow.com/a/59161488/3061686
https://stackoverflow.com/questions/58881553
复制相似问题