前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark RDD依赖的深度优先搜索

Spark RDD依赖的深度优先搜索

作者头像
大数据真好玩
发布2019-09-02 15:56:56
7250
发布2019-09-02 15:56:56
举报
文章被收录于专栏:暴走大数据暴走大数据

来源:菜鸟的大数据日记

作者:runzhliu

By 大数据技术与架构

场景描述:最近在刷算法题,看到经典的树搜索的算法,正巧之前记得 Spark RDD 中有一处利用 DFS 来判断 RDD 依赖关系的代码,因此专门拿出来分析一下。

关键词:Spark 深度优先搜索

Overview

最近在刷刷算法题,看到经典的树搜索的算法,正巧之前记得 Spark RDD 中有一处利用 DFS 来判断 RDD 依赖关系的代码,因此专门拿出来分析一下。

Code

/**
* Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains
* no ordering on the RDDs returned.
*/
private[spark] def getNarrowAncestors: Seq[RDD[_]] = {
    val ancestors = new mutable.HashSet[RDD[_]]

    def visit(rdd: RDD[_]): Unit = {
      val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
      val narrowParents = narrowDependencies.map(_.rdd)
      val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
      narrowParentsNotVisited.foreach { parent =>
        ancestors.add(parent)
        visit(parent)
      }
    }

    visit(this)

    // In case there is a cycle, do not include the root itself
    ancestors.filterNot(_ == this).toSeq
}

分析

代码很清晰,就是用递归的方式写完这个寻找 RDD 的 Narrow 祖先。

val ancestors = new mutable.HashSet[RDD[_]]

ancestors 是一个 Set 数据结构,用来存放已经查找过的 父 RDD。

narrowDependencies, narrowParents, narrowParentsNotVisited 三个变量,按照名字是很容易理解的,分别是找到 RDD 的窄依赖,窄依赖的父依赖以及没有被访问过的窄依赖。

最后这一段,将没有被访问过的父依赖,依次加入 ancetors 表示已经访问过了。

narrowParentsNotVisited.foreach { parent =>
    ancestors.add(parent)
    visit(parent)
}

有心的读者会发现最后一行注释。

In case there is a cycle, do not include the root itself

大意就是如果如果不去除根节点 RDD,那么 narrowParentsNotVisited 是不能被结束的,意思就是相乘了环而导致循环无法结束。

Test Case

// org/apache/spark/rdd/RDDSuite.scala
test("getNarrowAncestors") {
    val rdd1 = sc.parallelize(1 to 100, 4)
    val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
    val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val rdd5 = rdd4.mapValues(_ + 1).mapValues(_ + 2).mapValues(_ + 3)
    val ancestors1 = rdd1.getNarrowAncestors
    val ancestors2 = rdd2.getNarrowAncestors
    val ancestors3 = rdd3.getNarrowAncestors
    val ancestors4 = rdd4.getNarrowAncestors
    val ancestors5 = rdd5.getNarrowAncestors

    // Simple dependency tree with a single branch
    assert(ancestors1.size === 0)
    assert(ancestors2.size === 2)
    assert(ancestors2.count(_ === rdd1) === 1)
    assert(ancestors2.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
    assert(ancestors3.size === 5)
    assert(ancestors3.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 4)

    // Any ancestors before the shuffle are not considered
    assert(ancestors4.size === 0)
    assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 0)
    assert(ancestors5.size === 3)
    assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
    assert(ancestors5.count(_ === rdd3) === 0)
    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 2)
}

建议可以跑一下 RDDSuite.scala 测试类中的关于 getNarrowAncestors 方法。很显然,针对第二部分的情况,窄依赖只跟踪到 shuffle 之前,也就是一个 RDD 血缘遇到 shuffle 操作,那么窄依赖的依赖链条就会重新计数。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-31,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档