4.3 RDD操作

4.3 RDD操作

RDD提供了一个抽象的分布式数据架构,我们不必担心底层数据的分布式特性,而应用逻辑可以表达为一系列转换处理。

通常应用逻辑是以一系列转换(Transformation)和执行(Action)来表达的,前者在RDD之间指定处理的相互依赖关系,后者指定输出的形式。

其中:

□转换:是指该操作从已经存在的数据集上创建一个新的数据集,是数据集的逻辑操作,并没有真正计算。

□执行:是指该方法提交一个与前一个Action之间的所有Transformation组成的Job进行计算,Spark会根据Action将作业切分成多个Job。

比如,Map操作传递数据集中的每一个元素经过一个函数,形成一个新的RDD转换结果,而Reduce操作通过一些函数对RDD的所有元素进行操作,并返回最终结果给Driver程序。

在默认情况下,Spark所有的转换操作都是惰性(Lazy)的,每个被转换得到的RDD不会立即计算出结果,只是记下该转换操作应用的一些基础数据集,可以有多个转换结果。转换只有在遇到一个Action时才会执行,如图4-2所示。

[插图]

图4-2 Spark转换和执行

这种设计使得Spark以更高的效率运行。例如,可以通过将要在Reduce操作中使用的Map转换来创建一个数据集,并且只返回Reduce的结果给驱动程序,而不是整个Map所得的数据集。

每当一个Job计算完成,其内部的所有RDD都会被清除,如果在下一个Job中有用到其他Job中的RDD,会引发该RDD的再次计算,为避免这种情况,我们可以使用Persist(默认是Cache)方法“持久化”一个RDD到内存中。在这种情况下,Spark将会在集群中保留这个RDD,以便其他Job可以更快地访问,另外,Spark也支持持久化RDD到磁盘中,或者复制RDD到各个节点。

下面,通过几行简单的程序,进一步说明RDD的基础知识。

          val lines=sc.textFile("data.txt")

val lineLengths=lines.map(s=>s.length)

val totalLength=lineLengths.reduce((a,b)=>a+b)

第一行读取外部文件data.txt返回一个基础的MappedRDD,该MappedRDD并不加载到内存中或被执行操作,lines只是记录转换操作结果的指针。

第二行定义了lineLengths作为一个Map转换的结果,由于惰性机制的存在,lineLengths的值不会立即计算。

最后,运行Reduce,该操作为一个Action。Spark将计算打散成多个任务以便在不同的机器上分别运行,每台机器并行运行Map,并将结果进行Reduce操作,返回结果值Driver程序。

如果需要继续使用lineLengths,可以添加缓存Persist或Cache,该持久化会在执行Reduce之前,第一次计算成功之后,将lineLengths保存在内存中。

4.3.1 转换操作

转换操作是RDD的核心之一,通过转换操作实现不同的RDD结果,作为下一次RDD计算的数据输入,转换操作不会触发Job的提交,仅仅是标记对RDD的操作,形成DAG图,以供Action触发Job提交后调用。

常用的转换操作包括:基础转换操作和键-值转换操作。

1.基础转换操作

表4-2列出了目前支持的基础转换操作,具体内容请参见RDD的API官方文档,以获得更多的细节。

表4-2 基础转换操作

[插图]

(续)

[插图]

2.键-值转换操作

尽管大多数Spark操作都基于包含各种类型对象的RDD,但是一小部分特殊的却只能在键-值对形式的RDD上执行。其中,最普遍的就是分布式“洗牌”(shuffle)操作,比如通过键进行分组或聚合元素。

例如,使用reduceByKey操作对文件中每行出现的文字次数进行计数,各种语言的示例如下。

在Scala中,只要在程序中导入org.apache.spark.SparkContext,就能使用Spark的隐式转换,这些操作就可用于包含二元组对象的RDD(Scala中的内建元组,可通过(a,b)创建),键-值对操作可用PairRDDFunction类,如果导入了转换,该类将自动封装元组RDD。

            val lines = sc.textFile("data.txt")

val pairs = lines.map(s => (s, 1))

val counts = pairs.reduceByKey((a, b) => a + b)

基于counts,可以使用counts.sortByKey()按字母表顺序对这些键-值对排序,然后使用counts.collect(),以对象数组的形式向Driver返回结果。

顺便说一句,进行分组的groupByKey不进行本地合并,而进行聚合的reduceByKey会在本地对每个分区的数据合并后再做Shuffle,效率比groupByKey高得多。下面通过几行基于Scala的代码对键-值转换操作进行说明。

// 初始化List

scala>val data = List(("a",1),("b",1),("c",1),("a",2),("b",2),("c",2))

data: List[(String, Int)] = List((a,1), (b,1), (c,1), (a,2), (b,2), (c,2))

// 并行数组创建RDD

scala>val rdd =sc.parallelize(data)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0]

