前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[1014]PySpark使用笔记

[1014]PySpark使用笔记

作者头像
周小董
发布2021-07-14 14:09:58
1.3K0
发布2021-07-14 14:09:58
举报
文章被收录于专栏:python前行者

文章目录

背景

PySpark 通过 RPC server 来和底层的 Spark 做交互,通过 Py4j 来实现利用 API 调用 Spark 核心。 Spark (written in Scala) 速度比 Hadoop 快很多。Spark 配置可以各种参数,包括并行数目、资源占用以及数据存储的方式等等 Resilient Distributed Dataset (RDD) 可以被并行运算的 Spark 单元。它是 immutable, partitioned collection of elements

安装 PySpark

代码语言:javascript
复制
pip install pyspark

使用

连接 Spark Cluster

代码语言:javascript
复制
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("sparkAppExample")
sc = SparkContext(conf=conf)

Spark DataFrame

代码语言:javascript
复制
from pyspark.sql import SparkSession
spark = SparkSession.builder \
          .master("local") \
          .appName("Word Count") \
          .config("spark.some.config.option", "some-value") \
          .getOrCreate()
# getOrCreate表明可以视情况新建session或利用已有的session
# 如果使用 hive table 则加上 .enableHiveSupport()

Spark Config 条目

  • 配置大全网址

Spark Configuration

DataFrame 结构使用说明

PySpark 的 DataFrame 很像 pandas 里的 DataFrame 结构

读取本地文件

代码语言:javascript
复制
# Define the Data
import json
people = [
    {'name': 'Li', 'age': 12, 'address': {'country': 'China', 'city': 'Nanjing'}},
    {'name': 'Richard', 'age': 14, 'address': {'country': 'USA', 'city': 'Los Angeles'}},
    {'name': 'Jacob', 'age': 12, 'address': {'country': 'France', 'city': 'Paris'}},
    {'name': 'Manuel', 'age': 12, 'address': {'country': 'UK', 'city': 'London'}},
    {'name': 'Kio', 'age': 16, 'address': {'country': 'Japan', 'city': 'Tokyo'}},
]
json.dump(people, open('people.json', 'w'))

# Load Data into PySpark automatically
df = spark.read.load('people.json', format='json')

查看 DataFrame 结构

代码语言:javascript
复制
# Peek into dataframe
df
# DataFrame[address: struct<city:string,country:string>, age: bigint, name: string]

df.show(2)
"""
+------------------+---+-------+
|           address|age|   name|
+------------------+---+-------+
|  [Nanjing, China]| 12|     Li|
|[Los Angeles, USA]| 14|Richard|
+------------------+---+-------+
only showing top 2 rows
"""

df.columns
# ['address', 'age', 'name']

df.printSchema()
"""
root
 |-- address: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
"""

自定义 schema

代码语言:javascript
复制
from pyspark.sql.types import StructField, MapType, StringType, IntegerType, StructType
# 常用的还包括 DateType 等

people_schema= StructType([
    StructField('address', MapType(StringType(), StringType()), True),
    StructField('age', LongType(), True),
    StructField('name', StringType(), True),
])

df = spark.read.json('people.json', schema=people_schema)

df.show(1)
"""
+--------------------+---+----+
|             address|age|name|
+--------------------+---+----+
|[country -> China...| 12|  Li|
+--------------------+---+----+
only showing top 1 row
"""

df.dtypes
# [('address', 'map<string,string>'), ('age', 'bigint'), ('name', 'string')]

选择过滤数据

代码语言:javascript
复制
# Select column
address_df = df.select(['address.city'])
# DataFrame[city: string]

# Filter column with value
df.filter(df.age == 12).show()
"""
+----------------+---+------+
|         address|age|  name|
+----------------+---+------+
|[Nanjing, China]| 12|    Li|
| [Paris, France]| 12| Jacob|
|    [London, UK]| 12|Manuel|
+----------------+---+------+
"""

