首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >将文件保存到Parquet时,分区列被移动到行尾

将文件保存到Parquet时,分区列被移动到行尾
EN

Stack Overflow用户
提问于 2018-06-21 07:38:13
回答 3查看 2.3K关注 0票数 5

对于给定的DataFrame,在成为saved到parquet之前,这里是一个模式:注意,centroid0第一个列,是StringType

但是,当使用以下方法保存文件时:

代码语言:javascript
运行
复制
      df.write.partitionBy(dfHolder.metadata.partitionCols: _*).format("parquet").mode("overwrite").save(fpath)

partitionCols作为centroid0

还有一个(对我来说)令人惊讶的结果:

  • centroid0分区列已移动到行的end
  • 数据类型已更改为Integer

我通过println确认了输出路径:

代码语言:javascript
运行
复制
 path=/git/block/target/scala-2.11/test-classes/data/output/blocking/out//level1/clusters

下面是从保存的parquet读取数据时的模式

为什么要对输入模式进行这两种修改--以及如何避免--同时仍然将centroid0作为分区列来维护?

更新首选答案应该提到为什么将分区添加到列列表的末尾(vs开头)。我们需要理解确定性的排序。

此外,是否有任何方法可以使spark在推断的列类型上“改变想法”?我不得不将分区从01等更改为c0c1等,以便将推理映射到StringType。也许那是必需的..。但如果有一些火花设置,以改变行为,这将是一个很好的答案。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-06-21 10:32:47

当您将分区字段保存为文件夹时,这对以后读取数据是有益的,因为(对于某些文件类型,包含了拼花),它可以优化地从您使用的分区读取数据(也就是说,如果您已经读取并筛选了centroid0==1 just不会读取其他分区)

这样做的效果是,分区字段(在您的例子中是centroid0)不只是作为文件夹名(centroid0=1centroid0=2等)写入到拼花文件中。

它们的副作用是: 1.分区的类型是在运行时推断的(因为模式没有保存在parquet中),在您的情况下,只存在整数值,因此它被推断为整数。

另一个副作用是,分区字段是在模式的末尾/开头添加的,因为它将模式从拼花文件中读取为一个块,然后将该分区字段作为另一个块添加到该分区字段(同样,它不再是存储在拼花中的模式的一部分)。

票数 4
EN

Stack Overflow用户

发布于 2022-03-10 08:14:09

实际上,您可以很容易地使用包含分区数据模式的case类的列的排序。您需要从路径中读取数据,在路径中存储分区列,以使Spark推断这些列的值。然后,只需使用case类模式和如下语句应用重新排序:

代码语言:javascript
运行
复制
val encoder: Encoder[RecordType] = Encoders.product[RecordType]
spark.read
      .schema(encoder.schema)
      .format("parquet")
      .option("mergeSchema", "true")
      .load(myPath)
      // reorder columns, since reading from partitioned data, the partitioning columns are put to end
      .select(encoder.schema.fieldNames.head, encoder.schema.fieldNames.tail: _*)
      .as[RecordType]
票数 1
EN

Stack Overflow用户

发布于 2018-06-21 08:45:39

原因其实很简单。按列进行分区时,每个分区只能包含该列的一个值。因此,实际上在文件中到处写入相同的值是无用的,这就是为什么Spark没有这样做。读取文件时,Spark使用文件名称中包含的信息来重构分区列,并将其放在架构的末尾。列的类型不是存储的,而是在读取时推断出来的,因此在您的情况下是整数类型。注:没有什么特别的理由说明为什么在后面加列。可能是刚开始的时候。我想这只是一个武断的实现选择。

为了避免丢失列的类型和顺序,您可以像下面的df.withColumn("X", 'YOUR_COLUMN).write.partitionBy("X").parquet("...")一样复制分区列。

不过你会浪费空间的。另外,spark使用分区来优化过滤器。在读取数据之后,不要忘记使用X列作为筛选器,而不是您的列,否则Spark将无法执行任何优化。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50962934

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档