专栏首页SAMsharePySpark入门级学习教程,框架思维(上)

PySpark入门级学习教程,框架思维(上)

为什么要学习Spark?作为数据从业者多年,个人觉得Spark已经越来越走进我们的日常工作了,无论是使用哪种编程语言,Python、Scala还是Java,都会或多或少接触到Spark,它可以让我们能够用到集群的力量,可以对BigData进行高效操作,实现很多之前由于计算资源而无法轻易实现的东西。网上有很多关于Spark的好处,这里就不做过多的赘述,我们直接进入这篇文章的正文!

关于PySpark,我们知道它是Python调用Spark的接口,我们可以通过调用Python API的方式来编写Spark程序,它支持了大多数的Spark功能,比如SparkDataFrame、Spark SQL、Streaming、MLlib等等。只要我们了解Python的基本语法,那么在Python里调用Spark的力量就显得十分easy了。下面我将会从相对宏观的层面介绍一下PySpark,让我们对于这个神器有一个框架性的认识,知道它能干什么,知道去哪里寻找问题解答,争取看完这篇文章可以让我们更加丝滑地入门PySpark。话不多说,马上开始!

? 目录:

  • 安装指引
  • 基础概念
  • 常用函数
  • Sparksql使用
  • 调优思路
  • 学习资源推荐

? 安装指引:

安装这块本文就不展开具体的步骤了,毕竟大家的机子环境都不尽相同。不过可以简单说几点重要的步骤,然后节末放上一些安装示例供大家参考。

1)要使用PySpark,机子上要有Java开发环境

2)环境变量记得要配置完整

3)Mac下的/usr/local/ 路径一般是隐藏的,PyCharm配置py4j和pyspark的时候可以使用 shift+command+G 来使用路径访问。

4)Mac下如果修改了 ~/.bash_profile 的话,记得要重启下PyCharm才会生效的哈

5)版本记得要搞对,保险起见Java的jdk版本选择低版本(别问我为什么知道),我选择的是Java8.

下面是一些示例,可以参考下:

1)Mac下安装spark,并配置pycharm-pyspark完整教程

https://blog.csdn.net/shiyutianming/article/details/99946797

2)virtualBox里安装开发环境

https://www.bilibili.com/video/BV1i4411i79a?p=3

3)快速搭建spark开发环境,云哥项目

https://github.com/lyhue1991/eat_pyspark_in_10_days

? 基础概念

关于Spark的基础概念,我在先前的文章里也有写过,大家可以一起来回顾一下 《想学习Spark?先带你了解一些基础的知识》。作为补充,今天在这里也介绍一些在Spark中会经常遇见的专有名词。

?‍♀️ Q1: 什么是RDD

RDD的全称是 Resilient Distributed Datasets,这是Spark的一种数据抽象集合,它可以被执行在分布式的集群上进行各种操作,而且有较强的容错机制。RDD可以被分为若干个分区,每一个分区就是一个数据集片段,从而可以支持分布式计算。

?‍♀️ Q2: RDD运行时相关的关键名词

简单来说可以有 Client、Job、Master、Worker、Driver、Stage、Task以及Executor,这几个东西在调优的时候也会经常遇到的。

Client:指的是客户端进程,主要负责提交job到Master; Job:Job来自于我们编写的程序,Application包含一个或者多个job,job包含各种RDD操作; Master:指的是Standalone模式中的主控节点,负责接收来自Client的job,并管理着worker,可以给worker分配任务和资源(主要是driver和executor资源); Worker:指的是Standalone模式中的slave节点,负责管理本节点的资源,同时受Master管理,需要定期给Master回报heartbeat(心跳),启动Driver和Executor; Driver:指的是 job(作业)的主进程,一般每个Spark作业都会有一个Driver进程,负责整个作业的运行,包括了job的解析、Stage的生成、调度Task到Executor上去执行; Stage:中文名 阶段,是job的基本调度单位,因为每个job会分成若干组Task,每组任务就被称为 Stage; Task:任务,指的是直接运行在executor上的东西,是executor上的一个线程; Executor:指的是 执行器,顾名思义就是真正执行任务的地方了,一个集群可以被配置若干个Executor,每个Executor接收来自Driver的Task,并执行它(可同时执行多个Task)。

?‍♀️ Q3: 什么是DAG

全称是 Directed Acyclic Graph,中文名是有向无环图。Spark就是借用了DAG对RDD之间的关系进行了建模,用来描述RDD之间的因果依赖关系。因为在一个Spark作业调度中,多个作业任务之间也是相互依赖的,有些任务需要在一些任务执行完成了才可以执行的。在Spark调度中就是有DAGscheduler,它负责将job分成若干组Task组成的Stage。

