这是我的第82篇原创文章,关于PySpark和数据处理。
阅读完本文,你可以知道:
1 PySpark是什么
2 PySpark工作环境搭建
3 PySpark做数据处理工作
“我们要学习工具,也要使用工具。”
PySpark是一种适合在大规模数据上做探索性分析,机器学习模型和ETL工作的优秀语言。若是你熟悉了Python语言和pandas库,PySpark适合你进一步学习和使用,你可以用它来做大数据分析和建模。
PySpark = Python + Spark。Python语言是一种开源编程语言,可以用来做很多事情,我主要关注和使用Python语言做与数据相关的工作,比方说,数据读取,数据处理,数据分析,数据建模和数据可视化等。Spark是采用内存计算机制,是一个高速并行处理大数据的框架。Spark架构如下图所示。
我以Win10系统64位机,举例说明PySpark工作环境过程搭建。
第一步: 下载和安装好Anaconda数据科学套件。下载链接:https://www.anaconda.com/distribution/#windows,并创建自己的工作环境。我的工作环境是data_science。
第二步: 下载和安装Java软件。下载链接:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html。软件安装好后,并且在环境变量上做好配置。
第三步: 下载Spark压缩包,并解压缩。下载链接:https://spark.apache.org/downloads.html,如图所示。
下载好后,把它解压缩到自己指定的位置。我把它放在D:\DataScienceTools\spark下,重命名为spark_unzipped。这个文件夹下的目录结构如下图所示。
下载winutils.exe,并放到D:\DataScienceTools\spark\spark_unzipped\bin下。winutils.exe的下载链接:https://github.com/steveloughran/winutils/blob/master/ hadoop-2.7.1/bin/winutils.exe。
在Win10的环境变量做如下配置
第四步: 打开Anaconda Prompt,进入到data_science工作环境,安装findspark库.
pip install findspark
第五步:,测试PySpark是否可以正常工作,在Anaconda Prompt输入Jupyter notebook,新建一个notebook。输入如下测试语句,若是没有报错,表示可以正常使用PySpark。
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.getOrCreate()
print(spark)
小提示:每次使用PySpark的时候,请先运行初始化语句。
import findspark
findspark.init()
PySpark数据处理包括数据读取,探索性数据分析,数据选择,增加变量,分组处理,自定义函数等操作。
import findspark
findspark.init()
# 导入 SparkSession
from pyspark.sql import SparkSession
# 创建一个Spark会话对象
spark=SparkSession.builder.appName('data_processing').getOrCreate()
# 加载csv数据集
df=spark.read.csv('./datasets/sample_data.csv',inferSchema=True,header=True)
print((df.count(), len(df.columns)))
print(df.columns)
print(df.printSchema())
df.describe().show()
df.select('age', 'mobile').show(10)
df.filter(df['mobile']=='Vivo').show()
df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()
df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()
df.withColumn("age_after_10_yrs",(df["age"]+10)).show(10,False)
from pyspark.sql.types import StringType,DoubleType,IntegerType
df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)
df.groupBy('mobile').count().show(5,False)
df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)
df.groupBy('mobile').mean().show(5,False)
df.groupBy('mobile').max().show(5,False)
df.groupBy('mobile').min().show(5,False)
df.groupBy('mobile').sum().show(5,False)
df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)
一种情况,使用udf函数。
from pyspark.sql.functions import udf
def price_range(brand):
if brand in ['Samsung','Apple']:
return '高档价位'
elif brand =='MI':
return '中档价位'
else:
return '低档价位'
brand_udf=udf(price_range,StringType())
df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)
age_udf = udf(lambda age: "young" if age <= 30 else "senior", StringType())
df.withColumn("age_group", age_udf(df.age)).show(10,False)
另一种情况,使用pandas_udf函数。
from pyspark.sql.functions import pandas_udf
def remaining_yrs(age):
yrs_left=100-age
return yrs_left
length_udf = pandas_udf(remaining_yrs, IntegerType())
df.withColumn("yrs_left", length_udf(df['age'])).show(10,False)
关于PySpark做数据处理,你有什么问题,请留言。