前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >​PySpark 读写 Parquet 文件到 DataFrame

​PySpark 读写 Parquet 文件到 DataFrame

作者头像
数据STUDIO
发布2023-09-04 12:50:07
5800
发布2023-09-04 12:50:07
举报
文章被收录于专栏:数据STUDIO数据STUDIO

本文中,云朵君将和大家一起学习如何从 PySpark DataFrame 编写 Parquet 文件并将 Parquet 文件读取到 DataFrame 并创建视图/表来执行 SQL 查询。还要学习在 SQL 的帮助下,如何对 Parquet 文件对数据进行分区和检索分区以提高性能。

Pyspark SQL 提供了将 Parquet 文件读入 DataFrame 和将 DataFrame 写入 Parquet 文件,DataFrameReaderDataFrameWriter对方法parquet()分别用于读取和写入/创建 Parquet 文件。Parquet 文件与数据一起维护模式,因此它用于处理结构化文件。

下面是关于如何在 PySpark 中写入和读取 Parquet 文件的简单说明,我将在后面的部分中详细解释。

代码语言:javascript
复制
df.write.parquet("/tmp/out/people.parquet") 
parDF1=spark.read.parquet("/temp/out/people.parquet")

之前,我详细讲解过,首先让我们了解一下什么是 Parquet 文件以及它相对于 CSV、JSON 等文本文件格式的优势。

什么是 Parquet 文件

Apache Parquet 文件是一种列式存储格式,适用于 Hadoop 生态系统中的任何项目,无论选择何种数据处理框架、数据模型或编程语言。

https://parquet.apache.org/

优点

在查询列式存储时,它会非常快速地跳过不相关的数据,从而加快查询执行速度。因此,与面向行的数据库相比,聚合查询消耗的时间更少。

Parquet 能够支持高级嵌套数据结构,并支持高效的压缩选项和编码方案。

Pyspark SQL 支持读取和写入 Parquet 文件,自动捕获原始数据的模式,它还平均减少了 75% 的数据存储。Pyspark 默认在其库中支持 Parquet,因此我们不需要添加任何依赖库。

Apache Parquet Pyspark 示例

由于我们没有 Parquet 文件,我们从 DataFrame 编写 Parquet。首先,使用方法 spark.createDataFrame() 从数据列表创建一个 Pyspark DataFrame。

代码语言:javascript
复制
data =[("James ","","Smith","36636","M",3000),
       ("Michael ","Rose","","40288","M",4000),
       ("Robert ","","Williams","42114","M",4000),
       ("Maria ","Anne","Jones","39192","F",4000),
       ("Jen","Mary","Brown","","F",-1)]
columns=["firstname", "middlename",
         "lastname", "dob",
         "gender", "salary"]
df=spark.createDataFrame(data,columns)

在上面的示例中,它创建了一个 DataFrame,其中包含 firstname、middlename、lastname、dob、gender、salary 列。

Pyspark 将 DataFrame 写入 Parquet 文件格式

现在通过调用DataFrameWriter类的parquet()函数从PySpark DataFrame创建一个parquet文件。当将DataFrame写入parquet文件时,它会自动保留列名及其数据类型。Pyspark创建的每个分区文件都具有 .parquet 文件扩展名。

代码语言:javascript
复制
df.write.parquet("/PyDataStudio/output/people.parquet")

Pyspark 将 Parquet 文件读入 DataFrame

Pyspark 在 DataFrameReader 类中提供了一个parquet()方法来将 Parquet 文件读入 dataframe。下面是一个将 Parquet 文件读取到 dataframe 的示例。

代码语言:javascript
复制
parDF=spark.read.parquet("/PyDataStudio/output/people.parquet")

追加或覆盖现有 Parquet 文件

使用 append 追加保存模式,可以将数据框追加到现有的 Parquet 文件中。如要覆盖使用 overwrite 覆盖保存模式。

代码语言:javascript
复制
df.write.mode('append') \
        .parquet("/PyDataStudio/output/people.parquet")
df.write.mode('overwrite') \
        .parquet("/PyDataStudio/output/people.parquet")

执行 SQL 查询 DataFrame

Pyspark Sql 提供在 Parquet 文件上创建临时视图以执行 sql 查询。在你的程序存在之前,这些视图都可用。

代码语言:javascript
复制
parqDF.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")

在 Parquet 文件上创建表

现在来看看在 Parquet 文件上执行 SQL 查询。为了执行 sql 查询,我们不从 DataFrame 中创建,而是直接在 parquet 文件上创建一个临时视图或表。

