前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark做数据处理

PySpark做数据处理

作者头像
陆勤_数据人网
发布2020-03-25 17:03:31
4.2K0
发布2020-03-25 17:03:31
举报
文章被收录于专栏:数据科学与人工智能

这是我的第82篇原创文章,关于PySpark和数据处理。

阅读完本文,你可以知道:

1 PySpark是什么

2 PySpark工作环境搭建

3 PySpark做数据处理工作

“我们要学习工具,也要使用工具。”

1 PySpark简介

PySpark是一种适合在大规模数据上做探索性分析,机器学习模型和ETL工作的优秀语言。若是你熟悉了Python语言和pandas库,PySpark适合你进一步学习和使用,你可以用它来做大数据分析和建模。

PySpark = Python + Spark。Python语言是一种开源编程语言,可以用来做很多事情,我主要关注和使用Python语言做与数据相关的工作,比方说,数据读取,数据处理,数据分析,数据建模和数据可视化等。Spark是采用内存计算机制,是一个高速并行处理大数据的框架。Spark架构如下图所示。

  • 1:Spark SQL:用于处理结构化数据,可以看作是一个分布式SQL查询引擎。
  • 2:Spark Streaming:以可伸缩和容错的方式处理实时流数据,采用微批处理来读取和处理传入的数据流。
  • 3:Spark MLlib:以分布式的方式在大数据集上构建机器学习模型
  • 4:Spark GraphX/Graphframe:用于图分析和图并行处理

2 PySpark工作环境搭建

我以Win10系统64位机,举例说明PySpark工作环境过程搭建。

第一步: 下载和安装好Anaconda数据科学套件。下载链接:https://www.anaconda.com/distribution/#windows,并创建自己的工作环境。我的工作环境是data_science

第二步: 下载和安装Java软件。下载链接:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html。软件安装好后,并且在环境变量上做好配置。

第三步: 下载Spark压缩包,并解压缩。下载链接:https://spark.apache.org/downloads.html,如图所示。

下载好后,把它解压缩到自己指定的位置。我把它放在D:\DataScienceTools\spark下,重命名为spark_unzipped。这个文件夹下的目录结构如下图所示。

下载winutils.exe,并放到D:\DataScienceTools\spark\spark_unzipped\bin下。winutils.exe的下载链接:https://github.com/steveloughran/winutils/blob/master/ hadoop-2.7.1/bin/winutils.exe

在Win10的环境变量做如下配置

  • 1 创建变量:HADOOP_HOME和SPARK_HOME,都赋值:D:\DataScienceTools\spark\spark_unzipped
  • 2 创建变量:PYSPARK_DRIVER_PYTHON,赋值:Jupyter
  • 3 创建变量:DRIVER_PYTHON_OPTS,赋值:notebook
  • 4 在Path变量中新建并添加D:\DataScienceTools\spark\spark_unzipped\bin

第四步: 打开Anaconda Prompt,进入到data_science工作环境,安装findspark库.

代码语言:javascript
复制
pip install findspark

第五步:,测试PySpark是否可以正常工作,在Anaconda Prompt输入Jupyter notebook,新建一个notebook。输入如下测试语句,若是没有报错,表示可以正常使用PySpark。

代码语言:javascript
复制
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.getOrCreate()
print(spark)

小提示:每次使用PySpark的时候,请先运行初始化语句

代码语言:javascript
复制
import findspark
findspark.init()

3 PySpark数据处理

PySpark数据处理包括数据读取,探索性数据分析,数据选择,增加变量,分组处理,自定义函数等操作

3.1 数据读取

代码语言:javascript
复制
import findspark
findspark.init()

# 导入 SparkSession
from pyspark.sql import SparkSession
# 创建一个Spark会话对象
spark=SparkSession.builder.appName('data_processing').getOrCreate()
# 加载csv数据集
df=spark.read.csv('./datasets/sample_data.csv',inferSchema=True,header=True)

3.2 数据探索性分析

  • 1:结构分析
代码语言:javascript
复制
print((df.count(), len(df.columns)))
print(df.columns)
  • 2:元数据分析
代码语言:javascript
复制
print(df.printSchema())
  • 3:描述性统计分析
代码语言:javascript
复制
df.describe().show()

3.3 数据选择

  • 变量选择
代码语言:javascript
复制
df.select('age', 'mobile').show(10)
  • 样本选择
代码语言:javascript
复制
df.filter(df['mobile']=='Vivo').show()
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()
  • 样本和变量选择
代码语言:javascript
复制
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()

3.4 增加变量

代码语言:javascript
复制
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
from pyspark.sql.types import StringType,DoubleType,IntegerType
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)

3.5 分组处理

  • 计数运算
代码语言:javascript
复制
df.groupBy('mobile').count().show(5,False)
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
  • 均值运算
代码语言:javascript
复制
df.groupBy('mobile').mean().show(5,False)
  • 最大值运算
代码语言:javascript
复制
df.groupBy('mobile').max().show(5,False)
  • 最小值运算
代码语言:javascript
复制
df.groupBy('mobile').min().show(5,False)
  • 求和运算
代码语言:javascript
复制
df.groupBy('mobile').sum().show(5,False)
  • 对特定列做聚合运算
代码语言:javascript
复制
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)

3.6 用户自定义函数使用

一种情况,使用udf函数。

  • 具有函数名
代码语言:javascript
复制
from pyspark.sql.functions import udf
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return '高档价位'
    elif brand =='MI':
        return '中档价位'
    else:
        return '低档价位'
brand_udf=udf(price_range,StringType())
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
  • 匿名函数
代码语言:javascript
复制
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
df.withColumn("age_group", age_udf(df.age)).show(10,False)

另一种情况,使用pandas_udf函数。

代码语言:javascript
复制
from pyspark.sql.functions import pandas_udf
def remaining_yrs(age):
    yrs_left=100-age

    return yrs_left
    
length_udf = pandas_udf(remaining_yrs, IntegerType())
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)

关于PySpark做数据处理,你有什么问题,请留言。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-03-22,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据科学与人工智能 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1 PySpark简介
  • 2 PySpark工作环境搭建
  • 3 PySpark数据处理
    • 3.1 数据读取
      • 3.2 数据探索性分析
        • 3.3 数据选择
          • 3.4 增加变量
            • 3.5 分组处理
              • 3.6 用户自定义函数使用
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档