nj_df = df.filter('address.city == "Nanjing"')
nj_df.show()
"""
+--------------------+---+----+
|             address|age|name|
+--------------------+---+----+
|[country -> China...| 12|  Li|
+--------------------+---+----+
"""

# 选择数据头
df.head(2)
"""
[ 
  Row(address={'country': 'China', 'city': 'Nanjing'}, age=12, name='Li'), 
  Row(address={'country': 'USA', 'city': 'Los Angeles'}, age=14, name='Richard')
]
"""

提取数据

代码语言:javascript
复制
people = df.collect()
# return list of all Row class

len(people)
# 5

df.select('age').distinct().collect()
# [Row(age=12), Row(age=14), Row(age=16)]

Row & Column

代码语言:javascript
复制
# ---------------- row -----------------------

first_row = df.head()
# Row(address=Row(city='Nanjing', country='China'), age=12, name='Li')

# 读取行内某一列的属性值
first_row['age']           # 12
first_row.age              # 12
getattr(first_row, 'age')  # 12
first_row.address
# Row(city='Nanjing', country='China')

# -------------- column -----------------------

first_col = df[0]
first_col = df['adress']
# Column<b'address'>

# copy column[s]
address_copy = first_col.alias('address_copy')

# rename column / create new column
df.withColumnRenamed('age', 'birth_age')
df.withColumn('age_copy', df['age']).show(1)
"""
+----------------+---+----+--------+
|         address|age|name|age_copy|
+----------------+---+----+--------+
|[Nanjing, China]| 12|  Li|      12|
+----------------+---+----+--------+
only showing top 1 row
"""

df.withColumn('age_over_18',df['age'] > 18).show(1)
"""
+----------------+---+----+-----------+
|         address|age|name|age_over_18|
+----------------+---+----+-----------+
|[Nanjing, China]| 12|  Li|      false|
+----------------+---+----+-----------+
only showing top 1 row
"""

原始 sql 查询语句

代码语言:javascript
复制
df.createOrReplaceTempView("people")
sql_results = spark.sql("SELECT count(*) FROM people")
sql_results.show()
"""
+--------+
|count(1)|
+--------+
|       5|
+--------+
"""

pyspark.sql.function 示例

代码语言:javascript
复制
from pyspark.sql import functions as F
import datetime as dt

# 装饰器使用
@F.udf()
def calculate_birth_year(age):
    this_year = dt.datetime.today().year
    birth_year = this_year - age
    return birth_year 

calculated_df = df.select("*", calculate_birth_year('age').alias('birth_year'))
calculated_df .show(2)
"""
+------------------+---+-------+----------+
|           address|age|   name|birth_year|
+------------------+---+-------+----------+
|  [Nanjing, China]| 12|     Li|      2008|
|[Los Angeles, USA]| 14|Richard|      2006|
+------------------+---+-------+----------+
only showing top 2 rows
"""

# pyspark.sql.function 下很多函保活 udf(用户自定义函数)可以很好的并行处理大数据
# 这就是传说中的函数式编程,进度条显示可能如下:
# [Stage 41: >>>>>>>>>>>>>>>>>                    (0 + 1) / 1]

来源:https://zhuanlan.zhihu.com/p/171813899 https://blog.csdn.net/cymy001/article/details/78483723

  • 其它阅读:

pyspark 自定义聚合函数 UDAF:https://www.cnblogs.com/wdmx/p/10156500.html

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/07/10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 文章目录
  • 背景
  • 安装 PySpark
  • 使用
    • 连接 Spark Cluster
      • Spark DataFrame
        • Spark Config 条目
        • DataFrame 结构使用说明
          • 读取本地文件
            • 查看 DataFrame 结构
              • 自定义 schema
                • 选择过滤数据
                  • 提取数据
                    • Row & Column
                      • 原始 sql 查询语句
                        • pyspark.sql.function 示例
                        相关产品与服务
                        数据保险箱
                        数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
                        领券
                        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档