前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PySpark教程:使用Python学习Apache Spark

PySpark教程:使用Python学习Apache Spark

作者头像
February
修改2018-11-26 15:17:35
10.2K0
修改2018-11-26 15:17:35
举报
文章被收录于专栏:技术翻译技术翻译

在以如此惊人的速度生成数据的世界中,在正确的时间对数据进行正确分析非常有用。实时处理大数据并执行分析的最令人惊奇的框架之一是Apache Spark,如果我们谈论现在用于处理复杂数据分析和数据修改任务的编程语言,我相信Python会超越这个图表。所以在这个PySpark教程中,我将讨论以下主题:

  • 什么是PySpark?
  • PySpark在业界
  • 为什么选择Python?
  • Spark RDDs
  • 使用PySpark进行机器学习

PySpark教程:什么是PySpark?

Apache Spark是一个快速的集群计算框架,用于处理,查询和分析大数据。基于内存计算,它具有优于其他几个大数据框架的优势。

开源社区最初是用Scala编程语言编写的,它开发了一个支持Apache Spark的神奇工具。PySpark通过其库Py4j帮助数据科学家与Apache Spark和Python中的RDD进行交互。有许多功能使PySpark成为比其他更好的框架:

  • 速度:比传统的大规模数据处理框架快100倍。
  • 强大的缓存:简单的编程层提供强大的缓存和磁盘持久性功能。
  • 部署:可以通过Mesos,Hadoop通过Yarn或Spark自己的集群管理器进行部署。
  • 实时:由于内存计算,实时计算和低延迟。
  • Polyglot: 支持Scala,Java,Python和R编程。

让我们继续我们的PySpark教程博客,看看Spark在业界的使用情况。

PySpark在业界

让我们继续我们的PySpark教程,看看Spark在业界的使用位置。

每个行业都围绕大数据展开,而大数据则涉及分析。那么让我们来看看使用Apache Spark的各个行业。

Media是向在线流媒体发展的最大行业之一。Netflix使用Apache Spark进行实时流处理,为其客户提供个性化的在线推荐。它每天处理4500亿个事件,流向服务器端应用程序。

财务是Apache Spark的实时处理发挥重要作用的另一个领域。银行正在使用Spark访问和分析社交媒体资料,以获取洞察力,从而帮助他们为信用风险评估,有针对性的广告和客户细分做出正确的业务决策。使用Spark还可以减少客户流失欺诈检测是涉及Spark的最广泛使用的机器学习领域之一。

医疗保健提供商正在使用Apache Spark来分析患者记录以及过去的临床数据,以确定哪些患者在从诊所出院后可能面临健康问题。Apache Spark用于基因组测序,以减少处理基因组数据所需的时间。

零售和电子商务是一个人们无法想象它在没有使用分析和有针对性的广告的情况下运行的行业。作为当今最大的电子商务平台之一,Alibabaruns是世界上一些最大的Spark职位,用于分析数PB的数据。阿里巴巴在图像数据中执行特征提取。易趣使用Apache Spark提供有针对性的优惠,增强客户体验并优化整体性能。

旅游业也使用Apache Spark。TripAdvisor是一家帮助用户计划完美旅行的领先旅游网站,它正在使用Apache Spark来加速其个性化的客户推荐。TripAdvisor使用Apache Spark通过比较数百个网站为数百万旅客提供建议,以便为其客户找到最佳的酒店价格。

这个PySpark教程的一个重要方面是理解为什么我们需要使用Python。为什么不使用Java,Scala或R?

易于学习:对于程序员来说,Python因其语法和标准库而相对容易学习。而且,它是一种动态类型语言,这意味着RDD可以保存多种类型的对象。

大量的库: Scala没有足够的数据科学工具和Python,如机器学习和自然语言处理。此外,Scala缺乏良好的可视化和本地数据转换。

巨大的社区支持: Python拥有一个全球社区,拥有数百万开发人员,可在数千个虚拟和物理位置进行在线和离线交互。

这个PySpark教程中最重要的主题之一是使用RDD。让我们了解一下RDD是什么。

Spark RDDs

