专栏首页Spark学习技巧Spark SQL的Parquet那些事儿

Spark SQL的Parquet那些事儿

Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并且对parquet文件的schema可以自动解析。当Spark SQL需要写成Parquet文件时,处于兼容的原因所有的列都被自动转化为了nullable。

读写Parquet文件

// Encoders for most common types are automatically provided by importing spark.implicits._import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")

// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFrameval parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")namesDF.map(attributes => "Name: " + attributes(0)).show()// +------------+// |       value|// +------------+// |Name: Justin|// +------------+

分区发现

分区表时很多系统支持的,比如hive,对于一个分区表,往往是采用表中的某一或多个列去作为分区的依据,分区是以文件目录的形式体现。所有内置的文件源(Text/CSV/JSON/ORC/Parquet)都支持自动的发现和推测分区信息。例如,我们想取两个分区列,gender和country,先按照性别分区,再按照国家分区:

path└── to   └── table       ├── gender=male       │   ├── ...       │   │       │   ├── country=US       │   │   └── data.parquet       │   ├── country=CN       │   │   └── data.parquet       │   └── ...       └── gender=female           ├── ...           │           ├── country=US           │   └── data.parquet           ├── country=CN           │   └── data.parquet           └── ...

SparkSession.read.parquet 或者 SparkSession.read.load读取的目录为path/to/table的时候,会自动从路径下抽取分区信息。返回DataFrame的表结构为:

root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)

细细分析一下你也会发现分区列的数据类型也是自动推断的。当前支持的数据类型有,数字类型,date,timestamp和string类型。有时候用户可能不希望自动推断分区列的类型,这时候只需要将spark.sql.sources.partitionColumnTypeInference.enabled配置为false即可。如果分区列的类型推断这个参数设置为了false,那么分区列的类型会被认为是string。

从spark 1.6开始,分区发现默认情况只会发现给定路径下的分区。比如,上面的分区表,假如你讲路径path/to/table/gender=male传递给SparkSession.read.parquet 或者 SparkSession.read.load 那么gender不会被认为是分区列。如果想检测到该分区,传给spark的路径应该是其父路径也即是path/to/table/,这样gender就会被认为是分区列。

schema合并

跟protocol buffer,avro,thrift一样,parquet也支持schema演变升级。用户可以在刚开始的时候创建简单的schema,然后根据需要随时扩展新的列。

Parquet 数据源支持自动检测新作列并且会合并schema。

由于合并schema是一个相当耗费性能的操作,而且很多情况下都是不必要的,所以从spark 1.5开始就默认关闭掉该功能。有两种配置开启方式:

通过数据源option设置mergeSchema为true。在全局sql配置中设置spark.sql.parquet.mergeSchema 为true.// This is used to implicitly convert an RDD to a DataFrame.import spark.implicits._

// Create a simple DataFrame, store into a partition directoryval squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,// adding a new column and dropping an existing columnval cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned tableval mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together// with the partitioning column appeared in the partition directory paths// root//  |-- value: int (nullable = true)//  |-- square: int (nullable = true)//  |-- cube: int (nullable = true)//  |-- key: int (nullable = true)

hive metastore Parquet表转换

当读写hive metastore parquet格式表的时候,Spark SQL为了较好的性能会使用自己默认的parquet格式而不是采用hive SerDe。该行为是通过参数spark.sql.hive.convertMetastoreParquet空值,默认是true。

Hive和parquet兼容性

从表schema处理角度讲hive和parquet有两个主要的区别

  1. hive是大小写敏感的,但是parquet不是。
  2. hive会讲所有列视为nullable,但是nullability在parquet里有独特的意义。

由于上面的原因,在将hive metastore parquet转化为spark parquet表的时候,需要处理兼容一下hive的schema和parquet的schema。兼容处理的原则是:

  1. 有相同名字的字段必须要有相同的数据类型,忽略nullability。兼容处理的字段应该保持parquet侧的数据类型,这样就可以处理到nullability类型了。
  2. 兼容处理的schema应直接包含在hive元数据里的schema信息:
    1. 任何仅仅出现在parquet schema的字段将会被删除
    2. 任何仅仅出现在hive 元数据里的字段将会被视为nullable。

元数据刷新