?‍♀️ Q4: Spark的部署模式有哪些

主要有local模式、Standalone模式、Mesos模式、YARN模式。

更多的解释可以参考这位老哥的解释。https://www.jianshu.com/p/3b8f85329664

?‍♀️ Q5: Shuffle操作是什么

Shuffle指的是数据从Map端到Reduce端的数据传输过程,Shuffle性能的高低直接会影响程序的性能。因为Reduce task需要跨节点去拉在分布在不同节点上的Map task计算结果,这一个过程是需要有磁盘IO消耗以及数据网络传输的消耗的,所以需要根据实际数据情况进行适当调整。另外,Shuffle可以分为两部分,分别是Map阶段的数据准备与Reduce阶段的数据拷贝处理,在Map端我们叫Shuffle Write,在Reduce端我们叫Shuffle Read。

?‍♀️ Q6: 什么是惰性执行

这是RDD的一个特性,在RDD中的算子可以分为Transform算子和Action算子,其中Transform算子的操作都不会真正执行,只会记录一下依赖关系,直到遇见了Action算子,在这之前的所有Transform操作才会被触发计算,这就是所谓的惰性执行。具体哪些是Transform和Action算子,可以看下一节。

? 常用函数

从网友的总结来看比较常用的算子大概可以分为下面几种,所以就演示一下这些算子,如果需要看更多的算子或者解释,建议可以移步到官方API文档去Search一下哈。

pyspark.RDD:http://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.html#pyspark.RDD

图来自 edureka 的pyspark入门教程

下面我们用自己创建的RDD:sc.parallelize(range(1,11),4)

import os
import pyspark
from pyspark import SparkContext, SparkConf

conf = SparkConf().setAppName("test_SamShare").setMaster("local[4]")
sc = SparkContext(conf=conf)

# 使用 parallelize方法直接实例化一个RDD
rdd = sc.parallelize(range(1,11),4) # 这里的 4 指的是分区数量
rdd.take(100)
# [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]


"""
----------------------------------------------
                Transform算子解析
----------------------------------------------
"""
# 以下的操作由于是Transform操作,因为我们需要在最后加上一个collect算子用来触发计算。
# 1. map: 和python差不多,map转换就是对每一个元素进行一个映射
rdd = sc.parallelize(range(1, 11), 4)
rdd_map = rdd.map(lambda x: x*2)
print("原始数据:", rdd.collect())
print("扩大2倍:", rdd_map.collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 扩大2倍: [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]

# 2. flatMap: 这个相比于map多一个flat(压平)操作,顾名思义就是要把高维的数组变成一维
rdd2 = sc.parallelize(["hello SamShare", "hello PySpark"])
print("原始数据:", rdd2.collect())
print("直接split之后的map结果:", rdd2.map(lambda x: x.split(" ")).collect())
print("直接split之后的flatMap结果:", rdd2.flatMap(lambda x: x.split(" ")).collect())
# 直接split之后的map结果: [['hello', 'SamShare'], ['hello', 'PySpark']]
# 直接split之后的flatMap结果: ['hello', 'SamShare', 'hello', 'PySpark']

# 3. filter: 过滤数据
rdd = sc.parallelize(range(1, 11), 4)
print("原始数据:", rdd.collect())
print("过滤奇数:", rdd.filter(lambda x: x % 2 == 0).collect())
# 原始数据: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 过滤奇数: [2, 4, 6, 8, 10]

# 4. distinct: 去重元素
rdd = sc.parallelize([2, 2, 4, 8, 8, 8, 8, 16, 32, 32])
print("原始数据:", rdd.collect())
print("去重数据:", rdd.distinct().collect())
# 原始数据: [2, 2, 4, 8, 8, 8, 8, 16, 32, 32]
# 去重数据: [4, 8, 16, 32, 2]

# 5. reduceByKey: 根据key来映射数据
from operator import add
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print("原始数据:", rdd.collect())
print("原始数据:", rdd.reduceByKey(add).collect())
# 原始数据: [('a', 1), ('b', 1), ('a', 1)]
# 原始数据: [('b', 1), ('a', 2)]

# 6. mapPartitions: 根据分区内的数据进行映射操作
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator):
    yield sum(iterator)
print(rdd.collect())
print(rdd.mapPartitions(f).collect())
# [1, 2, 3, 4]
# [3, 7]

