我有两个脚本,一个在R中,另一个在pyspark中,它使用输出。为了简单起见,我试图将该功能复制到第一个脚本中。
第二个脚本非常简单--读取一堆csv文件,并将它们作为分区块发出:
spark.read.csv(path_to_csv, header = True) \
.repartition(partition_column).write \
.partitionBy(partition_column).mode('overwrite') \
.parquet(path_to_parquet)这在R中应该同样简单,但我不知道如何匹配partitionBy在SparkR中的功能。到目前为止,我有这样的想法:
library(SparkR); library(magrittr)
read.df(path_to_csv, 'csv', header = TRUE) %>%
repartition(col = .$partition_column) %>%
write.df(path_to_parquet, 'parquet', mode = 'overwrite')这成功地为partition_column的每个值编写了一个拼图文件。问题是发出的文件有错误的目录结构;而Python则生成类似的内容
/path/to/parquet/
partition_column=key1/
file.parquet.gz
partition_column=key2/
file.parquet.gz
...R只产生
/path/to/parquet/
file_for_key1.parquet.gz
file_for_key2.parquet.gz
...我是不是遗漏了什么?partitionBy函数在SparkR中只显示引用窗口函数的上下文,我在手册中没有看到任何可能相关的内容。也许有一种在...中传递某些东西的方法,但是我在文档或在线搜索中没有看到任何例子。
发布于 2019-06-13 13:41:16
Spark <= 2.x不支持对输出进行分区。
但是,SparR >= 3.0.0 (API接口)将支持它,语法如下:
write.df(
df, path_to_csv, "parquet", mode = "overwrite",
partitionBy = "partition_column"
)由于对应PR只修改R文件,如果升级到开发版本不是一种选择,那么您应该能够修补任何SparkR 2.x发行版:
git clone https://github.com/apache/spark.git
git checkout v2.4.3 # Or whatever branch you use
# https://github.com/apache/spark/commit/cb77a6689137916e64bc5692b0c942e86ca1a0ea
git cherry-pick cb77a6689137916e64bc5692b0c942e86ca1a0ea
R -e "devtools::install('R/pkg')"在客户端模式下,这应该只需要在驱动节点上。
但这些不是致命的,也不应该引起任何严重的问题。
https://stackoverflow.com/questions/56579955
复制相似问题