首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何从dataframe获取1000个记录并使用PySpark写入文件?

如何从dataframe获取1000个记录并使用PySpark写入文件?
EN

Stack Overflow用户
提问于 2020-04-24 15:48:42
回答 2查看 5.6K关注 0票数 1

我有100,000+的数据记录。我希望动态创建一个文件,并将每个文件的1000条记录推送。有人能帮我解决这个问题吗?谢谢。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2020-04-24 16:20:25

您可以在编写时使用dataframe选项。

如果需要在每个文件中写入1000条记录,则使用.coalesce(1) (or)编写1000条记录,每个分区使用

Example:

代码语言:javascript
运行
复制
# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)

# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)

#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()

df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)

Method-2:

Caluculating number of partitions then repartition the dataframe:

代码语言:javascript
运行
复制
df = spark.range(10000)

#caluculate partitions
no_partitions=df.count()/1000

from pyspark.sql.functions import *

#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()

#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#|          1|    1001|
#|          6|    1000|
#|          3|     999|
#|          5|    1000|
#|          9|    1000|
#|          4|     999|
#|          8|    1000|
#|          7|    1000|
#|          2|    1001|
#|          0|    1000|
#+-----------+--------+

df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)
票数 4
EN

Stack Overflow用户

发布于 2020-04-24 16:06:10

首先,创建行号列。

代码语言:javascript
运行
复制
df = df.withColumn('row_num', F.row_number().over(Window.orderBy('any_column'))

现在,运行一个循环并继续保存记录。

代码语言:javascript
运行
复制
for i in range(0, df.count(), 1000):
   records = df.where(F.col("row_num").between(i, i+999))
   records.toPandas().to_csv("file-{}.csv".format(i))
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61412292

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档