Spark介绍
安装库
学习spark之前,我们需要安装Python环境,而且需要安装下边这两个关于Spark的库:
Apache Spark:安装Apache Spark非常简单。 您只需从官方网站下载该软件包即可。安装完成后可以在命令行测试是否安装成功,命令行cd进入spark安装路径查看spark版本的命令如下:
./pyspark --version
如果显示下列结果说明安装成功。
findspark库:为了更轻松地使用Apache Spark,我们需要安装findspark库。 它是一个非常简单的库,可以自动设置开发环境以导入Apache Spark库。findspark库可以直接用pip进行安装。
pip3 install findspark
Spark回归案例分析
安装好spark环境后,我们通过一个回归的例子来为大家演示如何用spark开始第一个spark小项目。本次数据集采用的是波士顿住房数据集,该数据集包含美国人口普查局收集的有关波士顿马萨诸塞州住房的信息。通过13个特征变量来对住房价格进行回归分析。
下边开始动手实现我们的项目
首先导入findspark库并通过传递Apache Spark文件夹的路径进行初始化。
# make pyspark importable as a regular library.
import findspark
findspark.init('/opt/spark')
每次使用Spark都需要先构建SparkSession,因此我们导入pyspark.sql库并初始化一个SparkSession 。
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
下面我们开始加载数据,这里我们使用spark.read.csv,而不是使用我们之前用的pandas。
data = spark.read.csv('./boston_housing.csv', header=True, inferSchema=True)
其中两个参数分别表示为:
加载完数据后,我们可以直接使用data.show()来查看数据信息:
根据上边显示的数据信息,我们需要将1-13列作为变量,MEDV列作为数据标签进行预测,所以接下来我们要创建特征数组,这个过程只需导入VectorAssembler类并传入特征变量的列名称即可,非常简单直接,具体代码如下:
feature_columns = data.columns[:-1] # here we omit the final column
from pyspark.ml.feature import VectorAssembler
assembler =VectorAssembler(inputCols=feature_columns,outputCol="features")
data_2 = assembler.transform(data)
接下来就是训练集与测试集的划分,这里我们可以直接使用RandomSplit函数,而不是之前sklearn中的train_test_split函数。
train,test = data_2.randomSplit([0.7,0.3])
训练与评估模型,与平时我们训练和评估模型一样,只不过在spark中我们使用的是spark为我们提供的算法函数。在spark中我们需要从pyspark.ml中导入算法函数,使用model.transform()函数进行预测,这个和之前用的model.predict()还是有区别的。spark模型训练与评估代码如下:
from pyspark.ml.regression import LinearRegression
algo = LinearRegression(featuresCol="features", labelCol="medv")
model = algo.fit(train)
evaluation_summary = model.evaluate(test)
predictions = model.transform(test)
完整代码
本次使用pyspark进行机器学习回归分析教程的完整代码如下所示,大家可以安装相应的库,然后下载数据按照教程一步一步跑出Apache Spark的入门尝鲜案例。
# make pyspark importable as a regular library.
import findspark
findspark.init('/opt/spark')
# create a SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# load data
data = spark.read.csv('./boston_housing.csv', header=True, inferSchema=True)
# create features vector
feature_columns = data.columns[:-1] # here we omit the final column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns,outputCol="features")
data_2 = assembler.transform(data)
# train/test split
train, test = data_2.randomSplit([0.7, 0.3])
# define the model
from pyspark.ml.regression import LinearRegression
algo = LinearRegression(featuresCol="features", labelCol="medv")
# train the model
model = algo.fit(train)
# evaluation
evaluation_summary = model.evaluate(test)
evaluation_summary.meanAbsoluteError
evaluation_summary.rootMeanSquaredError
evaluation_summary.r2
# predicting values
predictions = model.transform(test)
predictions.select(predictions.columns[13:]).show() # here I am filtering out some columns just for the figure to fit