我对火种很陌生。我希望添加一个具有多个值的新列,并添加带有这些值的分区。
import math
coun=df.count()
if(coun<= 20000):
chunksize=2
rowsperchunk = math.ceil(coun/2)
else:
chunksize= math.ceil(coun/20000)
rowsperchunk = 20000
for i in chunksize:
df.limit(num_rows_per_chunk).withColumn('chunk',F.lit(i))在上面的for循环中,它将只插入一个值直到限制。
示例:我的数据帧中有100 k行,所以块大小为5。而每块的行数是20 000,所以我需要添加新的列,前面的20 000行需要插入值1,接下来的20 000行需要插入值2。然后,我想根据我们创建的新列进行分区。
发布于 2020-11-03 21:57:31
因此,您希望重新划分数据,以便在相同大小的分区中进行分区,同时保持顺序。
在这里面并不是那么容易点燃。我要做的是从计算每个分区的大小开始。然后,对于每个分区,我将计算前几个分区中的dataframe中的记录数。有了这一点和分区(partition_rank)中记录的级别,按所需分区的大小进行除法将给出新的分配。注意,我引入了一个index列来计算秩并保持顺序。以下是代码:
partition_size = 20000
from pyspark.sql import functions as F
part_counts = df.withColumn("p", F.spark_partition_id()).groupBy("p").count().collect()
part_counts.sort()
part_counts = [(x[0], x[1]) for x in part_counts]
cum_part_counts = []
sum=0
for index, count in part_counts:
cum_part_counts.append((index, sum))
sum+=count
cum_part_counts_df = spark.createDataFrame(cum_part_counts, ['partition_index', 'count'])
repartitioned_df = df\
.withColumn("partition_index", F.spark_partition_id())\
.withColumn("index", F.monotonically_increasing_id())\
.withColumn("partition_rank", F.rank().over(
Window.partitionBy("partition_index").orderBy("index")))\
.join(cum_part_counts_df, ['partition_index'])\
.withColumn("new_partition",
F.floor((F.col("count") + F.col("partition_rank") - 1)/partition_size))\
.orderBy("index")\
.write.partitionBy("new_partition").parquet("...")https://stackoverflow.com/questions/64657929
复制相似问题