Spark SQL为了更好的性能会缓存parquet的元数据。当spark 读取hive表的时候,schema一旦从hive转化为spark sql的,就会被spark sql缓存,如果此时表的schema被hive或者其他外部工具更新,必须要手动的去刷新元数据,才能保证元数据的一致性。

spark.catalog.refreshTable("my_table")

配置

parquet的相关的参数可以通过setconf或者set key=value的形式配置。

  • spark.sql.parquet.binaryAsString 默认值是false。一些parquet生产系统,尤其是impala,hive和老版本的spark sql,不区分binary和string类型。该参数告诉spark 讲binary数据当作字符串处理。
  • spark.sql.parquet.int96AsTimestamp 默认是true。有些parquet生产系统,尤其是parquet和hive,将timestamp翻译成INT96.该参数会提示Spark SQL讲INT96翻译成timestamp。
  • spark.sql.parquet.compression.codec 默认是snappy。当写parquet文件的时候设置压缩格式。如果在option或者properties里配置了compression或者parquet.compression优先级依次是:compression,parquet.compression,spark.sql.parquet.compression.codec。支持的配置类型有:none,uncompressed,snappy,gzip,lzo,brotli,lz4,zstd。在hadoop2.9.0之前,zstd需要安装ZstandardCodec,brotli需要安装BrotliCodec。
  • spark.sql.parquet.filterPushdown 默认是true。设置为true代表开启parquet下推执行优化。
  • spark.sql.hive.convertMetastoreParquet 默认是true。假如设置为false,spark sql会读取hive parquet表的时候使用Hive SerDe,替代内置的。
  • spark.sql.parquet.mergeSchema 默认是false。当设置为true的时候,parquet数据源会合并读取所有的parquet文件的schema,否则会从summary文件或者假如没有summary文件的话随机的选一些数据文件来合并schema。
  • spark.sql.parquet.writeLegacyFormat 默认是false。如果设置为true 数据会以spark 1.4和更早的版本的格式写入。比如,decimal类型的值会被以apache parquet的fixed-length byte array格式写出,该格式是其他系统例如hive,impala等使用的。如果是false,会使用parquet的新版格式。例如,decimals会以int-based格式写出。如果spark sql要以parquet输出并且结果会被不支持新格式的其他系统使用的话,需要设置为true。

星球里刚刚更新完flink datastream完整版本教程,完整的案例已经置顶~

本文分享自微信公众号 - Spark学习技巧(bigdatatip),作者:浪院长

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-05-27

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • Spark SQL的Parquet那些事儿.docx

    Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并...

    Spark学习技巧
  • 聊聊spark-submit的几个有用选项

    Spark学习技巧
  • Spark学习入门(让人看了想吐的话题)

    Spark学习技巧
  • Voleon Group:一家『纯』用机器学习策略的对冲基金

    全球规模最大、表现最好、最成功的的人工智能对冲基金之一的Voleon Group最近受到市场冲击后,出现巨额了亏损,成为了今年众多陷入困境的AI驱动型基金管理公...

    量化投资与机器学习微信公众号
  • MongoDB

    断痕
  • 机器学习变得越来越容易,软件工程仍旧很难

    在过去的五年中,机器学习变得更加容易而软件工程比以往更加复杂。对于软件工程师来说,这是好事,但对机器学习专家却不是。这是机器学习向软件工程,数据科学向数据分析的...

    大数据文摘
  • logback.xml读取spring的属性

    因为logback.xml和logback-test.xml会被logback组件直接读取,所以如果要交给spring管理,需要

    十毛
  • JQuery选择器和JQuery包装集

    (本文年代久远,请谨慎阅读)今天学习了JQuery的一些基本用法,包括JQuery选择器和JQuery包装集;

    ZONGLYN
  • 【DeepMind&OpenAI】利用人类偏好深度强化学习,机器学会后空翻

    【新智元导读】DeepMind 和 OpenAI 合作的新研究,让没有技术经验的人类给强化学习系统提供反馈,从而避免事先为系统指定目标的步骤。在某些情况下,这种...

    新智元
  • 整理:深度学习 vs 机器学习 vs 模式识别

    本文来自CMU的博士,MIT的博士后,vision.ai的联合创始人Tomasz Malisiewicz的个人博客文章,阅读本文,你可以更好的理解计算机视觉是怎...

    CSDN技术头条

扫码关注云+社区

领取腾讯云代金券