前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark RDD的Transformation

Spark RDD的Transformation

作者头像
天策
发布2018-06-22 14:36:41
3780
发布2018-06-22 14:36:41
举报
文章被收录于专栏:行者悟空

RDD的Transformation是指由一个RDD生成新RDD的过程,比如前面使用的flatMapmapfilter操作都返回一个新的RDD对象,类型是MapPartitionsRDD,它是RDD的子类。

所有的RDD Transformation都只是生成了RDD之间的计算关系以及计算方法,并没有进行真正的计算。下面还是以WordCount为例进行介绍:

代码语言:javascript
复制
val textFile = sc.textFile("README.md")
val words = textFile.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey((a, b) => a + b)
wordCounts.collect()

每一个操作都会生成一个新的RDD对象(其类型为RDD子类),它们按照依赖关系串在一起,像一个链表(其实是DAG的简化形式),每个对象有一个指向父节点的指针,以及如何从父节点通过计算生成新对象的信息。下图显示了WordCount计算过程中的RDD Transformation生成的RDD对象的依赖关系。

Spark RDD的Transformation
Spark RDD的Transformation

          RDD Transformation生成的RDD对象的依赖关系

除了RDD创建过程会生成新的RDD外,RDD Transformation也会生成新的RDD,并且设置与前一个RDD的依赖关系。结合每一个RDD的数据和它们之间的依赖关系,每个RDD都可以按依赖链追溯它的祖先,这些依赖链接就是RDD重建的基础。因此,理解了RDD依赖,也就理解了RDD的重建容错机制。

下面以map为例进行介绍。实际上,这就是生成了一个新的RDD对象,其类型是MapPartitionsRDD(它是RDD的子类):

代码语言:javascript
复制
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

MapPartitionsRDD的定义如下:

代码语言:javascript
复制
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
    extends RDD[U](prev) {
    override val partitioner = if (preservesPartitioning)
        firstParent[T].partitioner else None
    override def getPartitions: Array[Partition] = firstParent[T].partitions
    override def compute(split: Partition, context: TaskContext): Iterator[U] =
        f(context, split.index, firstParent[T].iterator(split, context))
}

可以看到,MapPartitionsRDD最主要的工作是用变量f保存传入的计算函数,以便compute调用它来进行计算。其他4个重要属性基本保持不变:分区和优先计算位置没有重新定义,保持不变,依赖关系默认依赖调用的RDD,分区器优先使用上一级RDD的分区器,否则为None

在Spark中,RDD是有依赖关系的,这种依赖关系有两种类型。

  • 窄依赖。依赖上级RDD的部分分区。
  • Shuffle依赖。依赖上级RDD的所有分区。

对应类的关系如下图所示。

对应类的关系

之所以这么区分依赖关系,是因为它们之间有本质的区别。使用窄依赖时,可以精确知道依赖的上级RDD的分区。一般情况下,会选择与自己在同一节点的上级RDD分区,这样计算过程都在同一节点进行,没有网络IO开销,非常高效,常见的mapflatMapfilter操作都是这一类。而Shuffle依赖则不同,无法精确定位依赖的上级RDD的分区,相当于依赖所有分区(想想reduceByKey计算,需要对所有的key重新排列)。计算时涉及所有节点之间的数据传输,开销巨大。所以,以Shuffle依赖为分隔,Task被分成Stage,方便计算时的管理。

RDD仔细维护着这种依赖关系和计算方法,使得通过重新计算来恢复RDD成为可能。当然,这也不是万能的。如果依赖链条太长,那么通过计算来恢复的代价就太大了。所以,Spark又提供了一种叫检查点的机制。对于依赖链条太长的计算,对中间结果存一份快照,这样就不需要从头开始计算了。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017年02月14日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档