前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >4.3 RDD操作

4.3 RDD操作

作者头像
Albert陈凯
发布2018-04-08 10:38:58
8670
发布2018-04-08 10:38:58
举报
文章被收录于专栏:Albert陈凯Albert陈凯

4.3 RDD操作

RDD提供了一个抽象的分布式数据架构,我们不必担心底层数据的分布式特性,而应用逻辑可以表达为一系列转换处理。

通常应用逻辑是以一系列转换(Transformation)和执行(Action)来表达的,前者在RDD之间指定处理的相互依赖关系,后者指定输出的形式。

其中:

□转换:是指该操作从已经存在的数据集上创建一个新的数据集,是数据集的逻辑操作,并没有真正计算。

□执行:是指该方法提交一个与前一个Action之间的所有Transformation组成的Job进行计算,Spark会根据Action将作业切分成多个Job。

比如,Map操作传递数据集中的每一个元素经过一个函数,形成一个新的RDD转换结果,而Reduce操作通过一些函数对RDD的所有元素进行操作,并返回最终结果给Driver程序。

在默认情况下,Spark所有的转换操作都是惰性(Lazy)的,每个被转换得到的RDD不会立即计算出结果,只是记下该转换操作应用的一些基础数据集,可以有多个转换结果。转换只有在遇到一个Action时才会执行,如图4-2所示。

[插图]

图4-2 Spark转换和执行

这种设计使得Spark以更高的效率运行。例如,可以通过将要在Reduce操作中使用的Map转换来创建一个数据集,并且只返回Reduce的结果给驱动程序,而不是整个Map所得的数据集。

每当一个Job计算完成,其内部的所有RDD都会被清除,如果在下一个Job中有用到其他Job中的RDD,会引发该RDD的再次计算,为避免这种情况,我们可以使用Persist(默认是Cache)方法“持久化”一个RDD到内存中。在这种情况下,Spark将会在集群中保留这个RDD,以便其他Job可以更快地访问,另外,Spark也支持持久化RDD到磁盘中,或者复制RDD到各个节点。

下面,通过几行简单的程序,进一步说明RDD的基础知识。

          val lines=sc.textFile("data.txt")

val lineLengths=lines.map(s=>s.length)

val totalLength=lineLengths.reduce((a,b)=>a+b)

第一行读取外部文件data.txt返回一个基础的MappedRDD,该MappedRDD并不加载到内存中或被执行操作,lines只是记录转换操作结果的指针。

第二行定义了lineLengths作为一个Map转换的结果,由于惰性机制的存在,lineLengths的值不会立即计算。

最后,运行Reduce,该操作为一个Action。Spark将计算打散成多个任务以便在不同的机器上分别运行,每台机器并行运行Map,并将结果进行Reduce操作,返回结果值Driver程序。

如果需要继续使用lineLengths,可以添加缓存Persist或Cache,该持久化会在执行Reduce之前,第一次计算成功之后,将lineLengths保存在内存中。

4.3.1 转换操作

转换操作是RDD的核心之一,通过转换操作实现不同的RDD结果,作为下一次RDD计算的数据输入,转换操作不会触发Job的提交,仅仅是标记对RDD的操作,形成DAG图,以供Action触发Job提交后调用。

常用的转换操作包括:基础转换操作和键-值转换操作。

1.基础转换操作

表4-2列出了目前支持的基础转换操作,具体内容请参见RDD的API官方文档,以获得更多的细节。

表4-2 基础转换操作

[插图]

(续)

[插图]

2.键-值转换操作

尽管大多数Spark操作都基于包含各种类型对象的RDD,但是一小部分特殊的却只能在键-值对形式的RDD上执行。其中,最普遍的就是分布式“洗牌”(shuffle)操作,比如通过键进行分组或聚合元素。

例如,使用reduceByKey操作对文件中每行出现的文字次数进行计数,各种语言的示例如下。

在Scala中,只要在程序中导入org.apache.spark.SparkContext,就能使用Spark的隐式转换,这些操作就可用于包含二元组对象的RDD(Scala中的内建元组,可通过(a,b)创建),键-值对操作可用PairRDDFunction类,如果导入了转换,该类将自动封装元组RDD。

            val lines = sc.textFile("data.txt")

val pairs = lines.map(s => (s, 1))

