前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >3.0Spark RDD实现详解

3.0Spark RDD实现详解

作者头像
Albert陈凯
发布2018-04-08 10:23:17
8670
发布2018-04-08 10:23:17
举报
文章被收录于专栏:Albert陈凯

Spark技术内幕:深入解析Spark内核架构设计与实现原理

第三章 Spark RDD实现详解

RDD是Spark最基本也是最根本的数据抽象,它具备像MapReduce等数据流模型的容错性,并且允许开发人员在大型集群上执行基于内存的计算。现有的数据流系统对两种应用的处理并不高效:一是迭代式算法,这在图应用和机器学习领域很常见;二是交互式数据挖掘工具。这两种情况下,将数据保存在内存中能够极大地提高性能。为了有效地实现容错,RDD提供了一种高度受限的共享内存,即RDD是只读的,并且只能通过其他RDD上的批量操作来创建。尽管如此,RDD仍然足以表示很多类型的计算,包括MapReduce和专用的迭代编程模型(如Pregel)等。Spark实现的RDD在迭代计算方面比Hadoop快20多倍,同时还可以在5~7秒内交互式地查询1TB数据集。

3.1 概述

Spark的目标是为基于工作集的应用(即多个并行操作重用中间结果的应用)提供抽象,同时保持MapReduce及其相关模型的优势特性,即自动容错、位置感知性调度和可伸缩性。RDD比数据流模型更易于编程,同时基于工作集的计算也具有良好的描述能力。

在这些特性中,最难实现的是容错性。一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。我们面向的是大规模数据分析,数据检查点操作成本很高:需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源(在内存中复制数据可以减少需要缓存的数据量,而存储到磁盘则会降低应用程序速度)。所以,我们选择记录更新的方式。但是,如果更新太多,记录更新成本也不低。因此,RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列转换记录下来(即Lineage),以便恢复丢失的分区。

虽然只支持粗粒度转换限制了编程模型,但RDD仍然可以很好地适用于很多应用,特别是支持数据并行的批量分析应用,包括数据挖掘、机器学习、图算法等,因为这些程序通常都会在很多记录上执行相同的操作。RDD不太适合那些异步更新共享状态的应用,例如并行Web网络爬虫。因此,Spark的目标是为大多数分析型应用提供有效的编程模型,而其他类型的应用则交给专门的系统。

3.2 什么是RDD

什么是RDD?RDD是只读的、分区记录的集合。RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。这些确定性操作称为转换,如map、filter、groupBy、join。RDD不需要物化。RDD含有如何从其他RDD衍生(即计算)出本RDD的相关信息(即Lineage),因此在RDD部分分区数据丢失的时候可以从物理存储的数据计算出相应的RDD分区。

RDD支持基于工作集的应用,同时具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

每个RDD有5个主要的属性:

1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。图3-1描述了分区存储的计算模型,每个分配的存储是由BlockManager实现的。每个分区都会被逻辑映射成BlockManager的一个Block,而这个Block会被一个Task负责计算。

2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。详情请参阅3.4.5节。

3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

[插图]

图3-1 RDD Partition的存储和计算模型

3.2.1 RDD的创建

可以通过两种方式创建RDD:

1)由一个已经存在的Scala集合创建。

2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase、Amazon S3等。

RDD创建后,就可以在RDD上进行数据处理。RDD支持两种操作:转换(trans-formation),即从现有的数据集创建一个新的数据集;动作(action),即在数据集上进行计算后,返回一个值给Driver程序。例如,map就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布式数据集表示结果。另一方面,reduce是一种动作,通过一些函数将所有元素叠加起来,并将最终结果返回给Driver(还有一个并行的reduceByKey,能返回一个分布式数据集)。

图3-2描述了从外部数据源创建RDD,经过多次转换,通过一个动作操作将结果写回外部存储系统的逻辑运行图。整个过程的计算都是在Worker中的Executor中运行。

[插图]

图3-2 RDD创建、转换和动作的逻辑计算图

3.2.2 RDD的转换

RDD中的所有转换都是惰性的,也就是说,它们并不会直接计算结果。相反的,它们只是记住这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率地运行。例如我们可以实现:通过map创建的一个新数据集,并在reduce中使用,最终只返回reduce的结果给Driver,而不是整个大的新数据集。图3-3描述了RDD在进行groupByRey时的内部RDD转换的实现逻辑图。图3-4描述了reduceByKey的实现逻辑图。

[插图]

图3-3 RDD groupByKey的逻辑转换图

在groupByKey的操作中,会在MapPartitionsRDD做一次Shuffle,图3-3中设置的分片数量是3,因此ShuffledRDD会有3个分片,ShuffledRDD实际上仅仅是从上游的任务中读取Shuffle的结果,因此图的箭头是指向上游的MapPartitionsRDD的。关于Shuffle的实现实际上要比图中展示得复杂得多,具体的实现细节可以参阅第7章。

reduceByKey和groupByKey的实现差不多,它在Shuffle完成之后,需要做一次reduce。

[插图]

图3-4 RDD reduceByKey的逻辑转换图

默认情况下,每一个转换过的RDD都会在它执行一个动作时被重新计算。不过也可以使用persist(或者cache)方法,在内存中持久化一个RDD。在这种情况下, Spark将会在集群中保存相关元素,下次查询这个RDD时能更快访问它。也支持在磁盘上持久化数据集,或在集群间复制数据集,这些选项将在下一节进行描述。

RDD支持的转换如表3-1所示。

表3-1 RDD支持的转换

[插图]

(续)

[插图]

3.2.3 RDD的动作

RDD支持的动作如表3-2所示。

表3-2 RDD支持的动作

[插图]

3.2.4 RDD的缓存

(续)

[插图]

Spark速度非常快的原因之一,就是在不同操作中在内存中持久化(或缓存)一个数据集。当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其他动作(action)中重用。这使得后续的动作变得更加迅速(通常快10倍)。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

通过persist()或cache()方法可以标记一个要被持久化的RDD,一旦首次被触发,该RDD将会被保留在计算节点的内存中并重用。实际上,cache()是使用persist()的快捷方法,它们的实现如下:

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */

def cache(): this.type = persist()

图3-5中,假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。

[插图]

图3-5 RDD缓存过的Partition可以加快下一次的计算速度

缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除。RDD的缓存的容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列的转换,丢失的数据会被重算。RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

3.2.5 RDD的检查点

RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入了检查点(checkpoint)机制。

缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。而检查点不同,它是在计算完成后,重新建立一个Job来计算。为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。

用户可以通过调用org.apache.spark.rdd.RDD#checkpoint()来指定RDD需要检查点机制。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2017.07.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
云数据库 HBase
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档