前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core源码精读计划19 | RDD的依赖与分区逻辑

Spark Core源码精读计划19 | RDD的依赖与分区逻辑

作者头像
大数据真好玩
发布2019-08-19 11:32:59
6280
发布2019-08-19 11:32:59
举报

目录

  • 前言
  • RDD依赖
    • Dependency抽象类及子类
    • 窄依赖
    • 宽依赖
  • RDD分区器
    • Partitioner抽象类与伴生对象
    • HashPartitioner
  • 总结

前言

按照计划,本文来讲解RDD的依赖与分区器。这两者不仅与之后调度系统的细节(DAG、Shuffle等)息息相关,而且也是面试Spark系大数据研发工程师时经常被问到的基础问题(反正我是会问的),因此看官也可以将本文当做一篇面试知识点解析来看。

RDD依赖

Dependency抽象类及子类

在Spark Core中,RDD依赖关系的基类就是Dependency抽象类。它的定义只有一句话。

代码#19.1 - o.a.s.Dependency抽象类

@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

该类中只定义了一个方法rdd(),它用于取得当前RDD依赖的父RDD。Dependency及其子类的类图如下所示。

图#19.1 - Dependency继承体系

我们已经知道,RDD依赖分为窄依赖和宽依赖(Shuffle依赖)两种,下面分别来看。

窄依赖

所谓窄依赖,是指父RDD的每个分区都仅被子RDD的一个分区所依赖,也就是说子RDD的一个分区固定对应一个父RDD的单个分区。窄依赖在代码中的基类是NarrowDependency抽象类。

代码#19.2 - o.a.s.NarrowDependency抽象类

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

可见,NarrowDependency类带有一个构造方法参数_rdd,并重写rdd()方法让其返回之,它就是当前RDD依赖的父RDD。另外,它还定义了一个抽象方法getParents(),用来返回partitionId对应分区依赖的所有父RDD的分区ID。该方法由NarrowDependency的子类实现,分别为OneToOneDependency(一对一依赖)和RangeDependency(范围依赖),它们的代码都比较简单。

代码#19.3 - o.a.s.OneToOneDependency与RangeDependency类

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

可见,它们返回的都是只有一个元素的List,且OneToOneDependency的父子RDD分区ID严格相同,常见的map()或filter()等算子都会产生OneToOneDependency。而在RangeDependency中,子RDD中ID为partitionId的分区与父RDD中ID为(partitionId - outStart + inStart)的分区一一对应,其中inStart为父RDD分区ID的起始值,outStart为子RDD分区ID的起始值,例如union()算子就会产生这种依赖。

上面讲的两种情况都是一一对应关系。当子RDD分区对应多个父RDD的分区(如join()算子)时,也可以形成窄依赖。其前提是父子RDD的分区规则完全相同,即子RDD的某个分区p对应父RDD 1的分区p,也对应父RDD 2的分区p。如果分区规则不同,就会变成宽依赖。

下面的图来自网络,原始出处已不可考,比较形象地说明了窄依赖。

图#19.2 - 窄依赖的三种情况

宽依赖

严格来讲,它的名字应该叫“Shuffle依赖”,因为在Spark代码中,它的类名是ShuffleDependency。不过在中文圈子里,“宽依赖”这个名字也同样通用。它就是指子RDD的一个分区会对应一个父RDD的多个分区,并且往往是全部分区。ShuffleDependency类的代码如下。

代码#19.4 - o.a.s.ShuffleDependency类

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

由于Shuffle是调度系统的重要组成部分,因此我们这里只限于读ShuffleDependency的源码,做一些基本介绍,大头留到后面去说。

ShuffleDependency类有3个泛型参数,K代表键类型,V代表值类型,而C则代表Combiner的类型。如果看官有Hadoop MapReduce的基础,对Combiner这个词自然不会陌生。由于Shuffle过程对键值型数据才有意义,因此ShuffleDependency对父RDD的泛型类型有限制,必须是Product2[K,V]或者其子类,Product2在Scala中代表两个元素的笛卡尔积。

其他构造方法参数说明如下:

  • partitioner:分区器,下面马上就会讲到。
  • serializer:闭包序列化器,SparkEnv中已经创建,为JavaSerializer。
  • keyOrdering:可选的对键类型K排序的排序规则。
  • aggregator:可选的Map端数据聚合逻辑。
  • mapSideCombine:指定是否启用Map数据预聚合。

