专栏首页深度学习自然语言处理【PySpark入门】手把手实现PySpark机器学习项目-回归算法

【PySpark入门】手把手实现PySpark机器学习项目-回归算法

摘要

PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用。PySpark如何建模呢?这篇文章手把手带你入门PySpark,提前感受工业界的建模过程!

任务简介

电商中,了解用户在不同品类的各个产品的购买力是非常重要的!这将有助于他们为不同产品的客户创建个性化的产品。在这篇文章中,笔者在真实的数据集中手把手实现如何预测用户在不同品类的各个产品的购买行为。

如果有兴趣和笔者一步步实现项目,可以先根据上一篇文章的介绍中安装PySpark,并在网站中下载数据。

https://datahack.analyticsvidhya.com/contest/black-friday/

数据集简介

某零售公司想要了解针对不同类别的各种产品的顾客购买行为(购买量)。他们为上个月选定的大批量产品分享了各种客户的购买汇总。该数据集还包含客户人口统计信息(age, gender, marital status, city_type, stay_in_current_city),产品详细信息(product_id and product category)以及上个月的purchase_amount总数。现在,他们希望建立一个模型来预测客户对各种产品的购买量,这将有助于他们为不同产品的客户创建个性化的产品。

手把手实战项目

1. 导入数据

这里我们使用PySpark的读数据接口read.csv读取数据,和pandas读取数据接口迷之相似。

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("test") \
    .config("spark.some.config.option", "setting") \
    .getOrCreate()
    
train = spark.read.csv('./BlackFriday/train.csv', header=True, inferSchema=True)
test = spark.read.csv('./BlackFriday/test.csv', header=True,  inferSchema=True

2. 分析数据的类型

要查看Dataframe中列的类型,可以使用printSchema()方法。让我们在train上应用printSchema(),它将以树格式打印模式。

train.printSchema()
"""
root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
"""

3. 预览数据集

在PySpark中,我们使用head()方法预览数据集以查看Dataframe的前n行,就像python中的pandas一样。我们需要在head方法中提供一个参数(行数)。让我们看一下train的前5行。

train.head(5)
"""
[Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370), 
Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),  
Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422), 
Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057), 
Row(User_ID=1000002, Product_ID='P00285442', Gender='M', Age='55+', Occupation=16, City_Category='C', Stay_In_Current_City_Years='4+', Marital_Status=0, Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]
"""

要查看数据框架中的行数,我们需要调用方法count()。让我们核对一下train上的行数。Pandas和Spark的count方法是不同的。

4. 插补缺失值

通过调用drop()方法,可以检查train上非空数值的个数,并进行测试。默认情况下,drop()方法将删除包含任何空值的行。我们还可以通过设置参数“all”,当且仅当该行所有参数都为null时以删除该行。这与pandas上的drop方法类似。

train.na.drop('any').count(),test.na.drop('any').count()
"""
(166821, 71037)
"""

在这里,为了填充简单,我使用-1来填充train和test的null值。虽然这不是一个很好的填充方法,你可以选择其他的填充方式。

train = train.fillna(-1)
test = test.fillna(-1)

5. 分析数值特征

我们还可以使用describe()方法查看Dataframe列的各种汇总统计信息,它显示了数字变量的统计信息。要显示结果,我们需要调用show()方法。

train.describe().show()
"""
+-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|        Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|            550068|       550068|                    550068|             550068|            550068|            550068|            550068|           550068|
|   mean|1003028.8424013031|      null|  null|  null| 8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 6.419769919355425| 3.145214773446192|9263.968712959126|
| stddev| 1727.591585530871|      null|  null|  null|6.5226604873418115|         null|         0.989086680757309| 0.4917701263173259| 3.936211369201324| 6.565109781181374| 6.681038828257864|5023.065393820593|
|    min|           1000001| P00000142|     F|  0-17|                 0|            A|                         0|                  0|                 1|                -1|                -1|               12|
|    max|           1006040|  P0099942|     M|   55+|                20|            C|                        4+|                  1|                20|                18|                18|            23961|
+-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
"""

上面看起来好像比较乱,这里我们选择某一列来看看

让我们从一个列中选择一个名为“User_ID”的列,我们需要调用一个方法select并传递我们想要选择的列名。select方法将显示所选列的结果。我们还可以通过提供用逗号分隔的列名,从数据框架中选择多个列。

train.select('User_ID','Age').show(5)
"""
+-------+----+
|User_ID| Age|
+-------+----+
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000001|0-17|
|1000002| 55+|
+-------+----+
only showing top 5 rows
"""
6. 分析categorical特征

为了建立一个模型,我们需要在“train”和“test”中看到分类特征的分布。这里我只对Product_ID显示这个,但是我们也可以对任何分类特性执行相同的操作。让我们看看在“train”和“test”中Product_ID的不同类别的数量。这可以通过应用distinct()count()方法来实现。

train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count()
"""
(3631, 3491)
"""

在计算“train”和“test”的不同值的数量后,我们可以看到“train”和“test”有更多的类别。让我们使用相减方法检查Product_ID的类别,这些类别正在"test"中,但不在“train”中。我们也可以对所有的分类特征做同样的处理。

diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))

