RDD原理与基本操作 | Spark,从入门到精通

欢迎阅读美图数据技术团队的「Spark,从入门到精通」系列文章,本系列文章将由浅入深为大家介绍 Spark,从框架入门到底层架构的实现,相信总有一种姿势适合你,欢迎大家持续关注:)

往期直通车:Hello Spark!

Spark on Yarn

/ 什么是 RDD? /

传统的 MapReduce 虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是在迭代计算式的时候,要进行大量的磁盘 IO 操作,而 RDD 正是解决这一缺点的抽象方法。RDD(Resilient Distributed Datasets)即弹性分布式数据集,从名字说起:

弹性

当计算过程中内存不足时可刷写到磁盘等外存上,可与外存做灵活的数据交换;

RDD 使用了一种“血统”的容错机制,在结构更新和丢失后可随时根据血统进行数据模型的重建;

分布式

就是可以分布在多台机器上进行并行计算;

数据集

一组只读的、可分区的分布式数据集合,集合内包含了多个分区。分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。

RDD 内部结构

图 1

图 1 所示是 RDD 的内部结构图,它是一个只读、有属性的数据集。它的属性用来描述当前数据集的状态,数据集由数据的分区(partition)组成,并由(block)映射成真实数据。RDD 的主要属性可以分为 3 类:与其他 RDD 的关系(parents、dependencies);数据(partitioner、checkpoint、storage level、iterator 等);RDD 自身属性(sparkcontext、sparkconf),接下来我们根据属性分类来深入介绍各个组件。

RDD 自身属性

从自身属性说起,SparkContext 是 Spark job 的入口,由 Driver 创建在 client 端,包括集群连接、RDD ID、累加器、广播变量等信息。SparkConf 是参数配置信息,包括:

  • Spark api,控制大部分的应用程序参数;
  • 环境变量,配置IP地址、端口等信息;
  • 日志配置,通过 log4j.properties 配置。

数据

RDD 内部的数据集合在逻辑上和物理上被划分成多个小子集合,这样的每一个子集合我们将其称为分区(Partitions),分区的个数会决定并行计算的粒度,而每一个分区数值的计算都是在一个单独的任务中进行的,因此并行任务的个数也是由 RDD分区的个数决定的。但事实上 RDD 只是数据集的抽象,分区内部并不会存储具体的数据。Partition 类内包含一个 index 成员,表示该分区在 RDD 内的编号,通过 RDD 编号+分区编号可以确定该分区对应的唯一块编号,再利用底层数据存储层提供的接口就能从存储介质(如:HDFS、Memory)中提取出分区对应的数据。

RDD 的分区方式主要包含两种:Hash Partitioner 和 Range Partitioner,这两种分区类型都是针对 Key-Value 类型的数据,如是非 Key-Value 类型则分区函数为 None。Hash 是以 Key 作为分区条件的散列分布,分区数据不连续,极端情况也可能散列到少数几个分区上导致数据不均等;Range 按 Key 的排序平衡分布,分区内数据连续,大小也相对均等。

Preferred Location 是一个列表,用于存储每个 Partition 的优先位置。对于每个 HDFS 文件来说,这个列表保存的是每个 Partition 所在的块的位置,也就是该文件的「划分点」。

Storage Level 是 RDD 持久化的存储级别,RDD 持久化可以调用两种方法:cache 和 persist:persist 方法可以自由的设置存储级别,默认是持久化到内存;cache 方法是将 RDD 持久化到内存,cache 的内部实际上是调用了persist 方法,由于没有开放存储级别的参数设置,所以是直接持久化到内存。

图 2

如图 2 所示是 Storage Level 各级别分布,那么如何选择一种最合适的持久化策略呢?默认情况下,性能最高的当然是 MEMORY_ONLY,但前提是你的内存必须足够大到可以绰绰有余地存放下整个 RDD 的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果 RDD 中数据比较多时(比如几十亿),直接用这种持久化级别,会导致 JVM 的 OOM 内存溢出异常。