val counts = pairs.reduceByKey((a, b) => a + b)

基于counts,可以使用counts.sortByKey()按字母表顺序对这些键-值对排序,然后使用counts.collect(),以对象数组的形式向Driver返回结果。

顺便说一句,进行分组的groupByKey不进行本地合并,而进行聚合的reduceByKey会在本地对每个分区的数据合并后再做Shuffle,效率比groupByKey高得多。下面通过几行基于Scala的代码对键-值转换操作进行说明。

// 初始化List

scala>val data = List(("a",1),("b",1),("c",1),("a",2),("b",2),("c",2))

data: List[(String, Int)] = List((a,1), (b,1), (c,1), (a,2), (b,2), (c,2))

// 并行数组创建RDD

scala>val rdd =sc.parallelize(data)

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0]

// 按照key进行reduceByKey操作

scala>val rbk = rdd.reduceByKey(_+_).collect

rbk: Array[(String, Int)] = Array((a,3), (b,3), (c,3))

// 按照key进行groupByKey操作

scala>val gbk = rdd.groupByKey().collect

gbk:  Array[(String,  Iterable[Int])]  =  Array((a,CompactBuffer(1,  2)),  (b,

CompactBuffer(1, 2)), (c,CompactBuffer(1, 2)))

// 按照key进行sortByKey操作

scala>val sbk = rdd.sortByKey().collect

sbk: Array[(String, Int)] = Array((a,1), (a,2), (b,1), (b,2), (c,1), (c,2))

表4-3列出了常用的健-值转换。

表4-3 常用的键-值转换

[插图]

4.3.2 执行操作

Spark将提交的Action与前一个Action之间的所有Transformation组成的Job进行计算,并根据Action将作业切分成多个Job,指定Transformation的输出结果。

1.常用执行操作

这里以加载Spark自带的本地文件README.md文件进行测试,返回一个MappedRDD文件,进行Filter转换操作和Count执行。

          // 读取README.md数据,并转化为RDD

scala>val data = sc.textFile("file:///$SPARK_HOME/README.md")

data: org.apache.spark.rdd.RDD[String] = file:///$SPARK_HOME/README.md MappedRDD[1]

// 执行f ilter操作,提取带有"Spark"的子集

scala>val datafilter = data.filter(line =>line.contains("Spark"))

datafilter: org.apache.spark.rdd.RDD[String] = FilteredRDD[2]

// 执行Action操作,输出结果

scala>val datacount = datafilter.count()

datacount: Long = 21

如果想了解更多,请参考表4-4中列出的常用的执行操作。

表4-4 常用的执行操作

[插图]

通过常用执行操作,Spark可以实现大部分MapReduce流式计算的任务,提升了计算效率,对Transformation操作进行结果值输出。

2.存储执行操作

常用存储操作主要包含的执行如表4-5所示。

表4-5 常用存储操作包含的执行

[插图]

存储执行操作将结果进行保存,以文本、序列化文件、对象文件的方式输出到存储设备进行持久化。

4.3.3 控制操作

控制操作主要包括故障恢复、数据持久性,以及移除数据。其中,缓存操作Cache/Pesist是惰性的,在进行执行操作时才会执行,而Unpesist是即时的,会立即释放内存。checkpoint会直接将RDD持久化到磁盘或HDFS等路径,不同于Cache/Persist的是,被checkpoint的RDD不会因作业的结束而被消除,会一直存在,并可以被后续的作业直接读取并加载。

1. RDD故障恢复

在一个典型的分布式系统中,容错机制主要是采取检查点(checkpoint)机制和数据备份机制。故障恢复是由主动检查,以及不同机器之间的数据复制实现的。由于进行故障恢复需要跨集群网络来复制大量数据,这无疑是相当昂贵的。因此,在Spark中则采取了不同的方法进行故障恢复。

作为一个大型的分布式集群,Spark针对工作负载会做出两种假设:

□处理时间是有限的;

□保持数据持久性是外部数据源的职责,主要是让处理过程中的数据保持稳定。

基于假设,Spark在执行期间发生数据丢失时会选择折中方案,它会重新执行之前的步骤来恢复丢失的数据,但并不是说丢弃之前所有已经完成的工作,而重新开始再来一遍。