diff_cat_in_train_test.distinct().count()
"""
(46, None)
"""

diff_cat_in_train_test.distinct().show(5)
"""
+----------+
|Product_ID|
+----------+
| P00322642|
| P00300142|
| P00077642|
| P00249942|
| P00294942|
+----------+
only showing top 5 rows
"""

以上你可以看到46个不同的类别是在"test"中,而不在"train"中。在这种情况下,我们要么收集更多关于它们的数据,要么跳过那些类别(无效类别)的“test”。

7. 将分类变量转换为标签

我们还需要通过在Product_ID上应用StringIndexer转换将分类列转换为标签,该转换将标签的Product_ID列编码为标签索引的列。

from pyspark.ml.feature import StringIndexer
plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_id_trans')
labeller = plan_indexer.fit(train)

在上面,我们将fit()方法应用于“train”数据框架上,构建了一个标签。稍后我们将使用这个标签来转换我们的"train"和“test”。让我们在labeller的帮助下转换我们的train和test的Dataframe。我们需要调用transform方法。我们将把转换结果存储在Train1和Test1中.

Train1 = labeller.transform(train)
Test1 = labeller.transform(test)
Train1.show(2)
"""
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|           766.0|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|           183.0|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
only showing top 2 rows
"""

Train1.select('product_id_trans').show(2)
"""
+----------------+
|product_id_trans|
+----------------+
|           766.0|
|           183.0|
+----------------+
only showing top 2 rows
"""

上面已经显示了我们在以前的"train" Dataframe中成功的添加了一个转化后的列“product_id_trans”,("Train1" Dataframe)。

8. 选择特征来构建机器学习模型

首先,我们需要从pyspark.ml.feature导入RFormula;然后,我们需要在这个公式中指定依赖和独立的列;我们还必须为为features列和label列指定名称。

from pyspark.ml.feature import RFormula
formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",
                   featuresCol="features",labelCol="label")

在创建了这个公式之后,我们需要将这个公式应用到我们的Train1上,并通过这个公式转换Train1,Test1。让我们看看如何做到这一点,在拟合变换train1之后,

t1 = formula.fit(Train1)
train1 = t1.transform(Train1)
test1 = t1.transform(Test1)
train1.show(2)
"""
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans|            features|  label|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|           766.0|(16,[6,10,13,14],...| 8370.0|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|           183.0|(16,[6,10,13,14],...|15200.0|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+
only showing top 2 rows
"""

在应用了这个公式之后,我们可以看到train1和test1有两个额外的列,称为features和label,并对我们在公式中指定的列进行标记(featuresCol= features和labelCol= label)。直观上,train1和test1中的features列中的所有分类变量都被转换为数值,数值变量与之前应用ML时相同。我们还可以查看train1和test1中的列特性和标签。

train1.select('features').show(2)
"""
+--------------------+
|            features|
+--------------------+
|(16,[6,10,13,14],...|
|(16,[6,10,13,14],...|
+--------------------+
only showing top 2 rows
"""

train1.select('label').show(2)
"""
+-------+
|  label|
+-------+
| 8370.0|
|15200.0|
+-------+
only showing top 2 rows
"""

9. 建立机器学习模型

在应用RFormula和转换Dataframe之后,我们现在需要根据这些数据开发机器学习模型。我想为这个任务应用一个随机森林回归。让我们导入一个在pyspark.ml中定义的随机森林回归器。然后建立一个叫做rf的模型。我将使用随机森林算法的默认参数。

