对于给定的DataFrame,在成为saved到parquet之前,这里是一个模式:注意,centroid0是第一个列,是StringType。

但是,当使用以下方法保存文件时:
df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)以partitionCols作为centroid0

还有一个(对我来说)令人惊讶的结果:
centroid0分区列已移动到行的endInteger我通过println确认了输出路径:
path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters下面是从保存的parquet读取数据时的模式

为什么要对输入模式进行这两种修改--以及如何避免--同时仍然将centroid0作为分区列来维护?
更新首选答案应该提到为什么将分区添加到列列表的末尾(vs开头)。我们需要理解确定性的排序。
此外,是否有任何方法可以使spark在推断的列类型上“改变想法”?我不得不将分区从0、1等更改为c0、c1等,以便将推理映射到StringType。也许那是必需的..。但如果有一些火花设置,以改变行为,这将是一个很好的答案。
发布于 2018-06-21 10:32:47
当您将分区字段保存为文件夹时,这对以后读取数据是有益的,因为(对于某些文件类型,包含了拼花),它可以优化地从您使用的分区读取数据(也就是说,如果您已经读取并筛选了centroid0==1 just不会读取其他分区)
这样做的效果是,分区字段(在您的例子中是centroid0)不只是作为文件夹名(centroid0=1、centroid0=2等)写入到拼花文件中。
它们的副作用是: 1.分区的类型是在运行时推断的(因为模式没有保存在parquet中),在您的情况下,只存在整数值,因此它被推断为整数。
另一个副作用是,分区字段是在模式的末尾/开头添加的,因为它将模式从拼花文件中读取为一个块,然后将该分区字段作为另一个块添加到该分区字段(同样,它不再是存储在拼花中的模式的一部分)。
发布于 2022-03-10 08:14:09
实际上,您可以很容易地使用包含分区数据模式的case类的列的排序。您需要从路径中读取数据,在路径中存储分区列,以使Spark推断这些列的值。然后,只需使用case类模式和如下语句应用重新排序:
val encoder: Encoder[RecordType] = Encoders.product[RecordType]
spark.read
.schema(encoder.schema)
.format("parquet")
.option("mergeSchema", "true")
.load(myPath)
// reorder columns, since reading from partitioned data, the partitioning columns are put to end
.select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
.as[RecordType]发布于 2018-06-21 08:45:39
原因其实很简单。按列进行分区时,每个分区只能包含该列的一个值。因此,实际上在文件中到处写入相同的值是无用的,这就是为什么Spark没有这样做。读取文件时,Spark使用文件名称中包含的信息来重构分区列,并将其放在架构的末尾。列的类型不是存储的,而是在读取时推断出来的,因此在您的情况下是整数类型。注:没有什么特别的理由说明为什么在后面加列。可能是刚开始的时候。我想这只是一个武断的实现选择。
为了避免丢失列的类型和顺序,您可以像下面的df.withColumn("X", 'YOUR_COLUMN).write.partitionBy("X").parquet("...")一样复制分区列。
不过你会浪费空间的。另外,spark使用分区来优化过滤器。在读取数据之后,不要忘记使用X列作为筛选器,而不是您的列,否则Spark将无法执行任何优化。
https://stackoverflow.com/questions/50962934
复制相似问题