如果使用 MEMORY_ONLY 级别时发生了内存溢出,那么建议尝试使用 MEMORY_ONLY_SER 级别。该级别会将 RDD 数据序列化后再保存在内存中,此时每个 partition 仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比 MEMORY_ONLY 多出来的性能开销主要就是序列化与反序列化的开销,但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。但可能发生 OOM 内存溢出的异常。

如果纯内存的级别都无法使用,那么建议使用 MEMORY_AND_DISK_SER 策略,而不是 MEMORY_AND_DISK 策略。因为既然到了这一步,就说明 RDD 的数据量很大,内存无法完全放下,序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。

通常不建议使用 DISK_ONLY 和后缀为_2 的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销。

Checkpoint 是 Spark 提供的一种缓存机制,当需要计算依赖链非常长又想避免重新计算之前的 RDD 时,可以对 RDD 做 Checkpoint 处理,检查 RDD 是否被物化或计算,并将结果持久化到磁盘或 HDFS 内。Checkpoint 会把当前 RDD 保存到一个目录,要触发 action 操作的时候它才会执行。在 Checkpoint 应该先做持久化(persist 或者 cache)操作,否则就要重新计算一遍。若某个 RDD 成功执行 checkpoint,它前面的所有依赖链会被销毁。

与 Spark 提供的另一种缓存机制 cache 相比:cache 缓存数据由 executor 管理,若 executor 消失,它的数据将被清除,RDD 需要重新计算;而 checkpoint 将数据保存到磁盘或 HDFS 内,job 可以从 checkpoint 点继续计算。Spark 提供了 rdd.persist(StorageLevel.DISK_ONLY) 这样的方法,相当于 cache 到磁盘上,这样可以使 RDD 第一次被计算得到时就存储到磁盘上,它们之间的区别在于:persist 虽然可以将 RDD 的 partition 持久化到磁盘,但一旦作业执行结束,被 cache 到磁盘上的 RDD 会被清空;而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的。

Compute 函数实现方式就是向上递归「获取父 RDD 分区数据进行计算」,直到遇到检查点 RDD 获取有缓存的 RDD。

Iterator 用来查找当前 RDD Partition 与父 RDD 中 Partition 的血缘关系,并通过 Storage Level 确定迭代位置,直到确定真实数据的位置。它的实现流程如下:

  • 若标记了有缓存,则取缓存,取不到则进行 computeOrReadCheckpoint(计算或读检查点)。完了再存入缓存,以备后续使用。
  • 若未标记有缓存,则直接进行 computeOrReadCheckpoint。
  • computeOrReadCheckpoint 这个过程也做两个判断:有做过 checkpoint 和没有做过 checkpoint,做过 checkpoint 则可以读取到检查点数据返回,没做过则调该 RDD 的实现类的 compute 函数计算。

血统关系

一个作业从开始到结束的计算过程中产生了多个 RDD,RDD 之间是彼此相互依赖的,我们把这种父子依赖的关系称之为「血统」。

RDD 只支持粗颗粒变换,即只记录单个块(分区)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来。

*变换序列指每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。

因此 RDD 的容错机制又称「血统」容错。 要实现这种「血统」容错机制,最大的难题就是如何表达父 RDD 和子 RDD 之间的依赖关系。

图 3

如图 3 所示,父 RDD 的每个分区最多只能被子 RDD 的一个分区使用,称为窄依赖(narrow dependency);若父 RDD 的每个分区可以被子 RDD 的多个分区使用,称为宽依赖(wide dependency)。简单来讲,窄依赖就是父子RDD分区间「一对一」的关系,而宽依赖就是「一对多」关系。从失败恢复来看,窄依赖的失败恢复起来更高效,因为它只需找到父 RDD 的一个对应分区即可,而且可以在不同节点上并行计算做恢复;宽依赖牵涉到父 RDD 的多个分区,需要得到所有依赖的父 RDD 分区的 shuffle 结果,恢复起来相对复杂些。