from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor()

在创建一个模型rf之后,我们需要将train1数据划分为train_cv和test_cv进行交叉验证。这里,我们将train1数据区域划分为train_cv的70%和test_cv的30%。

(train_cv, test_cv) = train1.randomSplit([0.7, 0.3])

在train_cv上建立模型,在test_cv上进行预测。结果将保存在predictions中。

model1 = rf.fit(train_cv)
predictions = model1.transform(test_cv)

10. 模型效果评估

让我们评估对test_cv的预测,看看rmse和mse是多少。

为了评估模型,我们需要从pyspark.ml.evaluation中导入RegressionEvaluator。我们必须为此创建一个对象。有一种方法叫 evaluate for evaluator ,它对模型求值。我们需要为此指定度量标准。

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()
mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })
import numpy as np
np.sqrt(mse), mse
"""
(3832.4796474051345, 14687900.247774584)
"""

经过计算,我们可以看到我们的rmse是3827.767295494888。

现在,我们将在所有的train1数据集上再次训练一个模型。

model = rf.fit(train1)
predictions1 = model.transform(test1)

预测之后,我们得到测试集预测结果,并将其保存成csv文件。

df = predictions1.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase')
df.toPandas().to_csv('./BlackFriday/submission.csv')

写入csv文件后(submission.csv)。我们可以上传我们的第一个解决方案来查看分数,我得到的分数是3844.20920145983。

总结

在本文中,我以一个真实案例介绍了PySpark建模流程。这只是本系列文章的开始。在接下来的几周,我将继续分享PySpark使用的教程。同时,如果你有任何问题,或者你想对我要讲的内容提出任何建议,欢迎留言。

本文分享自微信公众号 - 深度学习自然语言处理(zenRRan)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2019-10-16

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 【论文】哈工大SCIR Lab | EMNLP 2019 基于BERT的跨语言

    论文名称:Cross-Lingual BERT Transformation for Zero-Shot Dependency Parsing

    zenRRan
  • 文本分类有哪些论文中很少提及却对性能有重要影响的tricks?

    当时正好在刷一个比较有趣的task,结果发现奇奇怪怪的tricks可以带来不少的性能收益。再加上后来为了验证一个小idea跑了一堆公开的文本分类数据集,虽然id...

    zenRRan
  • 【一分钟论文】IJCAI2019 | Self-attentive Biaffine Dependency Parsing

    http://hlt.suda.edu.cn/~zhli/papers/liying_ijcai19_dp.pdf

    zenRRan
  • 手把手教你实现PySpark机器学习项目——回归算法

    在电商中,了解用户在不同品类的各个产品的购买力是非常重要的!这将有助于他们为不同产品的客户创建个性化的产品。在这篇文章中,笔者在真实的数据集中手把手实现如何预测...

    AI科技大本营
  • 【PySpark入门】手把手实现PySpark机器学习项目-回归算法

    PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用。PySpark如何建模呢?这篇文章手把手带你入门PySpa...

    用户2769421
  • 大数据处理实践!手把手实现PySpark机器学习项目-回归算法

    PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用。PySpark如何建模呢?这篇文章手把手带你入门PySpa...

    Datawhale
  • 【PySpark入门】手把手实现PySpark机器学习项目-回归算法

    PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用。PySpark如何建模呢?这篇文章手把手带你入门PySpa...

    小小詹同学
  • 卷积神经网络的Helloworld例子

    下面是keras官方的卷积神经网络在github上的例子。和原版的唯一区别是:mnist的数据因为在国外(由于翻墙的原因,报错Exception: URL fe...

    马克java社区
  • 入门项目数字手写体识别:使用Keras完成CNN模型搭建

    对于图像分类任务而言,卷积神经网络(CNN)是目前最优的网络结构,没有之一。在面部识别、自动驾驶、物体检测等领域,CNN被广泛使用,并都取得了最优性能。对于绝大...

    用户3578099
  • 「微服务架构」基于NGINX的三种微服务参考架构

    所有六个博客,以及一个关于微服务应用程序的Web前端的博客,都被收集到一个免费的电子书中。

    首席架构师智库

扫码关注云+社区

领取腾讯云代金券

玩转腾讯云 有奖征文活动