随着宽依赖的创建,还会调用SparkContext.newShuffleId()方法分配一个新的Shuffle ID,以及调用ShuffleManager.registerShuffle方法注册该Shuffle,返回Shuffle句柄(ShuffleHandle)。虽然在很久之前的几篇文章中讲过Shuffle相关的话题,但是在真正讲到调度系统时,还是会继续深挖的。

下面的图形象地说明了宽依赖。

图#19.3 - 宽依赖的两种情况

RDD分区器

Partitioner抽象类与伴生对象

在上一篇文章讲RDD时,Partitioner就已经出现了,并且它在上一节的ShuffleDependency代码中也是作为构造参数出现。在Shuffle过程中,必须得有确定的计算逻辑来决定父RDD的分区数据如何分配并对应到子RDD的分区中,这就是分区器Partitioner的职责。

Partitioner抽象类的定义也很简单。

代码#19.5 - o.a.s.Partitioner抽象类

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

numPartitions()方法返回分区总数,而getPartitions()方法根据键返回其将被映射到的分区ID。

Partitioner还带有一个伴生对象,其中定义了defaultPartitioner()方法,顾名思义,它(根据上游的一个或一些RDD)返回默认的分区逻辑,其代码如下。

代码#19.6 - o.a.s.Partitioner.defaultPartitioner()/isEligiblePartitioner()方法

  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
      Some(hasPartitioner.maxBy(_.partitions.length))
    } else {
      None
    }

    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
      rdd.context.defaultParallelism
    } else {
      rdds.map(_.partitions.length).max
    }

    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
      hasMaxPartitioner.get.partitioner.get
    } else {
      new HashPartitioner(defaultNumPartitions)
    }
  }

  private def isEligiblePartitioner(
     hasMaxPartitioner: RDD[_],
     rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
  }

该方法会从输入的所有RDD中取出那些定义了分区逻辑的RDD,然后找到其中分区数最大的那个Partitioner。如果SparkConf中定义了缺省并行度配置项,即spark.default.parallelism,那么默认分区器的分区数就会采用该参数的值,否则就直接用所有RDD中最大的分区数(这就是为什么几乎总是推荐在提交Spark作业时设定spark.default.parallelism)。

然后,调用isElegiblePartitioner()方法,判断分区数最大的那个Partitioner是否“合格”,判断逻辑是其分区数与所有上游RDD中最大分区数之差小于一个数量级。如果通过检查,并且默认分区数比它小,就采用分区数最大的那个Partitioner作为分区逻辑,否则用默认分区数构造一个新的HashPartitioner并返回。

Partitioner在Spark Core中的实现类主要有两个:基于散列的HashPartitioner和基于采样范围的RangePartitioner,前者是默认实现。下面我们就以HashPartitioner为例来看看其具体实现(RangePartitioner太麻烦了)。

HashPartitioner

代码#19.7 - o.a.s.HashPartitioner类

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

可见,在重写的getPartition()方法中,会取得键的hashCode值,对分区数numPartitions取模,返回其绝对值,这样就确保映射到的分区落在[0,numPartitions - 1]的区间内。为了判断两个HashPartitioner是否相等,也必须同时重写其equals()和hashCode()方法,判断标准自然就只有分区数了。

我们也可以很容易地想到,用户通过自己继承Partitioner类,能够自定义分区逻辑。下面就是一个简单的示例,它通过Key的长度来分区。由于它不属于Spark源码,就不编号了。

class KeyLengthPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    return key.asInstanceOf[String].length() & (partitions - 1)
  }
}

总结

本文分别以Spark Core中的Dependency与Partitioner两个抽象类为起点,比较详细地讲解了Spark中RDD依赖关系与分区逻辑的具体设计。依赖与分区是RDD五要素中最重要的两个点,在今后的源码阅读过程中,会经常用到它们。

— THE END —

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-08-14,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据真好玩 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • RDD依赖
    • Dependency抽象类及子类
      • 窄依赖
        • 宽依赖
        • RDD分区器
          • Partitioner抽象类与伴生对象
            • HashPartitioner
            • 总结
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档