前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark 与 DataFrame

Spark 与 DataFrame

作者头像
EmoryHuang
发布2022-10-31 16:25:17
1.7K0
发布2022-10-31 16:25:17
举报
文章被收录于专栏:EmoryHuang's Blog

Spark 与 DataFrame

前言

在 Spark 中,除了 RDD 这种数据容器外,还有一种更容易操作的一个分布式数据容器 DateFrame,它更像传统关系型数据库的二维表,除了包括数据自身以外还包括数据的结构信息(Schema),这就可以利用类似 SQL 的语言来进行数据访问。

Dataframe 读写

手动创建

代码语言:javascript
复制
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Spark").getOrCreate()

创建一个列表,列表的元素是字典,将其作为输出初始化 DataFrame:

代码语言:javascript
复制
data = [{"Category": 'A', "ID": 1, "Value": 121.44, "Truth": True},
        {"Category": 'B', "ID": 2, "Value": 300.01, "Truth": False},
        {"Category": 'C', "ID": 3, "Value": 10.99, "Truth": None},
        {"Category": 'A', "ID": 4, "Value": 33.87, "Truth": True}
        ]
df = spark.createDataFrame(data)

分别打印 Schema 和 DataFrame,可以看到创建 DataFrame 时自动分析了每列数据的类型

代码语言:javascript
复制
df.printSchema()
'''
root
 |-- Category: string (nullable = true)
 |-- ID: long (nullable = true)
 |-- Truth: boolean (nullable = true)
 |-- Value: double (nullable = true)
'''
df.show()
'''
+--------+---+-----+------+
|Category| ID|Truth| Value|
+--------+---+-----+------+
|       A|  1| true|121.44|
|       B|  2|false|300.01|
|       C|  3| null| 10.99|
|       A|  4| true| 33.87|
+--------+---+-----+------+
'''

读取文件创建

除了手动创建 DataFrame 之外,更常见的是通过读取文件,可以通过 spark.read 方法来实现,你也可以指定 options 添加额外选项。

代码语言:javascript
复制
df = spark.read.csv('hdfs://spark1:9000/data/test.csv', header=True, inferSchema=True)
# df = spark.read.options(inferSchema='True', header='True').csv('hdfs://spark1:9000/data/test.csv')
df.show()

类似的,你也可以直接从 jsonmysql等数据源读取数据。

写数据

write 的使用方法与 read 相同,可以通过 format 指定写入的格式,默认为 csv,也可以通过 options 添加额外选项。

代码语言:javascript
复制
# use write
df.write.csv('hdfs://spark1:9000/data/test.csv')

写数据时,也可以先将 Pandas-on-Spark Dataframe 转化为 Pandas Dataframe,然后在保存为 csv 文件

代码语言:javascript
复制
# Convert a Pandas-on-Spark Dataframe into a Pandas Dataframe
df.toPandas().to_csv(file_path, index=False)

DateFrame 操作

代码语言:javascript
复制
df.show()
+--------+---+-----+------+
|Category| ID|Truth| Value|
+--------+---+-----+------+
|       A|  1| true|121.44|
|       B|  2|false|300.01|
|       C|  3| null| 10.99|
|       A|  4| true| 33.87|
+--------+---+-----+------+

select()

代码语言:javascript
复制
df.select('Value').show()
'''
+------+
| Value|
+------+
|121.44|
|300.01|
| 10.99|
| 33.87|
+------+
'''

另外,你也可以使用标准的 SQL 语句来查询数据,例如:

代码语言:javascript
复制
df.createOrReplaceTempView('table')
spark.sql('select Value from table').show()

withColumn

whtiColumn 方法根据指定 colName 往 DataFrame 中新增一列,如果 colName 已存在,则会覆盖当前列。

代码语言:javascript
复制
df.withColumn('New', df['Value'] + 50).show()
'''
+--------+---+-----+------+------+
|Category| ID|Truth| Value|   New|
+--------+---+-----+------+------+
|       A|  1| true|121.44|171.44|
|       B|  2|false|300.01|350.01|
|       C|  3| null| 10.99| 60.99|
|       A|  4| true| 33.87| 83.87|
+--------+---+-----+------+------+
'''

groupby()

根据字段进行 group by 操作

代码语言:javascript
复制
# 按 Category 进行分类,求每类的平均值
df.groupby('Category').mean().show()
'''
+--------+-------+----------+
|Category|avg(ID)|avg(Value)|
+--------+-------+----------+
|       B|    2.0|    300.01|
|       C|    3.0|     10.99|
|       A|    2.5|    77.655|
+--------+-------+----------+
'''

其他常用操作

代码语言:javascript
复制
df.first()      # 获取第一行记录
df.head(5)      # 获取前 5 行记录
df.take(5)      # 获取前 5 行数据

df.count()      # 返回 DataFrame 的行数

df.drop('Truth')        # 删除指定列
df.drop_duplicates()    # 删除重复记录
df.dropna()             # 删除缺失值

df.orderBy('Value')     # 排序
df.filter(df['Value'] > 100)    # 过滤指定数据
df.withColumnRenamed('Value', 'Value_new')    # 重命名列

Pandas on Spark

在 Spark 3.2 版本中,可以通过 Pandas api 直接对 DataFrame 进行操作

代码语言:javascript
复制
# import Pandas-on-Spark
import pyspark.pandas as ps

# Create a DataFrame with Pandas-on-Spark
ps_df = ps.DataFrame(range(10))

# Convert a Pandas-on-Spark Dataframe into a Pandas Dataframe
pd_df = ps_df.to_pandas()

# Convert a Pandas Dataframe into a Pandas-on-Spark Dataframe
ps_df = ps.from_pandas(pd_df)

参考资料

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022-03-03,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark 与 DataFrame
    • 前言
      • Dataframe 读写
        • 手动创建
        • 读取文件创建
        • 写数据
      • DateFrame 操作
        • select()
        • withColumn
        • groupby()
        • 其他常用操作
      • Pandas on Spark
        • 参考资料
        相关产品与服务
        文件存储
        文件存储(Cloud File Storage,CFS)为您提供安全可靠、可扩展的共享文件存储服务。文件存储可与腾讯云服务器、容器服务、批量计算等服务搭配使用,为多个计算节点提供容量和性能可弹性扩展的高性能共享存储。腾讯云文件存储的管理界面简单、易使用,可实现对现有应用的无缝集成;按实际用量付费,为您节约成本,简化 IT 运维工作。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档