RDD的Transformation是指由一个RDD生成新RDD的过程,比如前面使用的flatMap
、map
、filter
操作都返回一个新的RDD对象,类型是MapPartitionsRDD
,它是RDD的子类。
所有的RDD Transformation都只是生成了RDD之间的计算关系以及计算方法,并没有进行真正的计算。下面还是以WordCount为例进行介绍:
val textFile = sc.textFile("README.md")
val words = textFile.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey((a, b) => a + b)
wordCounts.collect()
每一个操作都会生成一个新的RDD对象(其类型为RDD子类),它们按照依赖关系串在一起,像一个链表(其实是DAG的简化形式),每个对象有一个指向父节点的指针,以及如何从父节点通过计算生成新对象的信息。下图显示了WordCount计算过程中的RDD Transformation生成的RDD对象的依赖关系。
RDD Transformation生成的RDD对象的依赖关系
除了RDD创建过程会生成新的RDD外,RDD Transformation也会生成新的RDD,并且设置与前一个RDD的依赖关系。结合每一个RDD的数据和它们之间的依赖关系,每个RDD都可以按依赖链追溯它的祖先,这些依赖链接就是RDD重建的基础。因此,理解了RDD依赖,也就理解了RDD的重建容错机制。
下面以map
为例进行介绍。实际上,这就是生成了一个新的RDD对象,其类型是MapPartitionsRDD
(它是RDD的子类):
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
MapPartitionsRDD
的定义如下:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
preservesPartitioning: Boolean = false)
extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning)
firstParent[T].partitioner else None
override def getPartitions: Array[Partition] = firstParent[T].partitions
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
}
可以看到,MapPartitionsRDD
最主要的工作是用变量f
保存传入的计算函数,以便compute
调用它来进行计算。其他4个重要属性基本保持不变:分区和优先计算位置没有重新定义,保持不变,依赖关系默认依赖调用的RDD,分区器优先使用上一级RDD的分区器,否则为None
。
在Spark中,RDD是有依赖关系的,这种依赖关系有两种类型。
对应类的关系如下图所示。
对应类的关系
之所以这么区分依赖关系,是因为它们之间有本质的区别。使用窄依赖时,可以精确知道依赖的上级RDD的分区。一般情况下,会选择与自己在同一节点的上级RDD分区,这样计算过程都在同一节点进行,没有网络IO开销,非常高效,常见的map
、flatMap
、filter
操作都是这一类。而Shuffle依赖则不同,无法精确定位依赖的上级RDD的分区,相当于依赖所有分区(想想reduceByKey
计算,需要对所有的key
重新排列)。计算时涉及所有节点之间的数据传输,开销巨大。所以,以Shuffle依赖为分隔,Task被分成Stage,方便计算时的管理。
RDD仔细维护着这种依赖关系和计算方法,使得通过重新计算来恢复RDD成为可能。当然,这也不是万能的。如果依赖链条太长,那么通过计算来恢复的代价就太大了。所以,Spark又提供了一种叫检查点的机制。对于依赖链条太长的计算,对中间结果存一份快照,这样就不需要从头开始计算了。