// 按照key进行reduceByKey操作

scala>val rbk = rdd.reduceByKey(_+_).collect

rbk: Array[(String, Int)] = Array((a,3), (b,3), (c,3))

// 按照key进行groupByKey操作

scala>val gbk = rdd.groupByKey().collect

gbk:  Array[(String,  Iterable[Int])]  =  Array((a,CompactBuffer(1,  2)),  (b,

CompactBuffer(1, 2)), (c,CompactBuffer(1, 2)))

// 按照key进行sortByKey操作

scala>val sbk = rdd.sortByKey().collect

sbk: Array[(String, Int)] = Array((a,1), (a,2), (b,1), (b,2), (c,1), (c,2))

表4-3列出了常用的健-值转换。

表4-3 常用的键-值转换

[插图]

4.3.2 执行操作

Spark将提交的Action与前一个Action之间的所有Transformation组成的Job进行计算,并根据Action将作业切分成多个Job,指定Transformation的输出结果。

1.常用执行操作

这里以加载Spark自带的本地文件README.md文件进行测试,返回一个MappedRDD文件,进行Filter转换操作和Count执行。

          // 读取README.md数据,并转化为RDD

scala>val data = sc.textFile("file:///$SPARK_HOME/README.md")

data: org.apache.spark.rdd.RDD[String] = file:///$SPARK_HOME/README.md MappedRDD[1]

// 执行f ilter操作,提取带有"Spark"的子集

scala>val datafilter = data.filter(line =>line.contains("Spark"))

datafilter: org.apache.spark.rdd.RDD[String] = FilteredRDD[2]

// 执行Action操作,输出结果

scala>val datacount = datafilter.count()

datacount: Long = 21

如果想了解更多,请参考表4-4中列出的常用的执行操作。

表4-4 常用的执行操作

[插图]

通过常用执行操作,Spark可以实现大部分MapReduce流式计算的任务,提升了计算效率,对Transformation操作进行结果值输出。

2.存储执行操作

常用存储操作主要包含的执行如表4-5所示。

表4-5 常用存储操作包含的执行

[插图]

存储执行操作将结果进行保存,以文本、序列化文件、对象文件的方式输出到存储设备进行持久化。

4.3.3 控制操作

控制操作主要包括故障恢复、数据持久性,以及移除数据。其中,缓存操作Cache/Pesist是惰性的,在进行执行操作时才会执行,而Unpesist是即时的,会立即释放内存。checkpoint会直接将RDD持久化到磁盘或HDFS等路径,不同于Cache/Persist的是,被checkpoint的RDD不会因作业的结束而被消除,会一直存在,并可以被后续的作业直接读取并加载。

1. RDD故障恢复

在一个典型的分布式系统中,容错机制主要是采取检查点(checkpoint)机制和数据备份机制。故障恢复是由主动检查,以及不同机器之间的数据复制实现的。由于进行故障恢复需要跨集群网络来复制大量数据,这无疑是相当昂贵的。因此,在Spark中则采取了不同的方法进行故障恢复。

作为一个大型的分布式集群,Spark针对工作负载会做出两种假设:

□处理时间是有限的;

□保持数据持久性是外部数据源的职责,主要是让处理过程中的数据保持稳定。

基于假设,Spark在执行期间发生数据丢失时会选择折中方案,它会重新执行之前的步骤来恢复丢失的数据,但并不是说丢弃之前所有已经完成的工作,而重新开始再来一遍。

假如其中一个RDD坏掉,RDD中有记录之前的依赖关系,且依赖关系中记录算子和分区。此时,仅仅需要再执行一遍父RDD的相应分区。

但是,跨宽依赖的再执行能够涉及多个父RDD,从而引发全部的再执行。为了规避这一点,Spark会保持Map阶段中间数据输出的持久,在机器发生故障的情况下,再执行只需要回溯Mapper持续输出的相应分区,来获取中间数据。

Spark还提供了数据检查点和记录日志,用于持久化中间RDD,这样再执行就不必追溯到最开始的阶段。通过比较恢复延迟和检查点开销进行权衡,Spark会自动化地选择相应的策略进行故障恢复。

2. RDD持久化

Spark的持久化,是指在不同转换操作之间,将过程数据缓存在内存中,实现快速重用,或者故障快速恢复。持久化主要分为两类,主动持久化和自动持久化。

主动持久化,主要目标是RDD重用,从而实现快速处理,是Spark构建迭代算法的关键。例如,持久化一个RDD,每一个节点都将把它的计算分块结果保存在内存中,并在该数据集(或者衍生数据集)进行的后续Action中重用,使得后续Action执行变得更加迅速(通常快10倍)。

可以使用persist()方法标记一个持久化的RDD,一旦被一个执行(action)触发计算,它将会被保留在计算节点的内存中并重用。如果RDD的任一分区丢失,通过使用原先创建的转换操作,它将会被自动重算,不需要全部重算,而只计算丢失的部分。