当涉及到迭代分布式计算,即在计算中处理多个作业的数据时,我们需要在多个作业之间重用或共享数据。像Hadoop这样的早期框架在处理多个操作/作业时遇到了问题:

  • 将数据存储在HDFS等中间存储中。
  • 多个I / O作业使计算变慢。
  • 复制和序列化反过来使进程更慢。

RDD尝试通过启用容错分布式内存计算来解决所有问题。RDD是弹性分布式数据集的缩写RDD是一种分布式内存抽象,它允许程序员以容错的方式在大型集群上执行内存计算。它们是在一组计算机上分区的对象只读集合,如果分区丢失,可以重建这些对象。在RDD上执行了几个操作:

  • 转换:转换从现有数据集创建新数据集。懒惰的评价。
  • 操作:仅当在RDD上调用操作时, Spark才会强制执行计算。

让我们理解一些转换,动作和函数。

读取文件并显示前n个元素:

代码语言:javascript
复制
rdd = sc.textFile("file:///home/edureka/Desktop/Sample")
rdd.take(n)
代码语言:javascript
复制
[u'Deforestation is arising as the main environmental and social issue which has now taken the form of more than a powerful demon. ',
 u'We must know about the causes, effects and ways to solve the problems arisen because of the deforestation. ',
 u'We have provided many paragraphs, long and short essay on deforestation in order to help your kids and children to get aware about the problem as well as get participated in the essay writing competition in the school or outside the school. ',
 u'You can select any deforestation essay given below according to the class standard. ',
 u'Deforestation is arising as the major global problem to the society and environment.']

转换为小写和拆分:(降低和拆分)

代码语言:javascript
复制
def Func(lines):
lines = lines.lower()
lines = lines.split()
return lines
rdd1 = rdd.map(Func)
rdd1.take(5)
代码语言:javascript
复制
[[u'deforestation',
  u'is',
  u'arising',
  u'as',
  u'the',
  u'main',
  u'environmental',
  u'and',
  u'social',
  u'issue',
  u'which',
  u'has',
  u'now',
  u'taken',
.....
.
.
.
]

删除StopWords :(过滤器)

代码语言:javascript
复制
stop_words = ['a','all','the','as','is','am','an','and','be','been','from','had','I','I'd','why','with']
rdd2 = rdd1.filter(lambda z: z not in stop_words)
rdd2.take(10)
代码语言:javascript
复制
[u'deforestation',
 u'arising',
 u'main',
 u'environmental',
 u'social',
 u'issue',
 u'which',
 u'has',
 u'now',
 u'taken']

从1到500的数字总和 :(减少)

代码语言:javascript
复制
sum_rdd = sc.parallelize(range(1,500))
sum_rdd.reduce(lambda x,y: x+y)
代码语言:javascript
复制
124750

使用PySpark进行机器学习

继续我们的PySpark教程,让我们分析一些篮球数据并进行一些预测。所以,在这里我们将使用自1980年以来NBA所有球员的数据[引入3指针的年份]。

代码语言:javascript
复制
df = spark.read.option('header','true')\
.option('inferSchema','true')
.csv("file:///home/edureka/Downloads/season_totals.csv")
代码语言:javascript
复制
print(df.columns)
代码语言:javascript
复制
['_c0', 'player', 'pos', 'age', 'team_id', 'g', 'gs', 'mp', 'fg', 'fga', 'fg_pct', 'fg3', 'fg3a', 'fg3_pct', 'fg2', 'fg2a', 'fg2_pct', 'efg_pct', 'ft', 'fta', 'ft_pct', 'orb', 'drb', 'trb', 'ast', 'stl', 'blk', 'tov', 'pf', 'pts', 'yr']

排序玩家(OrderBy)和 toPandas:

在这里,我们根据一个赛季得分来排序球员。

代码语言:javascript
复制
df.orderBy('pts',ascending = False).limit(10).toPandas()[['yr','player','age','pts','fg3']]

使用DSL和matplotlib:

在这里,我们分析了每个赛季3次尝试的平均次数,在36分钟 的时间限制内[对应于足够休息的近似完整的NBA比赛的间隔]。我们使用3点射门次数(fg3a)和分钟数(mp)来计算此指标,然后使用matlplotlib绘制结果。

代码语言:javascript
复制
from pyspark.sql.functions import col
fga_py = df.groupBy('yr')\
.agg({'mp' : 'sum', 'fg3a' : 'sum'})
.select(col('yr'), (36*col('sum(fg3a)')/col('sum(mp)')).alias('fg3a_p36m'))\
.orderBy('yr')
from matplotlib import pyplot as plt
import seaborn as sns
plt.style.use('fivethirtyeight')
_df = fga_py.toPandas()
plt.plot(_df.yr,_df.fg3a_p36m, color = '#CD5C5C')
plt.xlabel('Year')
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.annotate('3 pointer introduced', xy=(1980, .5), xytext=(1981, 1.1), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved in 3-point line', xy=(1996, 2.4), xytext=(1991.5, 2.7), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved back\n3-point line', xy=(1998, 2.), xytext=(1998.5, 2.4), fontsize = 9, arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))

线性回归和向量汇编程序:

我们可以在此曲线上拟合线性回归模型,以模拟未来5年的射击次数。我们必须使用VectorAssembler 函数将数据转换为单个列。这是一个必要条件为在MLlib线性回归API。

代码语言:javascript
复制
from pyspark.ml.feature import VectorAssembler
t = VectorAssembler(inputCols=['yr'], outputCol = 'features')
training = t.transform(fga_py)\
.withColumn('yr',fga_py.yr)\
.withColumn('label',fga_py.fg3a_p36m)
training.toPandas().head()

然后,我们使用转换后的数据构建线性回归模型对象。

代码语言:javascript
复制
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10)
model = lr.fit(training)

