举例说明Spark RDD的分区、依赖

例子如下:

scala> val textFileRDD = sc.textFile("/Users/zhuweibin/Downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt")
15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(57160) called with curMem=0, maxMem=278019440
15/08/03 07:00:08 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 55.8 KB, free 265.1 MB)
15/08/03 07:00:08 INFO MemoryStore: ensureFreeSpace(17237) called with curMem=57160, maxMem=278019440
15/08/03 07:00:08 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.8 KB, free 265.1 MB)
15/08/03 07:00:08 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:51675 (size: 16.8 KB, free: 265.1 MB)
15/08/03 07:00:08 INFO SparkContext: Created broadcast 0 from textFile at <console>:21
textFileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21

scala>     println( textFileRDD.partitions.size )
15/08/03 07:00:09 INFO FileInputFormat: Total input paths to process : 1
2

scala>     textFileRDD.partitions.foreach { partition =>
     |       println("index:" + partition.index + "  hasCode:" + partition.hashCode())
     |     }
index:0  hasCode:1681
index:1  hasCode:1682

scala>     println("dependency size:" + textFileRDD.dependencies)
dependency size:List(org.apache.spark.OneToOneDependency@543669de)

scala>     println( textFileRDD )
MapPartitionsRDD[1] at textFile at <console>:21

scala>     textFileRDD.dependencies.foreach { dep =>
     |       println("dependency type:" + dep.getClass)
     |       println("dependency RDD:" + dep.rdd)
     |       println("dependency partitions:" + dep.rdd.partitions)
     |       println("dependency partitions size:" + dep.rdd.partitions.length)
     |     }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:/Users/zhuweibin/Downloads/hive_04053f79f32b414a9cf5ab0d4a3c9daf.txt HadoopRDD[0] at textFile at <console>:21
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala> 

scala>     val flatMapRDD = textFileRDD.flatMap(_.split(" "))
flatMapRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:23

scala>     println( flatMapRDD )
MapPartitionsRDD[2] at flatMap at <console>:23

scala>     flatMapRDD.dependencies.foreach { dep =>
     |       println("dependency type:" + dep.getClass)
     |       println("dependency RDD:" + dep.rdd)
     |       println("dependency partitions:" + dep.rdd.partitions)
     |       println("dependency partitions size:" + dep.rdd.partitions.length)
     |     }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:MapPartitionsRDD[1] at textFile at <console>:21
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala> 

scala>     val mapRDD = flatMapRDD.map(word => (word, 1))
mapRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:25

scala>     println( mapRDD )
MapPartitionsRDD[3] at map at <console>:25

scala>     mapRDD.dependencies.foreach { dep =>
     |       println("dependency type:" + dep.getClass)
     |       println("dependency RDD:" + dep.rdd)
     |       println("dependency partitions:" + dep.rdd.partitions)
     |       println("dependency partitions size:" + dep.rdd.partitions.length)
     |     }
dependency type:class org.apache.spark.OneToOneDependency
dependency RDD:MapPartitionsRDD[2] at flatMap at <console>:23
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala> 

scala> 

scala>     val counts = mapRDD.reduceByKey(_ + _)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:27

scala>     println( counts )
ShuffledRDD[4] at reduceByKey at <console>:27

scala>     counts.dependencies.foreach { dep =>
     |       println("dependency type:" + dep.getClass)
     |       println("dependency RDD:" + dep.rdd)
     |       println("dependency partitions:" + dep.rdd.partitions)
     |       println("dependency partitions size:" + dep.rdd.partitions.length)
     |     }