假如其中一个RDD坏掉,RDD中有记录之前的依赖关系,且依赖关系中记录算子和分区。此时,仅仅需要再执行一遍父RDD的相应分区。

但是,跨宽依赖的再执行能够涉及多个父RDD,从而引发全部的再执行。为了规避这一点,Spark会保持Map阶段中间数据输出的持久,在机器发生故障的情况下,再执行只需要回溯Mapper持续输出的相应分区,来获取中间数据。

Spark还提供了数据检查点和记录日志,用于持久化中间RDD,这样再执行就不必追溯到最开始的阶段。通过比较恢复延迟和检查点开销进行权衡,Spark会自动化地选择相应的策略进行故障恢复。

2. RDD持久化

Spark的持久化,是指在不同转换操作之间,将过程数据缓存在内存中,实现快速重用,或者故障快速恢复。持久化主要分为两类,主动持久化和自动持久化。

主动持久化,主要目标是RDD重用,从而实现快速处理,是Spark构建迭代算法的关键。例如,持久化一个RDD,每一个节点都将把它的计算分块结果保存在内存中,并在该数据集(或者衍生数据集)进行的后续Action中重用,使得后续Action执行变得更加迅速(通常快10倍)。

可以使用persist()方法标记一个持久化的RDD,一旦被一个执行(action)触发计算,它将会被保留在计算节点的内存中并重用。如果RDD的任一分区丢失,通过使用原先创建的转换操作,它将会被自动重算,不需要全部重算,而只计算丢失的部分。

此外,每一个RDD都可以用不同的保存级别进行保存,从而允许持久化数据集在硬盘或内存作为序列化的Java对象(节省空间),甚至跨节点复制。

持久化的等级选择,是通过将一个StorageLevel对象传递给persist()方法进行确定的,cache()方法调用persist()的默认级别MEMORY_ONLY。表4-6是持久化的等级。

表4-6 持久化的等级

[插图]

相对于MEMORY_ONLY_SER,OFF_HEAP减小了垃圾回收的开销,同时也允许Executor变得更小且可共享内存储备,Executor的崩溃不会导致内存中的缓存丢失。在这种模式下,Tachyon中的内存是不可丢弃的。

自动持久化,是指不需要用户调用persist(),Spark自动地保存一些Shuffle操作(如reduceByKey)的中间结果。这样做是为了避免在Shuffle过程中一个节点崩溃时重新计算所有的输入。

持久化时,一旦设置了就不能改变,想要改变就要先去持久化。推荐用户在重用RDD结果时调用Persist,这样会使持久化变得可控。

Persist持久化RDD,修改了RDD的meta info中的StorageLevel。而检查点在持久化的同时切断Lineage,修改了RDD的meta info中的Lineage。二者均返回经过修改的RDD对象自身,而非新的RDD对象,也均属于Lazy操作。

3. 选择存储等级

Spark的不同存储级别,旨在满足内存使用和CPU效率权衡上的不同需求,建议通过以下步骤进行选择:

□如果你的RDD可以很好地与默认的存储级别(MEMORY_ONLY)契合,那么就不需要做任何修改。这已经是CPU使用效率最高的选项,它使RDD的操作尽可能快。

□如果不能与MEMORY_ONLY很好地契合,建议使用MEMORY_ONLY_SER并选择一个快速序列化的库,使对象在有较高空间使用率的情况下,依然可以较快地被访问。

□尽可能不要存储数据到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度与从硬盘中读取的效率差不多。

□如果想拥有快速故障恢复能力,可使用复制存储级别(例如,用Spark来响应Web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续地运行任务,而不需要等待丢失的分区被重新计算。

□如果想要定义自己的存储级别(如复制因子为3而不是2),可以使用StorageLevel单例对象的apply()方法。

4. 移除数据

RDD可以随意在RAM中进行缓存,因此它提供了更快速的数据访问。目前,缓存的粒度为RDD级别,只能缓存全部的RDD。

Spark自动监视每个节点上使用的缓存,在集群中没有足够的内存时,Spark会根据缓存情况确定一个LRU(Least Recently Used,最近最少使用算法)的数据分区进行删除。

如果想手动删除RDD,而不想等待它从缓存中消失,可以使用RDD的unpersist()方法移除数据,unpersist()方法是立即生效的。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.07.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
流计算 Oceanus
流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档