# 7. sortBy: 根据规则进行排序
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortBy(lambda x: x[0]).collect())
print(sc.parallelize(tmp).sortBy(lambda x: x[1]).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

# 8. subtract: 数据集相减, Return each value in self that is not contained in other.
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
print(sorted(x.subtract(y).collect()))
# [('a', 1), ('b', 4), ('b', 5)]

# 9. union: 合并两个RDD
rdd = sc.parallelize([1, 1, 2, 3])
print(rdd.union(rdd).collect())
# [1, 1, 2, 3, 1, 1, 2, 3]

# 10. intersection: 取两个RDD的交集,同时有去重的功效
rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5, 2, 3])
rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
print(rdd1.intersection(rdd2).collect())
# [1, 2, 3]

# 11. cartesian: 生成笛卡尔积
rdd = sc.parallelize([1, 2])
print(sorted(rdd.cartesian(rdd).collect()))
# [(1, 1), (1, 2), (2, 1), (2, 2)]

# 12. zip: 拉链合并,需要两个RDD具有相同的长度以及分区数量
x = sc.parallelize(range(0, 5))
y = sc.parallelize(range(1000, 1005))
print(x.collect())
print(y.collect())
print(x.zip(y).collect())
# [0, 1, 2, 3, 4]
# [1000, 1001, 1002, 1003, 1004]
# [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]

# 13. zipWithIndex: 将RDD和一个从0开始的递增序列按照拉链方式连接。
rdd_name = sc.parallelize(["LiLei", "Hanmeimei", "Lily", "Lucy", "Ann", "Dachui", "RuHua"])
rdd_index = rdd_name.zipWithIndex()
print(rdd_index.collect())
# [('LiLei', 0), ('Hanmeimei', 1), ('Lily', 2), ('Lucy', 3), ('Ann', 4), ('Dachui', 5), ('RuHua', 6)]

# 14. groupByKey: 按照key来聚合数据
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.collect())
print(sorted(rdd.groupByKey().mapValues(len).collect()))
print(sorted(rdd.groupByKey().mapValues(list).collect()))
# [('a', 1), ('b', 1), ('a', 1)]
# [('a', 2), ('b', 1)]
# [('a', [1, 1]), ('b', [1])]

# 15. sortByKey:
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
print(sc.parallelize(tmp).sortByKey(True, 1).collect())
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

# 16. join:
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2), ("a", 3)])
print(sorted(x.join(y).collect()))
# [('a', (1, 2)), ('a', (1, 3))]

# 17. leftOuterJoin/rightOuterJoin
x = sc.parallelize([("a", 1), ("b", 4)])
y = sc.parallelize([("a", 2)])
print(sorted(x.leftOuterJoin(y).collect()))
# [('a', (1, 2)), ('b', (4, None))]

"""
----------------------------------------------
                Action算子解析
----------------------------------------------
"""
# 1. collect: 指的是把数据都汇集到driver端,便于后续的操作
rdd = sc.parallelize(range(0, 5))
rdd_collect = rdd.collect()
print(rdd_collect)
# [0, 1, 2, 3, 4]

# 2. first: 取第一个元素
sc.parallelize([2, 3, 4]).first()
# 2

# 3. collectAsMap: 转换为dict,使用这个要注意了,不要对大数据用,不然全部载入到driver端会爆内存
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
m
# {1: 2, 3: 4}

# 4. reduce: 逐步对两个元素进行操作
rdd = sc.parallelize(range(10),5)
print(rdd.reduce(lambda x,y:x+y))
# 45

# 5. countByKey/countByValue:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(sorted(rdd.countByKey().items()))
print(sorted(rdd.countByValue().items()))
# [('a', 2), ('b', 1)]
# [(('a', 1), 2), (('b', 1), 1)]

# 6. take: 相当于取几个数据到driver端
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
print(rdd.take(5))
# [('a', 1), ('b', 1), ('a', 1)]

# 7. saveAsTextFile: 保存rdd成text文件到本地
text_file = "./data/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

# 8. takeSample: 随机取数
rdd = sc.textFile("./test/data/hello_samshare.txt", 4)  # 这里的 4 指的是分区数量
rdd_sample = rdd.takeSample(True, 2, 0)  # withReplacement 参数1:代表是否是有放回抽样
rdd_sample

# 9. foreach: 对每一个元素执行某种操作,不生成新的RDD
rdd = sc.parallelize(range(10), 5)
accum = sc.accumulator(0)
rdd.foreach(lambda x: accum.add(x))
print(accum.value)
# 45

Sam:未完待续... 文章较长,分上下两篇文章来写哈。

?学习资源推荐:

1)edureka about PySpark Tutorial

印度老哥的课程,B站可直接看,不过口音略难听懂不过还好有字幕。