dependency type:class org.apache.spark.ShuffleDependency
dependency RDD:MapPartitionsRDD[3] at map at <console>:25
dependency partitions:[Lorg.apache.spark.Partition;@c197f46
dependency partitions size:2

scala>

从输出我们可以看出,对于任意一个RDD x来说,其dependencies代表了其直接依赖的RDDs(一个或多个)。那dependencies又是怎么能够表明RDD之间的依赖关系呢?假设dependency为dependencies成员

  • dependency的类型(NarrowDependency或ShuffleDependency)说明了该依赖是窄依赖还是宽依赖
  • 通过dependency的def getParents(partitionId: Int): Seq[Int]方法,可以得到子RDD的每个分区依赖父RDD的哪些分区
  • dependency包含RDD成员,即子RDD依赖的父RDD,该RDD的compute函数说明了对该父RDD的分区进行怎么样的计算能得到子RDD的分区
  • 该父RDD中同样包含dependency成员,该dependency同样包含上述特点,同样可以通过该父RDD的dependency成员来确定该父RDD依赖的爷爷RDD。同样可以通过dependency.getParents方法和爷爷RDD.compute来得出如何从父RDD回朔到爷爷RDD,依次类推,可以回朔到第一个RDD

那么,如果某个RDD的partition计算失败,要回朔到哪个RDD为止呢?上例中打印出的dependency.RDD如下:

MapPartitionsRDD[1] at textFile at <console>:21
MapPartitionsRDD[2] at flatMap at <console>:23
MapPartitionsRDD[3] at map at <console>:25
ShuffledRDD[4] at reduceByKey at <console>:27

可以看出每个RDD都有一个编号,在回朔的过程中,每向上回朔一次变回得到一个或多个相对父RDD,这时系统会判断该RDD是否存在(即被缓存),如果存在则停止回朔,如果不存在则一直向上回朔到某个RDD存在或到最初RDD的数据源为止。


本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏一个会写诗的程序员的博客

Kotlin 问答Kotlin 问答《Kotlin极简教程》正式上架:

可控类型特性,大大减少了 Java 中的 NPE。 代码量大幅度精简。 100% 兼容 Java。 更好的函数式编程支持。

6420
来自专栏华章科技

谁说不能用 Python开发企业应用?

语言多元化是PayPal编程文化中一个重要的组成部分。在C++和Java长期流行的同时,更多的团队选择了JvaScript和Scala。同时,Braintree...

22320
来自专栏一个会写诗的程序员的博客

《Spring Boot 实战:从0到1》第1章 Spring Boot简介第1章 Spring Boot简介小结参考资料

Java Web开发涉及的技术比较繁杂,涉及到很多开发框架和工具(Java, Scala, Kotlin, Clojure,Groovy, Grails,Gr...

13620
来自专栏一个会写诗的程序员的博客

JVM语言生态结构原理图 从Java,Kotlin,Scala,Groovy等语言的编译、执行全过程图示解析

JVM语言生态结构原理图 从Java,Kotlin,Scala,Groovy等语言的编译、执行全过程图示解析

12730
来自专栏华章科技

在twitter搞数据科学是怎样一种体验?

2015年6月17日是我在Twitter工作两周年的纪念日。回想起来,两年间,数据科学在Twitter的应用方式和范围发生了很大变化:

12430
来自专栏一个会写诗的程序员的博客

《Kotlin 极简教程 》第5章 集合类(1)

本章将介绍Kotlin标准库中的集合类,我们将了解到它是如何扩展的Java集合库,使得写代码更加简单容易。如果您熟悉Scala的集合库,您会发现Kotlin跟S...

17820
来自专栏码洞

Spark通信原理之Python与JVM的交互

我们知道Spark平台是用Scala进行开发的,但是使用Spark的时候最流行的语言却不是Java和Scala,而是Python。原因当然是因为Python写代...

15610
来自专栏一个会写诗的程序员的博客

第7章 集合类第7章 集合类

在 Java 类库中有一套相当完整的容器集合类来持有对象。Kotlin没有去重复造轮子(Scala则是自己实现了一套集合类框架),而是在Java 类库的基础上进...

9920
来自专栏一个会写诗的程序员的博客

《Kotin 编程思想·实战》

1.2 程序执行的三种方式 1.2.1 编译执行 1.2.2 解释执行 1.2.3 虚拟机执行

10910
来自专栏数据科学学习手札

(数据科学学习手札46)Scala中的面向对象

  在Scala看来,一切皆是对象,对象是Scala的核心,Scala面向对象涉及到class、object、构造器等,本文就将对class中的重点内容进行介绍...

14750

扫码关注云+社区

领取腾讯云代金券

年度创作总结 领取年终奖励