PySpark做数据处理

这是我的第82篇原创文章,关于PySpark和数据处理。

阅读完本文,你可以知道:

1 PySpark是什么

2 PySpark工作环境搭建

3 PySpark做数据处理工作

“我们要学习工具,也要使用工具。”

1 PySpark简介

PySpark是一种适合在大规模数据上做探索性分析,机器学习模型和ETL工作的优秀语言。若是你熟悉了Python语言和pandas库,PySpark适合你进一步学习和使用,你可以用它来做大数据分析和建模。

PySpark = Python + Spark。Python语言是一种开源编程语言,可以用来做很多事情,我主要关注和使用Python语言做与数据相关的工作,比方说,数据读取,数据处理,数据分析,数据建模和数据可视化等。Spark是采用内存计算机制,是一个高速并行处理大数据的框架。Spark架构如下图所示。

  • 1:Spark SQL:用于处理结构化数据,可以看作是一个分布式SQL查询引擎。
  • 2:Spark Streaming:以可伸缩和容错的方式处理实时流数据,采用微批处理来读取和处理传入的数据流。
  • 3:Spark MLlib:以分布式的方式在大数据集上构建机器学习模型
  • 4:Spark GraphX/Graphframe:用于图分析和图并行处理

2 PySpark工作环境搭建

我以Win10系统64位机,举例说明PySpark工作环境过程搭建。

第一步: 下载和安装好Anaconda数据科学套件。下载链接:https://www.anaconda.com/distribution/#windows,并创建自己的工作环境。我的工作环境是data_science

第二步: 下载和安装Java软件。下载链接:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html。软件安装好后,并且在环境变量上做好配置。

第三步: 下载Spark压缩包,并解压缩。下载链接:https://spark.apache.org/downloads.html,如图所示。

下载好后,把它解压缩到自己指定的位置。我把它放在D:\DataScienceTools\spark下,重命名为spark_unzipped。这个文件夹下的目录结构如下图所示。

下载winutils.exe,并放到D:\DataScienceTools\spark\spark_unzipped\bin下。winutils.exe的下载链接:https://github.com/steveloughran/winutils/blob/master/ hadoop-2.7.1/bin/winutils.exe

在Win10的环境变量做如下配置

  • 1 创建变量:HADOOP_HOME和SPARK_HOME,都赋值:D:\DataScienceTools\spark\spark_unzipped
  • 2 创建变量:PYSPARK_DRIVER_PYTHON,赋值:Jupyter
  • 3 创建变量:DRIVER_PYTHON_OPTS,赋值:notebook
  • 4 在Path变量中新建并添加D:\DataScienceTools\spark\spark_unzipped\bin

第四步: 打开Anaconda Prompt,进入到data_science工作环境,安装findspark库.

pip install findspark

第五步:,测试PySpark是否可以正常工作,在Anaconda Prompt输入Jupyter notebook,新建一个notebook。输入如下测试语句,若是没有报错,表示可以正常使用PySpark。

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.getOrCreate()
print(spark)

小提示:每次使用PySpark的时候,请先运行初始化语句

import findspark
findspark.init()

3 PySpark数据处理

PySpark数据处理包括数据读取,探索性数据分析,数据选择,增加变量,分组处理,自定义函数等操作

3.1 数据读取

import findspark
findspark.init()

# 导入 SparkSession
from pyspark.sql import SparkSession
# 创建一个Spark会话对象
spark=SparkSession.builder.appName('data_processing').getOrCreate()
# 加载csv数据集
df=spark.read.csv('./datasets/sample_data.csv',inferSchema=True,header=True)

3.2 数据探索性分析

  • 1:结构分析
print((df.count(), len(df.columns)))
print(df.columns)
  • 2:元数据分析
print(df.printSchema())
  • 3:描述性统计分析
df.describe().show()

3.3 数据选择

  • 变量选择
df.select('age', 'mobile').show(10)
  • 样本选择
df.filter(df['mobile']=='Vivo').show()
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()
  • 样本和变量选择
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()

3.4 增加变量

df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
from pyspark.sql.types import StringType,DoubleType,IntegerType
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)

3.5 分组处理

  • 计数运算
df.groupBy('mobile').count().show(5,False)
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
  • 均值运算
df.groupBy('mobile').mean().show(5,False)
  • 最大值运算
df.groupBy('mobile').max().show(5,False)
  • 最小值运算
df.groupBy('mobile').min().show(5,False)
  • 求和运算
df.groupBy('mobile').sum().show(5,False)
  • 对特定列做聚合运算
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)

3.6 用户自定义函数使用

一种情况,使用udf函数。

  • 具有函数名
from pyspark.sql.functions import udf
def price_range(brand):
    if brand in ['Samsung','Apple']:
        return '高档价位'
    elif brand =='MI':
        return '中档价位'
    else:
        return '低档价位'
brand_udf=udf(price_range,StringType())
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
  • 匿名函数
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
df.withColumn("age_group", age_udf(df.age)).show(10,False)

另一种情况,使用pandas_udf函数。

from pyspark.sql.functions import pandas_udf
def remaining_yrs(age):
    yrs_left=100-age

    return yrs_left
    
length_udf = pandas_udf(remaining_yrs, IntegerType())
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)

关于PySpark做数据处理,你有什么问题,请留言。

本文分享自微信公众号 - 数据科学与人工智能(DS_AI_shujuren)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2020-03-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【Python环境】python 中数据分析几个比较常用的方法

    1,表头或是excel的索引如果是中文的话,输出会出错 ? 解决方法:python的版本问题!换成python3就自动解决了!当然也有其他的方法,这里就不再深究...

    陆勤_数据人网
  • Python代码|Python做数据可视化的代码

    从代码中学习Python知识和Python与数据相关的知识,是一个有效的方法。例如:想了解Python做数据可视化的工作。我们可以从互联网找一些Python做数...

    陆勤_数据人网
  • 【Python环境】使用Python Pandas处理亿级数据

    在数据分析领域,最热门的莫过于Python和R语言,此前有一篇文章《别老扯什么Hadoop了,你的数据根本不够大》指出:只有在超过5TB数据量的规模下,Hado...

    陆勤_数据人网
  • 关于python的索引

    写了几天程序,深刻地感受到python语言中(特指numpy、pandas)对于数据强大的索引能力。特此总结一下:

    py3study
  • Python 数据可视化神器分享:pyecharts

    我们都知道python上的一款可视化工具matplotlib,而前些阵子做一个Spark项目的时候用到了百度开源的一个可视化JS工具-Echarts,可视化类型...

    公众号---志学Python
  • zone watermark水位控制

    本节我们来分析下zone的水位控制,在zone那一节中,我们将重点放在了free_area中,故意没有分析zone中的水位控制,本节在重点分析zone中的水位控...

    DragonKingZhu
  • 在Power Pivot中计算众数

    我们知道在Excel中以及Power Query中都有众数的函数,但是Power Pivot中却没有。

    逍遥之
  • redis_3.0.7_sds.c_sdsAllocSize()

    青木
  • Python-科学计算-pandas-01-df获取部分数据

    系统:Windows 7 语言版本:Anaconda3-4.3.0.1-Windows-x86_64 编辑器:pycharm-community-2016.3....

    zishendianxia
  • 正则表达式-3.位置匹配

    ^匹配字符串的开头。 $匹配字符串的结尾。 注意:^出现在一个字符串集合中时(左方括号[后面),表示求非。

    悠扬前奏

扫码关注云+社区

领取腾讯云代金券