将训练模型应用于数据集:

我们将训练有素的模型对象模型应用于我们的原始训练集以及5年的未来数据:

代码语言:javascript
复制
from pyspark.sql.types import Row
# apply model for the 1979-80 season thru 2020-21 season
training_yrs = training.select('yr').rdd.map(lambda x: x[0]).collect()
training_y = training.select('fg3a_p36m').rdd.map(lambda x: x[0]).collect()
prediction_yrs = [2017, 2018, 2019, 2020, 2021]
all_yrs = training_yrs + prediction_yrs
# built testing DataFrame
test_rdd = sc.parallelize(all_yrs)
row = Row('yr')&lt
all_years_features = t.transform(test_rdd.map(row).toDF())
# apply linear regression model
df_results = model.transform(all_years_features).toPandas()

绘制最终预测:

然后,我们可以绘制结果并将图表保存在指定位置。

代码语言:javascript
复制
plt.plot(df_results.yr,df_results.prediction, linewidth = 2, linestyle = '--',color = '#224df7', label = 'L2 Fit')
plt.plot(training_yrs, training_y, color = '#f08080', label = None)
plt.xlabel('Year')
plt.ylabel('Number of attempts')
plt.legend(loc = 4)
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.tight_layout()
plt.savefig("/home/edureka/Downloads/Images/REGRESSION.png")

而且,通过这个图,我们来到这个PySpark教程的末尾。

伙计们,就是这样!

我希望你们知道PySpark是什么,为什么Python最适合Spark,RDD和Pyspark机器学习的一瞥。恭喜,您不再是PySpark的新手了。

原文标题《PySpark Tutorial: Learn Apache Spark Using Python》

作者:Kislay Keshari

译者:February

不代表云加社区观点,更多详情请查看原文链接

本文系外文翻译,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系外文翻译前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • PySpark教程:什么是PySpark?
  • PySpark在业界
  • Spark RDDs
    • 读取文件并显示前n个元素:
      • 转换为小写和拆分:(降低和拆分)
        • 删除StopWords :(过滤器)
          • 从1到500的数字总和 :(减少)
          • 使用PySpark进行机器学习
            • 排序玩家(OrderBy)和 toPandas:
              • 使用DSL和matplotlib:
                • 线性回归和向量汇编程序:
                  • 将训练模型应用于数据集:
                    • 绘制最终预测:
                    相关产品与服务
                    大数据
                    全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档