我正在使用https://github.com/databricks/spark-csv,我正在尝试写一个CSV,但是不能,它正在制作一个文件夹。
需要一个Scala函数,它将接受参数,如路径和文件名,并写入CSV文件。
发布于 2015-07-28 19:44:54
它正在创建一个包含多个文件的文件夹,因为每个分区都是单独保存的。如果您需要单个输出文件(仍在文件夹中),则可以使用repartition (如果上游数据较大,但需要随机处理,则首选):
df
.repartition(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")或coalesce
df
.coalesce(1)
.write.format("com.databricks.spark.csv")
.option("header", "true")
.save("mydata.csv")保存前的数据帧:
所有数据都将写入mydata.csv/part-00000。在使用此选项之前,请确保您了解发生了什么以及将所有数据传输到单个worker的成本是多少。如果将分布式文件系统与复制结合使用,数据将多次传输-首先提取到单个工作进程,然后跨存储节点分发。
或者,您可以让代码保持原样,然后使用cat或HDFS getmerge等通用工具简单地合并所有部分。
发布于 2017-01-22 06:09:32
如果你使用HDFS运行Spark,我已经通过正常编写csv文件并利用HDFS进行合并来解决这个问题。我在Spark (1.6)中直接这样做:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
def merge(srcPath: String, dstPath: String): Unit = {
val hadoopConfig = new Configuration()
val hdfs = FileSystem.get(hadoopConfig)
FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null)
// the "true" setting deletes the source files once they are merged into the new output
}
val newData = << create your dataframe >>
val outputfile = "/user/feeds/project/outputs/subject"
var filename = "myinsights"
var outputFileName = outputfile + "/temp_" + filename
var mergedFileName = outputfile + "/merged_" + filename
var mergeFindGlob = outputFileName
newData.write
.format("com.databricks.spark.csv")
.option("header", "false")
.mode("overwrite")
.save(outputFileName)
merge(mergeFindGlob, mergedFileName )
newData.unpersist()我不记得我是从哪里学到这个技巧的,但它可能对你有用。
发布于 2016-01-14 22:44:40
我在这里可能有点晚了,但使用coalesce(1)或repartition(1)可能适用于较小的数据集,但大型数据集将全部放入一个节点上的一个分区中。这很可能抛出OOM错误,或者最好的情况下,处理速度很慢。
我强烈建议您使用Hadoop API中的FileUtil.copyMerge()函数。这将把输出合并到一个文件中。
-有效地将数据带到驱动程序而不是执行器节点。如果单个执行器使用的内存比驱动程序多,那么Coalesce()就没问题。
Hadoop 2:copyMerge()将在Hadoop3.0中删除。有关如何使用最新版本的更多信息,请参阅下面的堆栈溢出文章:How to do CopyMerge in Hadoop 3.0?
https://stackoverflow.com/questions/31674530
复制相似问题