摘 要
RDD是Spark最重要的抽象,掌握了RDD,可以说就掌握了Spark计算的精髓。它不但对理解现有Spark程序大有帮助,也能提升Spark程序的编写能力。
RDD是Spark最重要的抽象,掌握了RDD,可以说就掌握了Spark计算的精髓。它不但对理解现有Spark程序大有帮助,也能提升Spark程序的编写能力。
RDD的全称是“弹性分布式数据集”(Resilient Distributed Dataset)。首先,它是一个数据集,就像Scala语言中的Array
、List
、Tuple
、Set
、Map
也是数据集合一样,但从操作上看RDD最像Array
和List
,里面的数据都是平铺的,可以顺序遍历。而且Array
、List
对象拥有的许多操作RDD对象也有,比如flatMap
、map
、filter
、reduce
、groupBy
等。
其次,RDD是分布存储的。里面的成员被水平切割成小的数据块,分散在集群的多个节点上,便于对RDD里面的数据进行并行计算。
最后,RDD的分布是弹性的,不是固定不变的。RDD的一些操作可以被拆分成对各数据块直接计算,不涉及其他节点,比如map
。这样的操作一般在数据块所在的节点上直接进行,不影响RDD的分布,除非某个节点故障需要转换到其他节点上。但是在有些操作中,只访问部分数据块是无法完成的,必须访问RDD的所有数据块。比如groupBy
,在做groupBy
之前完全不知道每个key
的分布,必须遍历RDD的所有数据块,将具有相同key
的元素汇聚在一起,这样RDD的分布就完全重组,而且数量也可能发生变化。此外,RDD的弹性还表现在高可靠性上。
一个RDD对象,包含如下5个核心属性。
compute
的计算函数,用于计算RDD各分区的值。下面我们直接来看看这5个属性的具体代码定义。
分区与依赖:
// 依赖关系定义在一个Seq数据集中,类型是Dependency
// 有检查点时,这些信息会被重写,指向检查点
private var dependencies_ : Seq[Dependency[_]] = null
// 分区定义在Array数据中,类型是Partition,没用Seq,这主要考虑到随时需要通过下标来访问或更新
// 分区内容,而dependencies_使用Seq是因为它的使用场景一般是取第一个成员或遍历
@transient private var partitions_ : Array[Partition] = null
计算函数:
/**
* compute方法由子类来实现,对输入的RDD分区进行计算
*/
def compute(split: Partition, context: TaskContext): Iterator[T]
分区器:
/** 可选,子类可以重写以指定新的分区方式。Spark支持两种分区方式:Hash和Range*/
@transient val partitioner: Option[Partitioner] = None
优先计算位置:
/**
* 可选,子类可以指定分区优先的位置,比如HadoopRDD会重写此方法,让分区尽可能与数据在相同的节点上
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/**
* RDD提供统一的调用方法,统一处理检查点问题
*/
final def preferredLocations(split: Partition): Seq[String] = {
checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
getPreferredLocations(split)
}
}
而且,Spark从第一个开源版本0.3-scala-2.8开始,到目前最新的1.4.1,RDD一直使用这5个核心属性,没有增加,也没减少。可以说,这就是Spark计算的基因。
Spark调度和计算都基于这5个属性,各种RDD都有自己实现的计算,用户也可以方便地实现自己的RDD,比如从一个新的存储系统中读取数据。
下图显示了RDD及其常见子类的继承关系。
RDD及其常见子类的继承关系
每个Transformation操作都会生成一个新的RDD,不同操作也可能返回相同类型的RDD,只是计算方法等参数不同。比如,map
、flatMap
、filter
这3个操作都会生成MapPartitionsRDD
类型的RDD:
/**
* Transformation:map
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
/**
* Transformation:flatMap
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
/**
* Transformation:filter
*/
def filter(f: T => Boolean): RDD[T] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
(context, pid, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}