图 4

根据 RDD 之间的宽窄依赖关系引申出 Stage 的概念,Stage 是由一组 RDD 组成的执行计划。如果 RDD 的衍生关系都是窄依赖,则可放在同一个 Stage 中运行,若 RDD 的依赖关系为宽依赖,则要划分到不同的 Stage。这样 Spark 在执行作业时,会按照 Stage 的划分, 生成一个最优、完整的执行计划。

/ RDD 的创建方式与分区机制 /

RDD 的创建方式

RDD 的创建方式有四种:

1.使用程序中的集合创建 RDD,RDD 的数据源是程序中的集合,通过 parallelize 或者 makeRDD 将集合转化为 RDD;

*例

val num = Array(1,2,3,4,5)

val rdd = sc.parallelize(num)

2.使用本地文件或 HDFS 创建 RDD,RDD 的数据源是本地文件系统或 HDFS 的数据,使用 textFile 方法创建RDD。

*例

val rdd = sc.textFile(“hdfs://master:9000/rec/data”)

3.使用数据流创建 RDD,使用 Spark Streaming 的相关类,接收实时的输入数据流创建 RDD(数据流来源可以是 kafka、flume 等)。

*例

val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream(“localhost”, 9999)

val words = lines.flatMap(_.split(“ ”))

4.使用其他方式创建 RDD,从其他数据库上创建 RDD,例如 Hbase、MySQL 等。

*例

val sqlContext = new SQLContext(sc)

val url = "jdbc:mysql://ip:port/xxxx"

val prop = new Properties()

val df = sqlContext.read.jdbc(url, “play_time”, prop)

RDD 的分区机制

RDD 的分区机制有两个关键点:一个是关键参数,即 Spark 的默认并发数 spark.default.parallelism;另一个是关键原则,RDD 分区尽可能使得分区的个数等于集群核心数目。

当配置文件 spark-default.conf 中显式配置了 spark.default.parallelism,那么 spark.default.parallelism=配置的值,否则按照如下规则进行取值:

1.本地模式(不会启动 executor,由 SparkSubmit 进程生成指定数量的线程数来并发)

spark-shell spark.default.parallelism = 1 spark-shell --master local[N] spark.default.parallelism = N (使用 N 个核) spark-shell --master local spark.default.parallelism = 1

2.伪集群模式(x 为本机上启动的 executor 数,y 为每个 executor 使用的 core 数,z 为每个 executor 使用的内存)

spark-shell --master local-cluster[x,y,z] spark.default.parallelism = x * y

3.Yarn、standalone 等模式

spark.default.parallelism = max(所有 executor 使用的 core 总数,2)

4.Mesos

spark.default.parallelism = 8

spark.context 会生成两个参数,由 spark.default.parallelism 推导出这两个参数的值:

sc.defaultParallelism     = spark.default.parallelism

sc.defaultMinPartitions  = min(spark.default.parallelism, 2)

当 sc.defaultParallelism 和 sc.defaultMinPartitions 确认后,就可以推算 RDD 的分区数了。

  • 以 parallelize 方法为例
val rdd = sc.parallelize(1 to 10)

如果使用 parallelize 方法时没指定分区数, RDD 的分区数 = sc.defaultParallelism

  • 以 textFile 方法为例
val rdd = sc.textFile(“path/file”)

分区机制分两种情况:

1.从本地文件生成的 RDD,如果没有指定分区数,则默认分区数规则为

rdd 的分区数 = max(本地 file 的分片数, sc.defaultMinPartitions)

2.从 HDFS 生成的 RDD,如果没有指定分区数,则默认分区数规则为:

rdd 的分区数 = max(hdfs 文件的 block 数目, sc.defaultMinPartitions)

/ RDD 的常用操作 /

RDD 支持两种类型的操作:转换(Transformation)和动作(Action),转换操作是从已经存在的数据集中创建一个新的数据集,而动作操作是在数据集上进行计算后返回结果到 Driver,既触发 SparkContext 提交 Job 作业。转换操作都具有 Lazy 特性,即 Spark 不会立刻进行实际的计算,只会记录执行的轨迹,只有触发行动操作的时候,它才会根据 DAG 图真正执行。

转换与动作具体包含的操作种类如下图所示:

图 5:转换操作

图 6:动作操作

最后我们通过一段代码来看看它具体的操作:

这段代码是用来计算某个视频被男性或女性用户的播放次数,其中 rdd_attr 用来记录用户性别,rdd_src 是用户对某个视频进行播放的记录,这两个 RDD 会进行一个 join 操作,比如这是某个男性用户对某个视频进行了播放,进行 map 操作之后得到视频 id 和性别作为 key,根据这个 key 做 reduceByKey 的操作,最终得到一个视频被男性/女性用户总共播放了多少次的 RDD,然后使用 combineByKey 合并同一个视频 id 的多个结果,最后保存到 HDFS 上。

附:参考文章

《Spark之深入理解RDD结构》 https://blog.csdn.net/u011094454/article/details/78992293 《RDD的数据结构模型》 https://www.jianshu.com/p/dd7c7243e7f9?from=singlemessage 《Spark RDD详解》 https://blog.csdn.net/wangxiaotongfan/article/details/51395769 《Spark RDD的默认分区数:(spark 2.1.0)》 https://www.jianshu.com/p/4b7d07e754fa 《Spark性能优化指南——基础篇》 https://tech.meituan.com/spark_tuning_basic.html

原文发布于微信公众号 - 美图数据技术团队(gh_feb1d206d92b)

原文发表时间:2018-08-15

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏肖力涛的专栏

Spark踩坑记:Spark Streaming+kafka应用及调优

本文首先对spark streaming嵌入kafka的方式进行归纳总结,之后简单阐述Spark streaming+kafka 在舆情项目中的应用,最后将自己...

5.3K3
来自专栏about云

让你真正明白spark streaming

spark streaming介绍 Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等...

3517
来自专栏祝威廉

Spark 2.0 Structured Streaming 分析

Spark 2.0 将流式计算也统一到DataFrame里去了,提出了Structured Streaming的概念,将数据源映射为一张无线长度的表,同时将流式...

1383
来自专栏about云

spark入门基础知识常见问答整理

一. Spark基础知识 1.Spark是什么? UCBerkeley AMPlab所开源的类HadoopMapReduce的通用的并行计算框架 dfsSpa...

36210
来自专栏简单聊聊Spark

Spark内核分析之DAGScheduler划分算法实现原理讲解(重要)

        接着上一篇,我们接着来分析下一个非常重要的组建DAGScheduler的运行原理是怎么实现的;通过之前对Spark的分析讲解,我们的Spark作...

1492
来自专栏个人分享

Shuffle相关分析

 Shuffle描述是一个过程,表现出的是多对多的依赖关系。Shuffle是连接map阶段和Reduce阶段的纽带,每个Reduce Task都会从Map Ta...

1044
来自专栏岑玉海

Spark Streaming编程指南

Overview Spark Streaming属于Spark的核心api,它支持高吞吐量、支持容错的实时流数据处理。 它可以接受来自Kafka, Flume,...

7025
来自专栏芋道源码1024

分布式作业系统 Elastic-Job-Lite 源码分析 —— 作业执行

Lite调度作业( LiteJob ),作业被调度后,调用 #execute() 执行作业。

6052
来自专栏大数据-Hadoop、Spark

Spark Streaming + Kafka整合

2615
来自专栏大数据学习笔记

Spark2.x学习笔记:12、Shuffle机制

12、Shuffle机制 12.1 背景 在MapReduce计算框架中,shuffle是连接Map和Reduce之间的桥梁。 Map的输出要用到Reduce中...

3067

扫码关注云+社区

领取腾讯云代金券