代码语言:javascript
复制
spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/PyDataStudio/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

在这里,我们从 people.parquet 文件创建了一个临时视图 PERSON 。这给出了以下结果。

代码语言:javascript
复制
+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |          |Williams|42114|     M|  4000|
|   Maria |      Anne|   Jones|39192|     F|  4000|
| Michael |      Rose|        |40288|     M|  4000|
|   James |          |   Smith|36636|     M|  3000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+

创建 Parquet 分区文件

当我们对 PERSON 表执行特定查询时,它会扫描所有行并返回结果。这与传统的数据库查询执行类似。在 PySpark 中,我们可以通过使用 PySpark partitionBy()方法对数据进行分区,以优化的方式改进查询执行。

代码语言:javascript
复制
df.write.partitionBy("gender","salary") \
     .mode("overwrite") \
        .parquet("/PyDataStudio/output/people2.parquet")

当检查 people2.parquet 文件时,它有两个分区 gendersalary

从分区 Parquet 文件中检索

下面的示例解释了将分区 Parquet 文件读取到 gender=M 的 DataFrame 中。

代码语言:javascript
复制
parDF2=spark.read.parquet("/PyDataStudio/output/people2.parquet/gender=M")
parDF2.show(truncate=False)

上述示例的输出如下所示。

代码语言:javascript
复制
+---------+----------+--------+-----+------+
|firstname|middlename|lastname|dob  |salary|
+---------+----------+--------+-----+------+
|Robert   |          |Williams|42114|4000  |
|Michael  |Rose      |        |40288|4000  |
|James    |          |Smith   |36636|3000  |
+---------+----------+--------+-----+------+

在分区 Parquet 文件上创建表

在这里,我在分区 Parquet 文件上创建一个表,并执行一个比没有分区的表执行得更快的查询,从而提高了性能。

代码语言:javascript
复制
spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"/PyDataStudio/output/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()

上述示例的输出如下所示。

代码语言:javascript
复制
+---------+----------+--------+-----+------+
|firstname|middlename|lastname|  dob|salary|
+---------+----------+--------+-----+------+
|   Maria |      Anne|   Jones|39192|  4000|
|      Jen|      Mary|   Brown|     |    -1|
+---------+----------+--------+-----+------+

PySpark读写Parquet文件的完整示例

代码语言:javascript
复制
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("parquetFile").getOrCreate()
data =[("James ","","Smith","36636","M",3000),
              ("Michael ","Rose","","40288","M",4000),
              ("Robert ","","Williams","42114","M",4000),
              ("Maria ","Anne","Jones","39192","F",4000),
              ("Jen","Mary","Brown","","F",-1)]
columns=["firstname","middlename","lastname","dob","gender","salary"]
df=spark.createDataFrame(data,columns)
df.write.mode("overwrite").parquet("/PyDataStudio/output/people.parquet")
parDF1=spark.read.parquet("/PyDataStudio/output/people.parquet")
parDF1.createOrReplaceTempView("parquetTable")
parDF1.printSchema()
parDF1.show(truncate=False)

parkSQL = spark.sql("select * from ParquetTable where salary >= 4000 ")
parkSQL.show(truncate=False)

spark.sql("CREATE TEMPORARY VIEW PERSON USING parquet OPTIONS (path \"/PyDataStudio/output/people.parquet\")")
spark.sql("SELECT * FROM PERSON").show()

df.write.partitionBy("gender","salary").mode("overwrite").parquet("/PyDataStudio/output/people2.parquet")

parDF2=spark.read.parquet("/PyDataStudio/output/people2.parquet/gender=M")
parDF2.show(truncate=False)

spark.sql("CREATE TEMPORARY VIEW PERSON2 USING parquet OPTIONS (path \"/PyDataStudio/output/people2.parquet/gender=F\")")
spark.sql("SELECT * FROM PERSON2" ).show()
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2023-07-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据STUDIO 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是 Parquet 文件
    • 优点
    • Apache Parquet Pyspark 示例
    • Pyspark 将 DataFrame 写入 Parquet 文件格式
    • Pyspark 将 Parquet 文件读入 DataFrame
    • 追加或覆盖现有 Parquet 文件
    • 执行 SQL 查询 DataFrame
    • 在 Parquet 文件上创建表
    • 创建 Parquet 分区文件
    • 从分区 Parquet 文件中检索
    • 在分区 Parquet 文件上创建表
    • PySpark读写Parquet文件的完整示例
    相关产品与服务
    数据保险箱
    数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
    领券
    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档