Spark核心数据结构RDD的定义

摘 要

RDD是Spark最重要的抽象,掌握了RDD,可以说就掌握了Spark计算的精髓。它不但对理解现有Spark程序大有帮助,也能提升Spark程序的编写能力。

RDD是Spark最重要的抽象,掌握了RDD,可以说就掌握了Spark计算的精髓。它不但对理解现有Spark程序大有帮助,也能提升Spark程序的编写能力。

什么是RDD

RDD的全称是“弹性分布式数据集”(Resilient Distributed Dataset)。首先,它是一个数据集,就像Scala语言中的ArrayListTupleSetMap也是数据集合一样,但从操作上看RDD最像ArrayList,里面的数据都是平铺的,可以顺序遍历。而且ArrayList对象拥有的许多操作RDD对象也有,比如flatMapmapfilterreducegroupBy等。

其次,RDD是分布存储的。里面的成员被水平切割成小的数据块,分散在集群的多个节点上,便于对RDD里面的数据进行并行计算。

最后,RDD的分布是弹性的,不是固定不变的。RDD的一些操作可以被拆分成对各数据块直接计算,不涉及其他节点,比如map。这样的操作一般在数据块所在的节点上直接进行,不影响RDD的分布,除非某个节点故障需要转换到其他节点上。但是在有些操作中,只访问部分数据块是无法完成的,必须访问RDD的所有数据块。比如groupBy,在做groupBy之前完全不知道每个key的分布,必须遍历RDD的所有数据块,将具有相同key的元素汇聚在一起,这样RDD的分布就完全重组,而且数量也可能发生变化。此外,RDD的弹性还表现在高可靠性上。

RDD特点

  • RDD是只读的,一旦生成,内容就不能修改了。这样的好处是让整个系统的设计相对简单,比如并行计算时不用考虑数据互斥的问题。
  • RDD可指定缓存在内存中。一般计算都是流水式生成、使用RDD,新的RDD生成之后,旧的不再使用,并被Java虚拟机回收掉。但如果后续有多个计算依赖某个RDD,我们可以让这个RDD缓存在内存中,避免重复计算。这个特性在机器学习等需要反复迭代的计算场景下对性能的提升尤其明显。
  • RDD可以通过重新计算得到。RDD的高可靠性不是通过复制来实现的,而是通过记录足够的计算过程,在需要时(比如因为节点故障导致内容失效)重新从头或从某个镜像重新计算来恢复的。

RDD核心属性

一个RDD对象,包含如下5个核心属性。

  • 一个分区列表,每个分区里是RDD的部分数据(或称数据块)。
  • 一个依赖列表,存储依赖的其他RDD。
  • 一个名为compute的计算函数,用于计算RDD各分区的值。
  • 分区器(可选),用于键/值类型的RDD,比如某个RDD是按散列来分区。
  • 计算各分区时优先的位置列表(可选),比如从HDFS上的文件生成RDD时,RDD分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。

下面我们直接来看看这5个属性的具体代码定义。

分区与依赖:

// 依赖关系定义在一个Seq数据集中,类型是Dependency
// 有检查点时,这些信息会被重写,指向检查点
private var dependencies_ : Seq[Dependency[_]] = null
// 分区定义在Array数据中,类型是Partition,没用Seq,这主要考虑到随时需要通过下标来访问或更新
// 分区内容,而dependencies_使用Seq是因为它的使用场景一般是取第一个成员或遍历
@transient private var partitions_ : Array[Partition] = null

计算函数:

/**
 * compute方法由子类来实现,对输入的RDD分区进行计算
 */
def compute(split: Partition, context: TaskContext): Iterator[T]

分区器:

/** 可选,子类可以重写以指定新的分区方式。Spark支持两种分区方式:Hash和Range*/
@transient val partitioner: Option[Partitioner] = None

优先计算位置:

/**
 * 可选,子类可以指定分区优先的位置,比如HadoopRDD会重写此方法,让分区尽可能与数据在相同的节点上
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/**
 * RDD提供统一的调用方法,统一处理检查点问题
 */
final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
        getPreferredLocations(split)
    }
}

而且,Spark从第一个开源版本0.3-scala-2.8开始,到目前最新的1.4.1,RDD一直使用这5个核心属性,没有增加,也没减少。可以说,这就是Spark计算的基因。

Spark调度和计算都基于这5个属性,各种RDD都有自己实现的计算,用户也可以方便地实现自己的RDD,比如从一个新的存储系统中读取数据。

下图显示了RDD及其常见子类的继承关系。

           RDD及其常见子类的继承关系

每个Transformation操作都会生成一个新的RDD,不同操作也可能返回相同类型的RDD,只是计算方法等参数不同。比如,mapflatMapfilter这3个操作都会生成MapPartitionsRDD类型的RDD:

/**
 * Transformation:map
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
/**
 *  Transformation:flatMap
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
/**
 * Transformation:filter
 */
def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
}

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏小小挖掘机

来学习几个简单的Hive函数啦

咳咳,今天来介绍一下几个Hive函数吧,先放一张我登哥划水的照片,希望大家也做一只自由的鱼儿,在知识的海洋里游呀游,嘻嘻! ? 今天我们来介绍几个Hive常用的...

3524
来自专栏岑玉海

Spark作业调度

  Spark在standalone模式下,默认是使用FIFO的模式,我们可以使用spark.cores.max 来设置它的最大核心数,使用spark.exec...

3817
来自专栏Python攻城狮

MySQL中char、varchar和text的区别

它们的存储方式和数据的检索方式都不一样。 数据的检索效率是:char > varchar > text 空间占用方面,就要具体情况具体分析了。

664
来自专栏用户画像

7.7.1外部排序

内部排序都是在内存中进行的,而在实际应用中,经常需要对大文件进行排序,因为文件中的记录很多、信号量庞大,无法将整个文件拷贝进内存中进行排序。因此,需要将待排序的...

551
来自专栏Android 研究

Java虚拟机基础——1Java的内存模型

最近和几个之前一起做安卓的朋友喝酒,他最近在研究JVM,我们就简单的讨论了起来,他比我研究的深很多,我也不甘堕落,自己也开始研究了一下,写了4篇文章整理了一下自...

442
来自专栏Jack的Android之旅

疯狂Java笔记之Java的内存与回收

对于JVM的垃圾回收机制来说,是否回收一个对象的标准在于:是否还有引用变量引用改对象?只要有引用变量引用对象,垃圾回收机制就不会回收它。

664
来自专栏Java成长之路

三、JVM之对象的创建

上篇博文中已经介绍过了jvm内存的概况,接下来我们从jvm的角度来重新来认识一下Java对象是如何创建。 Java是一门面向对象的语言,在Java程序运行的...

652
来自专栏从零开始学 Web 前端

C/C++练习题(二)

2、查找字符串中第一个只出现一次的字符并输出。(如:aabbcddefg 则输出 'c')

824
来自专栏Java成长之路

volatile变量详解

关键字volatile可以说是Java虚拟机提供的最轻量级的同步机制,但是它并不容易完全被正确、 完整地理解,以至于许多程序员都习惯不去使用它,遇到需要处理多线...

732
来自专栏测试开发架构之路

C语言之预处理命令与用typedef命名已有类型

 预处理命令 主要是改进程序设计环境,以提高编程效率,不属于c语言本身的组成部分,不能直接对它们进行编译,必须在对 程序编译之前,先对程序中的这些特殊命令进行...

3969

扫码关注云+社区