对于给定的DataFrame,在成为save
d到parquet
之前,这里是一个模式:注意,centroid0
是第一个列,是StringType
。
但是,当使用以下方法保存文件时:
df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)
以partitionCols
作为centroid0
还有一个(对我来说)令人惊讶的结果:
centroid0
分区列已移动到行的endInteger
我通过println
确认了输出路径:
path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters
下面是从保存的parquet
读取数据时的模式
为什么要对输入模式进行这两种修改--以及如何避免--同时仍然将centroid0
作为分区列来维护?
更新首选答案应该提到为什么将分区添加到列列表的末尾(vs开头)。我们需要理解确定性的排序。
此外,是否有任何方法可以使spark
在推断的列类型上“改变想法”?我不得不将分区从0
、1
等更改为c0
、c1
等,以便将推理映射到StringType
。也许那是必需的..。但如果有一些火花设置,以改变行为,这将是一个很好的答案。
发布于 2022-03-10 08:14:09
实际上,您可以很容易地使用包含分区数据模式的case类的列的排序。您需要从路径中读取数据,在路径中存储分区列,以使Spark推断这些列的值。然后,只需使用case类模式和如下语句应用重新排序:
val encoder: Encoder[RecordType] = Encoders.product[RecordType]
spark.read
.schema(encoder.schema)
.format("parquet")
.option("mergeSchema", "true")
.load(myPath)
// reorder columns, since reading from partitioned data, the partitioning columns are put to end
.select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
.as[RecordType]
https://stackoverflow.com/questions/50962934
复制相似问题