我们在研究WordCount的时候碰到了很多诸如JavaRDD、Function之类的字眼,其实这些个代码逻辑就是我们以后日日夜夜不断去写去改的那部分程序了,从某种程度上来讲,完成一道spark的作业题,基本也就是去设计我们的rdd和function了。既然话都说到这份上了,建立感情需要从认识开始。在前面的文章已经确认过眼神了,这次当然是正式介绍。
我们在Java程序中定义的那个类型是JavaRDD,实际上是在是对本身的RDD类型的一个封装, 我们想亲密接触RDD,直接翻翻这部分的源码 ,我们看下图一:
图一:Rdd源码头注释
可能也是这部分源码是重中之重,Spark大咖们在写这部分给了特别多的文字。按照正常程序员的套路来说,也是在洋洋洒洒之后撸玩代码之后,来个代码评审什么的(或许有),后面才加上注释的,也是对RDD最为直接的解释。这部分文字在很多文章博客中被引入,翻译或许不完全相同,但是基本都是这些了。我们一起看看这段文字,这些文字在后面使用rdd的时候会不断去体现。当然,我的解释会程序猿一些,本身也是程序猿写给cxy的嘛。
我们先看看头部部分,这部分注释告诉我们RDD是三个单词的缩写,弹性分布式数据集这个名词便是从这里来的。这部分没啥问题,程序员记得是三个单词缩写就完事了。后面部分告诉我们是RDD是spark中的抽象,代表一组不可变的,分区存储的,而且还可以被并行操作计算的集合。
图二:注释头部分
我们关注这部分,rdd是抽象的,所以对应的类型要么是接口,要么是抽象类,然后对应具体很多实现。我们趴开在源码部分可以清楚看到RDD被定义成一个abstract的类型。而且,我们通过继承结构可以看到,RDD的子类就是一堆一堆的,可以知道这部分具体实现就是对应不同数据数据进行的处理,统一作为RDD使用。
图三:RDD的定义
图四:RDD的定义
对于不可变的数据集,这个好说,就是我们操作之后不会改变原来的元素的值。更加直接点就是我们对某个数据集可能有rddA.map(),rddA.filter()等之类的操作,这种操作并不会改变rddA中的数据,而是生成一个新的RDD,我们在之前写WordCount的时候体现了这点,我们不断去定义一个新的RDD去接收生成的数据,如图中的情况,我们完全可以不断去使用lines中的数据,因为在做了算子操作的时候是生成新的元素line中的元素并不会去改变。
图五:RDD可以重复被使用的
接下来的是介绍的存储和运行过程,RDD的存储有点像我们的hdfs中的block一样。所谓的分区存储,其实就是数据被切割分布式在集群中的各个节点上,我们很自然可以知道,各个计算节点在计算部分数据的时候,计算过程自然是并行的。 分布式计算本身依托数据本身是分布式的,各自负责自身那部分,再统一汇集,和我们以前谈到的分布式计算模型是差不多的。
我们接着看下一部分:
图六:下面部分
我把这部分分成了5个信息点,这部分其实是我们设计类型之后的一贯做法,我们把比较通用的封装到一个类里面,有其他公共的操作,又分到一个类里面,图中部分就是来解释这个事情的。第1点,这个类(RDD)封装了针对所有RDD基本操作,我们从源码中可以看出来,图七部分。这意味着我们以后不清楚基本rdd有什么操作的时候,就直接到这里看。
图七:RDD的通用操作
后面部分2、3、4部分,是针对不是全通用的操作,就分开了子类去定义,类似groupbykey,join的操作在PairRDDFunctions中定义;双精度的运算,在DoubleRDDFunctions定义;SequenceFileRDDFunctions则是对序列化的操作的定义。当然,我们可以在任何rdd中看到全部的这些操作。这部分操作我们可以发现看不到在他自身和自己的子类上面去实现,但是确实可以调用,这是因为这些方法是通过隐式转换的方式在其他几个类去实现了,看下源码是最直观的,看图八。当然,这部分虽然是底层的实现机制,但是对于使用者来说就是超级方便,我们并不需要去单独去new某个PairRDDFunctions也可以一路点下去使用这些类中实现的方法。
图八:隐式转换定义
后面这部分是比较精炼的部分,也是很多地方用这部分来解释rdd的,图九
图九:总结性的rdd
这里总结性的说了rdd五大属性,我们来一个个说明:
1.partitions的列表,不是别的,在rdd的实现类里面有一个属性的定义,
private var partitions_ : Array[Partition]
是说明我们要计算要操作的元素是一系列的partition列表,这是spark的计算基础,所有的rdd计算都要把数据读成为一系列的partition,我们以最常见的hdfs文件为例,图十那样,文件在计算之前有个读取过程,理想情况下,每个hdfs的文件块恰好是对应rdd中的一个partition,这一系列的partition组成的统一数据集,便是我们的rdd了。
图十:rdd生成过程
2.针对每一次的数据切割,会有一次计算。我们把图接着画(图十一),假设我们对rdd1进行了一次map操作,那么这个map函数便作用到我们每一个partition中,同时幂等地生成相同数量的partidion,这部分操作返回一个新的rdd2。实际情况我们也好理解,我们在写程序的时候 可以看成是对一条数据进行操作,但是实际对应rdd的转换来说,是partition中的每一条数据都是需要转换的。
图十一:rdd中的function
3.一个依赖其他rdd的依赖列表,这个怎么理解呢。在rdd里面定义了一个seq类型的变量,个人感觉代码里面体现还是比较直接,毕竟本身是程序的注释
private var dependencies_ : Seq[Dependency[_]] = _
这里说了个什么事情呢,实际上和定义的一样,是里面有一组依赖。我们在上图可以看到rdd2实际上是rdd1生成过来的,每个rdd需要记住自己的老爹是谁,老爹的老爹是谁,一直排到祖宗那头。这样有什么作用呢,我们把图画多一些
图十二:rdd的演化过程
我们从图中可以看到,每个partition都顺着自己一条线计算过来,我们在这里可以了解记录依赖的作用了。我们每个rdd通过追溯血缘关系,便可以从祖宗节点中生成自己。有了这部分信息,我们其实可以了解一下spark中的作业运行机制,spark快速计算也是得益于数据存放在内存,也就是说我们的parttion是在内存存储和进行转换的。spark认为内存中的计算是快速的,所以当作业失败的时候,我们只需要从源头rdd再计算一次就可以得到整目标rdd,为了实现这个,我们需要追溯rdd血缘信息,所以每个rdd都保留了依赖的信息。
4.分区器,其实我们要把数据分到不同的partition中,需要有一个分区算法
Partitioner
这部分算法可以自己定义,如果没有定义则使用默认的分区算法,这是一个可选项。
5.同样是可选项,叫做对于每一个切片有一组首选项的计算位置。大白话一点就是数据是分布式的,计算也是分布式的,要计算自然有个读取数据过程,如果数据和计算的节点恰好在一个节点,那么自然也不用网络资源消耗了,计算节点可以有很多个选择,但是有个首选项。这里的例子就是hdfs中block所在的位置,很明显,如果计算的节点恰好是block这部分数据的节点,那就大量减少网络传输了。
最后一段注释其实是说spark调度的时候是基于这些rdd实现的方法去调度的,更具体一点就是spark调度的时候会帮我们划分stage和生成调度Graph,有需要的话也可以自己去实现rdd的。
文章的最后总是需要来点总结性的文字,今天我们算是正式认识了RDD了,我只是觉得很多问题需要用程序的角度去审视,那么认识rdd的情况就变成了和平时写程序一样定义的一个抽象类,然后里面有一些私有成员。Spark上面注释很详细,很值得对揣摩几次的。