前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark 读写 JSON 文件到 DataFrame

PySpark 读写 JSON 文件到 DataFrame

作者头像
数据STUDIO
发布2023-09-04 12:37:02
1K0
发布2023-09-04 12:37:02
举报
文章被收录于专栏:数据STUDIO
本文中,云朵君将和大家一起学习了如何将具有单行记录和多行记录的 JSON 文件读取到 PySpark DataFrame 中,还要学习一次读取单个和多个文件以及使用不同的保存选项将 JSON 文件写回 DataFrame。

PySpark SQL 提供 read.json("path") 将单行或多行(多行)JSON 文件读取到 PySpark DataFrame 并 write.json("path") 保存或写入 JSON 文件的功能,在本教程中,您将学习如何读取单个文件、多个文件、目录中的所有文件进入 DataFrame 并使用 Python 示例将 DataFrame 写回 JSON 文件。

注意: 开箱即用的 PySpark API 支持将 JSON 文件和更多文件格式读取到 PySpark DataFrame 中。

使用 read.json("path") 或者 read.format("json").load("path") 方法将文件路径作为参数,可以将 JSON 文件读入 PySpark DataFrame。

与读取 CSV 不同,默认情况下,来自输入文件的 JSON 数据源推断模式。

此处使用的 zipcodes.json 文件可以从 GitHub 项目下载。

传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/zipcodes.json

代码语言:javascript
复制
# Read JSON file into dataframe
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()

当使用 format("json") 方法时,还可以通过其完全限定名称指定数据源,如下所示。

代码语言:javascript
复制
# Read JSON file into dataframe
df = spark.read.format('org.apache.spark.sql.json') \
        .load("PyDataStudio/zipcodes.json")

从多行读取 JSON 文件

PySpark JSON 数据源在不同的选项中提供了多个读取文件的选项,使用multiline选项读取分散在多行的 JSON 文件。默认情况下,多行选项设置为 false。

下面是我们要读取的输入文件,同样的文件也可以在Github上找到。

传送门: https://github.com/spark-examples/pyspark-examples/blob/master/resources/multiline-zipcode.json

代码语言:javascript
复制
[{
  "RecordNumber": 2,
  "Zipcode": 704,
  "ZipCodeType": "STANDARD",
  "City": "PASEO COSTA DEL SUR",
  "State": "PR"
},
{
  "RecordNumber": 10,
  "Zipcode": 709,
  "ZipCodeType": "STANDARD",
  "City": "BDA SAN LUIS",
  "State": "PR"
}]

使用read.option("multiline","true")

代码语言:javascript
复制
# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
      .json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()    

一次读取多个文件

还可以使用read.json()方法从不同路径读取多个 JSON 文件,只需通过逗号分隔传递所有具有完全限定路径的文件名,例如

代码语言:javascript
复制
# Read multiple files
df2 = spark.read.json(
    ['resources/zipcode1.json',
     'resources/zipcode2.json'])
df2.show()  

读取目录中的所有文件

只需将目录作为json()方法的路径传递给该方法,我们就可以将目录中的所有 JSON 文件读取到 DataFrame 中。

代码语言:javascript
复制
# Read all JSON files from a folder
df3 = spark.read.json("resources/*.json")
df3.show()

使用用户自定义架构读取文件

PySpark Schema 定义了数据的结构,换句话说,它是 DataFrame 的结构。PySpark SQL 提供 StructType 和 StructField 类以编程方式指定 DataFrame 的结构。

如果事先知道文件的架构并且不想使用inferSchema选项来指定列名和类型,请使用指定的自定义列名schema并使用schema选项键入。

使用 PySpark StructType 类创建自定义 Schema,下面我们启动这个类并使用添加方法通过提供列名、数据类型和可为空的选项向其添加列。

代码语言:javascript
复制
# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()

使用 PySpark SQL 读取 JSON 文件

PySpark SQL 还提供了一种读取 JSON 文件的方法,方法是使用 spark.sqlContext.sql(“将 JSON 加载到临时视图”) 直接从读取文件创建临时视图

代码语言:javascript
复制
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS" + 
      " (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode").show()

读取 JSON 文件时的选项

NullValues

使用 nullValues 选项,可以将 JSON 中的字符串指定为 null。例如,如果想考虑一个值为 1900-01-01 的日期列,则在 DataFrame 上设置为 null。

DateFormat

选项 dateFormat用于设置输入 DateType 和 TimestampType 列的格式的选项。支持所有 java.text.SimpleDateFormat 格式。

注意:除了上述选项外,PySpark JSON 数据集还支持许多其他选项。

应用 DataFrame 转换

从 JSON 文件创建 PySpark DataFrame 后,可以应用 DataFrame 支持的所有转换和操作。

将 PySpark DataFrame 写入 JSON 文件

在 DataFrame 上使用 PySpark DataFrameWriter 对象 write 方法写入 JSON 文件。

代码语言:javascript
复制
df2.write.json("/PyDataStudio/spark_output/zipcodes.json")

编写 JSON 文件时的 PySpark 选项

在编写 JSON 文件时,可以使用多个选项。如 nullValuedateFormat

PySpark 保存模式

PySpark DataFrameWriter 还有一个方法 mode() 来指定 SaveMode;此方法的参数采用overwrite, append, ignore, errorifexists.

  • overwrite – 模式用于覆盖现有文件
  • append – 将数据添加到现有文件
  • ignore – 当文件已经存在时忽略写操作
  • errorifexistserror – 这是文件已存在时的默认选项,它返回错误
代码语言:javascript
复制
 df2.write.mode('Overwrite') \
       .json("/PyDataStudio/spark_output/zipcodes.json")

源代码供参考

此示例也可在GitHub PySpark 示例项目中获得以供参考。

代码语言:javascript
复制
# https://github.com/spark-examples/pyspark-examples/blob/master/pyspark-read-json.py
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

# Read JSON file into dataframe    
df = spark.read.json("PyDataStudio/zipcodes.json")
df.printSchema()
df.show()

# Read multiline json file
multiline_df = spark.read.option("multiline","true") \
      .json("PyDataStudio/multiline-zipcode.json")
multiline_df.show()

#Read multiple files
df2 = spark.read.json(
    ['PyDataStudio/zipcode2.json','PyDataStudio/zipcode1.json'])
df2.show()    

#Read All JSON files from a directory
df3 = spark.read.json("PyDataStudio/*.json")
df3.show()

# Define custom schema
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema) \
        .json("PyDataStudio/zipcodes.json")
df_with_schema.printSchema()
df_with_schema.show()

# Create a table from Parquet File
spark.sql("CREATE OR REPLACE TEMPORARY VIEW zipcode3 USING json OPTIONS" + 
      " (path 'PyDataStudio/zipcodes.json')")
spark.sql("select * from zipcode3").show()

# PySpark write Parquet File
df2.write.mode('Overwrite').json("/PyDataStudio/spark_output/zipcodes.json")

相关阅读: PySpark 读写 CSV 文件到 DataFrame

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-07-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据STUDIO 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 从多行读取 JSON 文件
  • 一次读取多个文件
  • 读取目录中的所有文件
  • 使用用户自定义架构读取文件
  • 使用 PySpark SQL 读取 JSON 文件
  • 读取 JSON 文件时的选项
    • NullValues
      • DateFormat
      • 应用 DataFrame 转换
      • 将 PySpark DataFrame 写入 JSON 文件
        • 编写 JSON 文件时的 PySpark 选项
          • PySpark 保存模式
          • 源代码供参考
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档