我有100,000+的数据记录。我希望动态创建一个文件,并将每个文件的1000条记录推送。有人能帮我解决这个问题吗?谢谢。
发布于 2020-04-24 16:20:25
您可以在编写时使用dataframe选项。
如果需要在每个文件中写入1000条记录,则使用.coalesce(1) (or)为编写1000条记录,每个分区使用
Example:
# 1000 records written per file in each partition
df.coalesce(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
# 1000 records written per file for dataframe 100 files created for 100,000
df.repartition(1).write.option("maxRecordsPerFile", 1000).mode("overwrite").parquet(<path>)
#or by set config on spark session
spark.conf.set("spark.sql.files.maxRecordsPerFile", 1000)
#or
spark.sql("set spark.sql.files.maxRecordsPerFile=1000").show()
df.coalesce(1).write.mode("overwrite").parquet(<path>)
df.repartition(1).write.mode("overwrite").parquet(<path>)Method-2:
Caluculating number of partitions then repartition the dataframe:
df = spark.range(10000)
#caluculate partitions
no_partitions=df.count()/1000
from pyspark.sql.functions import *
#repartition and check number of records on each partition
df.repartition(no_partitions).\
withColumn("partition_id",spark_partition_id()).\
groupBy(col("partition_id")).\
agg(count("*")).\
show()
#+-----------+--------+
#|partiton_id|count(1)|
#+-----------+--------+
#| 1| 1001|
#| 6| 1000|
#| 3| 999|
#| 5| 1000|
#| 9| 1000|
#| 4| 999|
#| 8| 1000|
#| 7| 1000|
#| 2| 1001|
#| 0| 1000|
#+-----------+--------+
df.repartition(no_partitions).write.mode("overwrite").parquet(<path>)发布于 2020-04-24 16:06:10
首先,创建行号列。
df = df.withColumn('row_num', F.row_number().over(Window.orderBy('any_column'))现在,运行一个循环并继续保存记录。
for i in range(0, df.count(), 1000):
records = df.where(F.col("row_num").between(i, i+999))
records.toPandas().to_csv("file-{}.csv".format(i))https://stackoverflow.com/questions/61412292
复制相似问题