前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark核心数据结构RDD的定义

Spark核心数据结构RDD的定义

作者头像
天策
发布2018-06-22 14:34:19
1.5K0
发布2018-06-22 14:34:19
举报
文章被收录于专栏:行者悟空行者悟空

摘 要

RDD是Spark最重要的抽象,掌握了RDD,可以说就掌握了Spark计算的精髓。它不但对理解现有Spark程序大有帮助,也能提升Spark程序的编写能力。

RDD是Spark最重要的抽象,掌握了RDD,可以说就掌握了Spark计算的精髓。它不但对理解现有Spark程序大有帮助,也能提升Spark程序的编写能力。

什么是RDD

RDD的全称是“弹性分布式数据集”(Resilient Distributed Dataset)。首先,它是一个数据集,就像Scala语言中的ArrayListTupleSetMap也是数据集合一样,但从操作上看RDD最像ArrayList,里面的数据都是平铺的,可以顺序遍历。而且ArrayList对象拥有的许多操作RDD对象也有,比如flatMapmapfilterreducegroupBy等。

其次,RDD是分布存储的。里面的成员被水平切割成小的数据块,分散在集群的多个节点上,便于对RDD里面的数据进行并行计算。

最后,RDD的分布是弹性的,不是固定不变的。RDD的一些操作可以被拆分成对各数据块直接计算,不涉及其他节点,比如map。这样的操作一般在数据块所在的节点上直接进行,不影响RDD的分布,除非某个节点故障需要转换到其他节点上。但是在有些操作中,只访问部分数据块是无法完成的,必须访问RDD的所有数据块。比如groupBy,在做groupBy之前完全不知道每个key的分布,必须遍历RDD的所有数据块,将具有相同key的元素汇聚在一起,这样RDD的分布就完全重组,而且数量也可能发生变化。此外,RDD的弹性还表现在高可靠性上。

RDD特点
  • RDD是只读的,一旦生成,内容就不能修改了。这样的好处是让整个系统的设计相对简单,比如并行计算时不用考虑数据互斥的问题。
  • RDD可指定缓存在内存中。一般计算都是流水式生成、使用RDD,新的RDD生成之后,旧的不再使用,并被Java虚拟机回收掉。但如果后续有多个计算依赖某个RDD,我们可以让这个RDD缓存在内存中,避免重复计算。这个特性在机器学习等需要反复迭代的计算场景下对性能的提升尤其明显。
  • RDD可以通过重新计算得到。RDD的高可靠性不是通过复制来实现的,而是通过记录足够的计算过程,在需要时(比如因为节点故障导致内容失效)重新从头或从某个镜像重新计算来恢复的。
RDD核心属性

一个RDD对象,包含如下5个核心属性。

  • 一个分区列表,每个分区里是RDD的部分数据(或称数据块)。
  • 一个依赖列表,存储依赖的其他RDD。
  • 一个名为compute的计算函数,用于计算RDD各分区的值。
  • 分区器(可选),用于键/值类型的RDD,比如某个RDD是按散列来分区。
  • 计算各分区时优先的位置列表(可选),比如从HDFS上的文件生成RDD时,RDD分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。

下面我们直接来看看这5个属性的具体代码定义。

分区与依赖:

代码语言:javascript
复制
// 依赖关系定义在一个Seq数据集中,类型是Dependency
// 有检查点时,这些信息会被重写,指向检查点
private var dependencies_ : Seq[Dependency[_]] = null
// 分区定义在Array数据中,类型是Partition,没用Seq,这主要考虑到随时需要通过下标来访问或更新
// 分区内容,而dependencies_使用Seq是因为它的使用场景一般是取第一个成员或遍历
@transient private var partitions_ : Array[Partition] = null

计算函数:

代码语言:javascript
复制
/**
 * compute方法由子类来实现,对输入的RDD分区进行计算
 */
def compute(split: Partition, context: TaskContext): Iterator[T]

分区器:

代码语言:javascript
复制
/** 可选,子类可以重写以指定新的分区方式。Spark支持两种分区方式:Hash和Range*/
@transient val partitioner: Option[Partitioner] = None

优先计算位置:

代码语言:javascript
复制
/**
 * 可选,子类可以指定分区优先的位置,比如HadoopRDD会重写此方法,让分区尽可能与数据在相同的节点上
 */
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
/**
 * RDD提供统一的调用方法,统一处理检查点问题
 */
final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
        getPreferredLocations(split)
    }
}

而且,Spark从第一个开源版本0.3-scala-2.8开始,到目前最新的1.4.1,RDD一直使用这5个核心属性,没有增加,也没减少。可以说,这就是Spark计算的基因。

Spark调度和计算都基于这5个属性,各种RDD都有自己实现的计算,用户也可以方便地实现自己的RDD,比如从一个新的存储系统中读取数据。

下图显示了RDD及其常见子类的继承关系。

           RDD及其常见子类的继承关系

每个Transformation操作都会生成一个新的RDD,不同操作也可能返回相同类型的RDD,只是计算方法等参数不同。比如,mapflatMapfilter这3个操作都会生成MapPartitionsRDD类型的RDD:

代码语言:javascript
复制
/**
 * Transformation:map
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
/**
 *  Transformation:flatMap
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
/**
 * Transformation:filter
 */
def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
        this,
        (context, pid, iter) => iter.filter(cleanF),
        preservesPartitioning = true)
}
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017年02月13日,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是RDD
  • RDD特点
  • RDD核心属性
相关产品与服务
GPU 云服务器
GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档