首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何根据行添加新列并插入多个值?

如何根据行添加新列并插入多个值?
EN

Stack Overflow用户
提问于 2020-11-03 06:45:44
回答 1查看 857关注 0票数 2

我对火种很陌生。我希望添加一个具有多个值的新列,并添加带有这些值的分区。

代码语言:javascript
运行
复制
import math

coun=df.count()

if(coun<= 20000):
    chunksize=2
    rowsperchunk = math.ceil(coun/2)
else:
    chunksize= math.ceil(coun/20000)
    rowsperchunk = 20000

for i in chunksize:
    df.limit(num_rows_per_chunk).withColumn('chunk',F.lit(i))

在上面的for循环中,它将只插入一个值直到限制。

示例:我的数据帧中有100 k行,所以块大小为5。而每块的行数是20 000,所以我需要添加新的列,前面的20 000行需要插入值1,接下来的20 000行需要插入值2。然后,我想根据我们创建的新列进行分区。

EN

回答 1

Stack Overflow用户

发布于 2020-11-03 21:57:31

因此,您希望重新划分数据,以便在相同大小的分区中进行分区,同时保持顺序。

在这里面并不是那么容易点燃。我要做的是从计算每个分区的大小开始。然后,对于每个分区,我将计算前几个分区中的dataframe中的记录数。有了这一点和分区(partition_rank)中记录的级别,按所需分区的大小进行除法将给出新的分配。注意,我引入了一个index列来计算秩并保持顺序。以下是代码:

代码语言:javascript
运行
复制
partition_size = 20000

from pyspark.sql import functions as F
part_counts = df.withColumn("p", F.spark_partition_id()).groupBy("p").count().collect()
part_counts.sort()
part_counts = [(x[0], x[1]) for x in part_counts]

cum_part_counts = []
sum=0
for index, count in part_counts:
    cum_part_counts.append((index, sum))
    sum+=count
cum_part_counts_df = spark.createDataFrame(cum_part_counts, ['partition_index', 'count'])

repartitioned_df = df\
  .withColumn("partition_index", F.spark_partition_id())\
  .withColumn("index", F.monotonically_increasing_id())\
  .withColumn("partition_rank", F.rank().over(
           Window.partitionBy("partition_index").orderBy("index")))\
  .join(cum_part_counts_df, ['partition_index'])\
  .withColumn("new_partition",
      F.floor((F.col("count") + F.col("partition_rank") - 1)/partition_size))\
  .orderBy("index")\
  .write.partitionBy("new_partition").parquet("...")
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64657929

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档