首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >使用spark-csv写入单个CSV文件

使用spark-csv写入单个CSV文件
EN

Stack Overflow用户
提问于 2015-07-28 19:08:20
回答 15查看 325.6K关注 0票数 144

我正在使用https://github.com/databricks/spark-csv,我正在尝试写一个CSV,但是不能,它正在制作一个文件夹。

需要一个Scala函数,它将接受参数,如路径和文件名,并写入CSV文件。

EN

回答 15

Stack Overflow用户

发布于 2015-07-28 19:44:54

它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果您需要单个输出文件(仍在文件夹中),则可以使用repartition (如果上游数据较大,但需要随机处理,则首选):

代码语言:javascript
运行
复制
df
   .repartition(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

coalesce

代码语言:javascript
运行
复制
df
   .coalesce(1)
   .write.format("com.databricks.spark.csv")
   .option("header", "true")
   .save("mydata.csv")

保存前的数据帧:

所有数据都将写入mydata.csv/part-00000。在使用此选项之前,请确保您了解发生了什么以及将所有数据传输到单个worker的成本是多少。如果将分布式文件系统与复制结合使用,数据将多次传输-首先提取到单个工作进程,然后跨存储节点分发。

或者,您可以让代码保持原样,然后使用catHDFS getmerge等通用工具简单地合并所有部分。

票数 199
EN

Stack Overflow用户

发布于 2017-01-22 06:09:32

如果你使用HDFS运行Spark,我已经通过正常编写csv文件并利用HDFS进行合并来解决这个问题。我在Spark (1.6)中直接这样做:

代码语言:javascript
运行
复制
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
   val hadoopConfig = new Configuration()
   val hdfs = FileSystem.get(hadoopConfig)
   FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
   // the "true" setting deletes the source files once they are merged into the new output
}


val newData = << create your dataframe >>


val outputfile = "/user/feeds/project/outputs/subject"  
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename 
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob  = outputFileName

    newData.write
        .format("com.databricks.spark.csv")
        .option("header", "false")
        .mode("overwrite")
        .save(outputFileName)
    merge(mergeFindGlob, mergedFileName )
    newData.unpersist()

我不记得我是从哪里学到这个技巧的,但它可能对你有用。

票数 40
EN

Stack Overflow用户

发布于 2016-01-14 22:44:40

我在这里可能有点晚了,但使用coalesce(1)repartition(1)可能适用于较小的数据集,但大型数据集将全部放入一个节点上的一个分区中。这很可能抛出OOM错误,或者最好的情况下,处理速度很慢。

我强烈建议您使用Hadoop API中的FileUtil.copyMerge()函数。这将把输出合并到一个文件中。

-有效地将数据带到驱动程序而不是执行器节点。如果单个执行器使用的内存比驱动程序多,那么Coalesce()就没问题。

Hadoop 2:copyMerge()将在Hadoop3.0中删除。有关如何使用最新版本的更多信息,请参阅下面的堆栈溢出文章:How to do CopyMerge in Hadoop 3.0?

票数 37
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31674530

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档