此外,每一个RDD都可以用不同的保存级别进行保存,从而允许持久化数据集在硬盘或内存作为序列化的Java对象(节省空间),甚至跨节点复制。

持久化的等级选择,是通过将一个StorageLevel对象传递给persist()方法进行确定的,cache()方法调用persist()的默认级别MEMORY_ONLY。表4-6是持久化的等级。

表4-6 持久化的等级

[插图]

相对于MEMORY_ONLY_SER,OFF_HEAP减小了垃圾回收的开销,同时也允许Executor变得更小且可共享内存储备,Executor的崩溃不会导致内存中的缓存丢失。在这种模式下,Tachyon中的内存是不可丢弃的。

自动持久化,是指不需要用户调用persist(),Spark自动地保存一些Shuffle操作(如reduceByKey)的中间结果。这样做是为了避免在Shuffle过程中一个节点崩溃时重新计算所有的输入。

持久化时,一旦设置了就不能改变,想要改变就要先去持久化。推荐用户在重用RDD结果时调用Persist,这样会使持久化变得可控。

Persist持久化RDD,修改了RDD的meta info中的StorageLevel。而检查点在持久化的同时切断Lineage,修改了RDD的meta info中的Lineage。二者均返回经过修改的RDD对象自身,而非新的RDD对象,也均属于Lazy操作。

3. 选择存储等级

Spark的不同存储级别,旨在满足内存使用和CPU效率权衡上的不同需求,建议通过以下步骤进行选择:

□如果你的RDD可以很好地与默认的存储级别(MEMORY_ONLY)契合,那么就不需要做任何修改。这已经是CPU使用效率最高的选项,它使RDD的操作尽可能快。

□如果不能与MEMORY_ONLY很好地契合,建议使用MEMORY_ONLY_SER并选择一个快速序列化的库,使对象在有较高空间使用率的情况下,依然可以较快地被访问。

□尽可能不要存储数据到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度与从硬盘中读取的效率差不多。

□如果想拥有快速故障恢复能力,可使用复制存储级别(例如,用Spark来响应Web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续地运行任务,而不需要等待丢失的分区被重新计算。

□如果想要定义自己的存储级别(如复制因子为3而不是2),可以使用StorageLevel单例对象的apply()方法。

4. 移除数据

RDD可以随意在RAM中进行缓存,因此它提供了更快速的数据访问。目前,缓存的粒度为RDD级别,只能缓存全部的RDD。

Spark自动监视每个节点上使用的缓存,在集群中没有足够的内存时,Spark会根据缓存情况确定一个LRU(Least Recently Used,最近最少使用算法)的数据分区进行删除。

如果想手动删除RDD,而不想等待它从缓存中消失,可以使用RDD的unpersist()方法移除数据,unpersist()方法是立即生效的。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏码匠的流水账

聊聊jdbc的batch操作

statement的batch操作,可以批量进行insert或update操作,提升操作性能,特别是在大数据量的insert或update的时候。

1072
来自专栏Java 源码分析

SparkStreaming 入门

2418
来自专栏芋道源码1024

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业执行

Lite调度作业( LiteJob ),作业被调度后,调用 #execute() 执行作业。

6102
来自专栏大数据和云计算技术

SparkStreaming入门

黄文辉同学第二篇,请大家支持! 1.SparkStreaming简介 Spark Streaming属于核心Spark API的扩展,支持实时数据流的可扩展、高...

3594
来自专栏Albert陈凯

Spark详解06容错机制Cache 和 Checkpoint Cache 和 Checkpoint

Cache 和 Checkpoint 作为区别于 Hadoop 的一个重要 feature,cache 机制保证了需要访问重复数据的应用(如迭代型算法和交互式应...

48112
来自专栏Albert陈凯

Spark系列课程-00xxSpark RDD持久化

我们这节课讲一下RDD的持久化 ? RDD的持久化 这段代码我们上午已经看过了,有瑕疵大家看出来了吗? 有什么瑕疵啊? 大家是否还记得我在第二节课的时候跟大...

4088
来自专栏Albert陈凯

2018-11-07 Spark应用程序开发参数调优深入剖析-Spark商业调优实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归作者(秦凯新)所有...

1124
来自专栏行者悟空

Spark RDD的Action

1516
来自专栏岑玉海

Spark Streaming编程指南

Overview Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 它可以接受来自Kafka, Flume,...

7055
来自专栏牛肉圆粉不加葱

Spark Shuffle 模块② - Hash Based Shuffle write

Spark 最开始只有 Hash Based Shuffle,因为在很多场景中并不需要排序,在这些场景中多余的排序反而会损耗性能。

891

扫码关注云+社区

领取腾讯云代金券