导读
继续前期依次推文PySpark入门和SQL DataFrame简介的基础上,今日对Spark中最重要的一个概念——RDD进行介绍。虽然在Spark中,基于RDD的其他4大组件更为常用,但作为Spark core中的核心数据抽象,RDD是必须深刻理解的基础概念。
01 何为RDD
RDD(Resilient Distributed Dataset),弹性分布式数据集,是Spark core中的核心数据抽象,其他4大组件都或多或少依赖于RDD。简单理解,RDD就是一种特殊的数据结构,是为了适应大数据分布式计算的特殊场景(此时传统的数据集合无法满足分布式、容错性等需求)而设计的一种数据形式,其三个核心关键词是:
RDD在Spark中占据"core"的地位
02 RDD为何快于MapReduce
看一个人,可以看看他的对手;了解一个产品,也可以看看他的竞品。Spark是为了解决Hadoop中 MapReduce计算框架效率低下而产生的大数据计算引擎,所以Spark起初的竞争对手就是MapReduce。
MapReduce之所以计算效率低,主要原因在于每次计算都涉及从硬盘的数据读写问题,而Spark设计之初就考虑尽可能避免硬盘读写,所以Spark的第一大特点是数据优先存储于内存中(除非内存存储不够才放到硬盘中)。同时,为了尽可能优化RDD在内存中的计算流程,Spark还引入了lazy特性。lazy特性其实质就是直至"真正碰上事了"才计算,否则就一直"推托下去",颇有不见兔子不撒鹰的味道。
这实际上又涉及到了RDD的两类算子:transformation和action,前者只是建立逻辑转换流程,后者才真正落地执行。transformation的结果是从一个RDD转换到另一个RDD,而action则是从一个RDD转换到一个非RDD,因此从执行结果是否仍然是RDD也可推断出该操作是transformation抑或action。进一步地,在transformation过程中,Spark内部调度RDD的计算过程是一个有向无环图(Directed Acyclic Graph,DAG ),意味着所有RDD的转换都带有方向性(一个产生另一个,即血缘关系),且不存在循环依赖的,这对Spark的容错性带来了有效保证:当一个环节出现问题时仅需按照方向关系追溯到相应的父RDD即可,而无需从头开始全流程计算。
Spark中关于宽窄依赖的经典图例(图片选自网络)
上图给出了宽窄依赖的一个图例。实际上,这里的宽窄依赖是针对RDD的每个partition而言的,分析子RDD的每个partition来源就容易理解其依赖为宽或窄:
也正因如此,对于整个DAG而言,依据依赖类型可将Spark执行过程划分为多个阶段,同一阶段内部Spark还会进行相应的调度和优化。可以说,内存计算+DAG两大特性共同保证了Spark执行的高效性。
03 RDD创建
RDD的创建主要有3类形式:
from pyspark import SparkContext # SparkContext是spark core的入口
sc = SparkContext() # sc是一个单例
rdd1 = sc.parallelize(['Tom', 'John', 'Joy']) # 从本地已有Python集合创建
rdd2 = sc.textFile('test.txt') # 从本地文件序列化一个RDD
rdd3 = rdd1.map(lambda x:(x, 1)) # 从一个RDD转换为另一个RDD
需要指出的是,RDD作为分布式的数据集合,其本身是不可变对象(immutable),所以所有的transformation算子都是从一个RDD转换生成了一个新的RDD,这也印证了DAG中无环的概念。
至于说转换过程中仍然可以使用相同的变量名,这是由Python的特性所决定的,类似于字符串是不可变数据类型,但也可以由一个字符串生成另一个同名字符串一样。
04 三类算子
Spark中的算子,其实就是一类操作,或者更具体说是一个函数!
前面提到,Spark在执行过程中,依据从一个RDD是生成另一个RDD还是其他数据类型,可将操作分为两类:transformation和action。这实际上也是最为常用的RDD操作,甚至说Spark core编程模式就是先经历一系列的transformation,然后在action提取相应的结果。
然而,在系列transformation过程中,由于其lazy特性,当且仅当遇到action操作时才真正从头至尾的完整执行,所以就不得不面对一个问题:假如有RDD6是由前面系列的RDD1-5转换生成,而RDD6既是RDD7的父RDD,也是RDD8的父RDD,所以在独立执行RDD7和RDD8时,实际上会将RDD1=>RDD6的转换操作执行两遍,存在资源和效率上的浪费。当存在2遍计算重复或许尚可接受,但若存在更多重复转换时,这种模式或许不是一个明智之举,为此Spark还为RDD设计了第三类算子:持久化操作persistence。
至此,RDD的三类常用算子介绍如下:
1. transformation算子
另外,针对以上函数还有一些功能相近的函数,不再列出。
2. action算子
action算子Spark中真正执行的操作,当一个算子的执行结果不再是RDD时,那么它就是一个action算子,此时Spark意识到不能再简单的进行逻辑运算标记,而需要实质性的执行计算。常用的action算子包括如下:
3. persistence算子
持久化的目的是为了短期内将某一RDD存储于内存或硬盘中,使其可复用。主要操作有两类:
另外,还有checkpoint也属于持久化操作。对于一个已经持久化的对象,当无需继续使用时,可使用unpersist完成取消持久化。
需知,持久化操作是为了便于多次重复调用同一RDD时,防止发生重复计算而设计的操作,但其本身仍然是偏lazy的模式,即执行了persist或者cache操作后,仅仅是将其标记为需要持久化,而直至第一次遇到action触发其执行时才会真正的完成持久化。
最后,举一个Spark中hello world级别的WordCount例子,实战一下各类算子的应用:
texts = ['this is spark', 'this is RDD']
rdd = sc.parallelize(texts) # 从已有集合创建RDD对象
# rdd = ['this is spark', 'this is RDD']
rdd1 = rdd.flatMap(lambda x:x.split(' ')) # flatMap将原来的句子用空格分割,并展平至单个词
# rdd1 = ['this', 'is', 'spark', 'this', 'is', 'RDD']
rdd2 = rdd1.map(lambda x:(x, 1)) # 将每个单词映射为(单词,1)的(key value)对象格式
# rdd2 = [('this', 1), ('is', 1), ('spark', 1), ('this', 1), ('is', 1), ('RDD', 1)]
rdd3 = rdd2.reduceByKey(lambda a, b:a+b) # 依据单词相同进行聚合
# rdd3 = [('spark', 1), ('RDD', 1), ('this', 2), ('is', 2)]
rdd3.collect() # 遇到action算子,将上述rdd=>rdd1=>rdd2=>rdd3有向无环图真正执行,并返回列表