Parquet是一种列式存储格式,很多种处理引擎都支持这种存储格式,也是sparksql的默认存储格式。Spark SQL支持灵活的读和写Parquet文件,并且对parquet文件的schema可以自动解析。当Spark SQL需要写成Parquet文件时,处于兼容的原因所有的列都被自动转化为了nullable。
读写Parquet文件
// Encoders for most common types are automatically provided by importing spark.implicits._import spark.implicits._
val peopleDF = spark.read.json("examples/src/main/resources/people.json")
// DataFrames can be saved as Parquet files, maintaining the schema informationpeopleDF.write.parquet("people.parquet")
// Read in the parquet file created above// Parquet files are self-describing so the schema is preserved// The result of loading a Parquet file is also a DataFrameval parquetFileDF = spark.read.parquet("people.parquet")
// Parquet files can also be used to create a temporary view and then used in SQL statementsparquetFileDF.createOrReplaceTempView("parquetFile")val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")namesDF.map(attributes => "Name: " + attributes(0)).show()// +------------+// | value|// +------------+// |Name: Justin|// +------------+
分区发现
分区表时很多系统支持的,比如hive,对于一个分区表,往往是采用表中的某一或多个列去作为分区的依据,分区是以文件目录的形式体现。所有内置的文件源(Text/CSV/JSON/ORC/Parquet)都支持自动的发现和推测分区信息。例如,我们想取两个分区列,gender和country,先按照性别分区,再按照国家分区:
path└── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
SparkSession.read.parquet 或者 SparkSession.read.load读取的目录为path/to/table的时候,会自动从路径下抽取分区信息。返回DataFrame的表结构为:
root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)
细细分析一下你也会发现分区列的数据类型也是自动推断的。当前支持的数据类型有,数字类型,date,timestamp和string类型。有时候用户可能不希望自动推断分区列的类型,这时候只需要将spark.sql.sources.partitionColumnTypeInference.enabled配置为false即可。如果分区列的类型推断这个参数设置为了false,那么分区列的类型会被认为是string。
从spark 1.6开始,分区发现默认情况只会发现给定路径下的分区。比如,上面的分区表,假如你讲路径path/to/table/gender=male传递给SparkSession.read.parquet 或者 SparkSession.read.load 那么gender不会被认为是分区列。如果想检测到该分区,传给spark的路径应该是其父路径也即是path/to/table/,这样gender就会被认为是分区列。
schema合并
跟protocol buffer,avro,thrift一样,parquet也支持schema演变升级。用户可以在刚开始的时候创建简单的schema,然后根据需要随时扩展新的列。
Parquet 数据源支持自动检测新作列并且会合并schema。
由于合并schema是一个相当耗费性能的操作,而且很多情况下都是不必要的,所以从spark 1.5开始就默认关闭掉该功能。有两种配置开启方式:
通过数据源option设置mergeSchema为true。在全局sql配置中设置spark.sql.parquet.mergeSchema 为true.// This is used to implicitly convert an RDD to a DataFrame.import spark.implicits._
// Create a simple DataFrame, store into a partition directoryval squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")squaresDF.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,// adding a new column and dropping an existing columnval cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")cubesDF.write.parquet("data/test_table/key=2")
// Read the partitioned tableval mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")mergedDF.printSchema()
// The final schema consists of all 3 columns in the Parquet files together// with the partitioning column appeared in the partition directory paths// root// |-- value: int (nullable = true)// |-- square: int (nullable = true)// |-- cube: int (nullable = true)// |-- key: int (nullable = true)
hive metastore Parquet表转换
当读写hive metastore parquet格式表的时候,Spark SQL为了较好的性能会使用自己默认的parquet格式而不是采用hive SerDe。该行为是通过参数spark.sql.hive.convertMetastoreParquet空值,默认是true。
Hive和parquet兼容性
从表schema处理角度讲hive和parquet有两个主要的区别
由于上面的原因,在将hive metastore parquet转化为spark parquet表的时候,需要处理兼容一下hive的schema和parquet的schema。兼容处理的原则是:
元数据刷新
Spark SQL为了更好的性能会缓存parquet的元数据。当spark 读取hive表的时候,schema一旦从hive转化为spark sql的,就会被spark sql缓存,如果此时表的schema被hive或者其他外部工具更新,必须要手动的去刷新元数据,才能保证元数据的一致性。
spark.catalog.refreshTable("my_table")
配置
parquet的相关的参数可以通过setconf或者set key=value的形式配置。
星球里刚刚更新完flink datastream完整版本教程,完整的案例已经置顶~