https://www.bilibili.com/video/BV1i4411i79a?p=1

2)eat_pyspark_in_10_days

梁云大哥的课程,讲得超级清晰,建议精读。

https://github.com/lyhue1991/eat_pyspark_in_10_days

3)官方文档

http://spark.apache.org/docs/latest/api/python/reference/index.html

本文分享自微信公众号 - SAMshare(gh_8528ce7b7e80),作者:samshare

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-04-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • PySpark入门级学习教程,框架思维(中)

    在讲Spark SQL前,先解释下这个模块。这个模块是Spark中用来处理结构化数据的,提供一个叫SparkDataFrame的东西并且自动解析为分布式SQL查...

    Sam Gor
  • 3万字长文,PySpark入门级学习教程,框架思维

    关于PySpark,我们知道它是Python调用Spark的接口,我们可以通过调用Python API的方式来编写Spark程序,它支持了大多数的Spark功能...

    Sam Gor
  • PySpark教程:使用Python学习Apache Spark

    在以如此惊人的速度生成数据的世界中,在正确的时间对数据进行正确分析非常有用。实时处理大数据并执行分析的最令人惊奇的框架之一是Apache Spark,如果我们谈...

    February
  • [源码解析] 深度学习分布式训练框架 horovod (8) --- on spark

    Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。

    罗西的思考
  • Apache Spark MLlib入门体验教程

    学习spark之前,我们需要安装Python环境,而且需要安装下边这两个关于Spark的库:

    深度学习与Python
  • 想学习Spark?先带你了解一些基础的知识

    之前也学习过一阵子的Spark了,是时候先输出一些知识内容了,一来加深印象,二来也可以分享知识,一举多得,今天这篇主要是在学习实验楼的一门课程中自己记下来的笔记...

    Sam Gor
  • 独家 | 一文读懂PySpark数据框(附实例)

    本文中我们将探讨数据框的概念,以及它们如何与PySpark一起帮助数据分析员来解读大数据集。

    数据派THU
  • 第1天:PySpark简介及环境搭建

    Apache Spark是Scala语言实现的一个计算框架。为了支持Python语言使用Spark,Apache Spark社区开发了一个工具PySpark。利...

    会呼吸的Coder
  • 金色传说,开源教程!属于算法的大数据工具-pyspark

    spark是目前大数据领域的核心技术栈,许多从事数据相关工作的小伙伴都想驯服它,变成"驯龙高手",以便能够驾驭成百上千台机器组成的集群之龙来驰骋于大数据之海。

    Sam Gor
  • PySpark SQL 相关知识介绍

    大数据是这个时代最热门的话题之一。但是什么是大数据呢?它描述了一个庞大的数据集,并且正在以惊人的速度增长。大数据除了体积(Volume)和速度(velocity...

    foochane
  • 年前干货 | 数据工程师必备的学习资源(附链接)

    导读:本文首先详细介绍了数据工程的职责、与数据科学家之间的差别以及其不同的工作角色,然后重点列出了很多与核心技能相关的的优秀学习资源,最后介绍行业内认可度较高的...

    Python数据科学
  • Spark vs Dask Python生态下的计算引擎

    对于 Python 环境下开发的数据科学团队,Dask 为分布式分析指出了非常明确的道路,但是事实上大家都选择了 Spark 来达成相同的目的。Dask 是一个...

    Ewdager
  • 腾讯云助力高校AI人才培养,多重产品福利赋能教学实践

    摘要:腾讯云AI专家为你讲解如何使用TI-ONE机器学习平台赋能课堂教学,实现自我提升。文末可预约观看直播,并有大量COS云资源放送。 为了推动中国人工智能行...

    腾讯高校合作
  • 49个Python 学习必备资源

    小编最近也是忙头晕啦,给大家整理了一些python学习的资源,希望能给大家的自学贡献微薄之力;本文为不同阶段的Python学习者从不同角度量身定制了49个学习资...

    用户6133654
  • 收藏 | 49个Python学习资源

    How to Run Your Python Scripts – Real Python

    Python数据科学
  • 与你共享从菜鸟到大佬的49个Python学习资源!

    How to Run Your Python Scripts – Real Python

    1480
  • 49个Python学习资源:从初学者到高级玩家都有了

    3. Basic Data Types in Python – Real Python

    华章科技
  • 49 个免费 Python 学习资源,适合不同阶段!

    How to Run Your Python Scripts – Real Python

    昱良
  • 收藏 | 49 个 Python 学习资源

    How to Run Your Python Scripts – Real Python

    小小詹同学

扫码关注云+社区

领取腾讯云代金券