Spark Core源码精读计划19 | RDD的依赖与分区逻辑

目录

  • 前言
  • RDD依赖
    • Dependency抽象类及子类
    • 窄依赖
    • 宽依赖
  • RDD分区器
    • Partitioner抽象类与伴生对象
    • HashPartitioner
  • 总结

前言

按照计划,本文来讲解RDD的依赖与分区器。这两者不仅与之后调度系统的细节(DAG、Shuffle等)息息相关,而且也是面试Spark系大数据研发工程师时经常被问到的基础问题(反正我是会问的),因此看官也可以将本文当做一篇面试知识点解析来看。

RDD依赖

Dependency抽象类及子类

在Spark Core中,RDD依赖关系的基类就是Dependency抽象类。它的定义只有一句话。

代码#19.1 - o.a.s.Dependency抽象类

@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

该类中只定义了一个方法rdd(),它用于取得当前RDD依赖的父RDD。Dependency及其子类的类图如下所示。

图#19.1 - Dependency继承体系

我们已经知道,RDD依赖分为窄依赖和宽依赖(Shuffle依赖)两种,下面分别来看。

窄依赖

所谓窄依赖,是指父RDD的每个分区都仅被子RDD的一个分区所依赖,也就是说子RDD的一个分区固定对应一个父RDD的单个分区。窄依赖在代码中的基类是NarrowDependency抽象类。

代码#19.2 - o.a.s.NarrowDependency抽象类

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}

可见,NarrowDependency类带有一个构造方法参数_rdd,并重写rdd()方法让其返回之,它就是当前RDD依赖的父RDD。另外,它还定义了一个抽象方法getParents(),用来返回partitionId对应分区依赖的所有父RDD的分区ID。该方法由NarrowDependency的子类实现,分别为OneToOneDependency(一对一依赖)和RangeDependency(范围依赖),它们的代码都比较简单。

代码#19.3 - o.a.s.OneToOneDependency与RangeDependency类

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

可见,它们返回的都是只有一个元素的List,且OneToOneDependency的父子RDD分区ID严格相同,常见的map()或filter()等算子都会产生OneToOneDependency。而在RangeDependency中,子RDD中ID为partitionId的分区与父RDD中ID为(partitionId - outStart + inStart)的分区一一对应,其中inStart为父RDD分区ID的起始值,outStart为子RDD分区ID的起始值,例如union()算子就会产生这种依赖。

上面讲的两种情况都是一一对应关系。当子RDD分区对应多个父RDD的分区(如join()算子)时,也可以形成窄依赖。其前提是父子RDD的分区规则完全相同,即子RDD的某个分区p对应父RDD 1的分区p,也对应父RDD 2的分区p。如果分区规则不同,就会变成宽依赖。

下面的图来自网络,原始出处已不可考,比较形象地说明了窄依赖。

图#19.2 - 窄依赖的三种情况

宽依赖

严格来讲,它的名字应该叫“Shuffle依赖”,因为在Spark代码中,它的类名是ShuffleDependency。不过在中文圈子里,“宽依赖”这个名字也同样通用。它就是指子RDD的一个分区会对应一个父RDD的多个分区,并且往往是全部分区。ShuffleDependency类的代码如下。

代码#19.4 - o.a.s.ShuffleDependency类

@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {
  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
  private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
  private[spark] val combinerClassName: Option[String] =
    Option(reflect.classTag[C]).map(_.runtimeClass.getName)

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.length, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

由于Shuffle是调度系统的重要组成部分,因此我们这里只限于读ShuffleDependency的源码,做一些基本介绍,大头留到后面去说。

ShuffleDependency类有3个泛型参数,K代表键类型,V代表值类型,而C则代表Combiner的类型。如果看官有Hadoop MapReduce的基础,对Combiner这个词自然不会陌生。由于Shuffle过程对键值型数据才有意义,因此ShuffleDependency对父RDD的泛型类型有限制,必须是Product2[K,V]或者其子类,Product2在Scala中代表两个元素的笛卡尔积。

其他构造方法参数说明如下:

  • partitioner:分区器,下面马上就会讲到。
  • serializer:闭包序列化器,SparkEnv中已经创建,为JavaSerializer。
  • keyOrdering:可选的对键类型K排序的排序规则。
  • aggregator:可选的Map端数据聚合逻辑。
  • mapSideCombine:指定是否启用Map数据预聚合。

随着宽依赖的创建,还会调用SparkContext.newShuffleId()方法分配一个新的Shuffle ID,以及调用ShuffleManager.registerShuffle方法注册该Shuffle,返回Shuffle句柄(ShuffleHandle)。虽然在很久之前的几篇文章中讲过Shuffle相关的话题,但是在真正讲到调度系统时,还是会继续深挖的。

下面的图形象地说明了宽依赖。

图#19.3 - 宽依赖的两种情况

RDD分区器

Partitioner抽象类与伴生对象

在上一篇文章讲RDD时,Partitioner就已经出现了,并且它在上一节的ShuffleDependency代码中也是作为构造参数出现。在Shuffle过程中,必须得有确定的计算逻辑来决定父RDD的分区数据如何分配并对应到子RDD的分区中,这就是分区器Partitioner的职责。

Partitioner抽象类的定义也很简单。

代码#19.5 - o.a.s.Partitioner抽象类

abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}

numPartitions()方法返回分区总数,而getPartitions()方法根据键返回其将被映射到的分区ID。

Partitioner还带有一个伴生对象,其中定义了defaultPartitioner()方法,顾名思义,它(根据上游的一个或一些RDD)返回默认的分区逻辑,其代码如下。

代码#19.6 - o.a.s.Partitioner.defaultPartitioner()/isEligiblePartitioner()方法

  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    val rdds = (Seq(rdd) ++ others)
    val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
    val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) {
      Some(hasPartitioner.maxBy(_.partitions.length))
    } else {
      None
    }

    val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
      rdd.context.defaultParallelism
    } else {
      rdds.map(_.partitions.length).max
    }

    if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
        defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
      hasMaxPartitioner.get.partitioner.get
    } else {
      new HashPartitioner(defaultNumPartitions)
    }
  }

  private def isEligiblePartitioner(
     hasMaxPartitioner: RDD[_],
     rdds: Seq[RDD[_]]): Boolean = {
    val maxPartitions = rdds.map(_.partitions.length).max
    log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1
  }

该方法会从输入的所有RDD中取出那些定义了分区逻辑的RDD,然后找到其中分区数最大的那个Partitioner。如果SparkConf中定义了缺省并行度配置项,即spark.default.parallelism,那么默认分区器的分区数就会采用该参数的值,否则就直接用所有RDD中最大的分区数(这就是为什么几乎总是推荐在提交Spark作业时设定spark.default.parallelism)。

然后,调用isElegiblePartitioner()方法,判断分区数最大的那个Partitioner是否“合格”,判断逻辑是其分区数与所有上游RDD中最大分区数之差小于一个数量级。如果通过检查,并且默认分区数比它小,就采用分区数最大的那个Partitioner作为分区逻辑,否则用默认分区数构造一个新的HashPartitioner并返回。

Partitioner在Spark Core中的实现类主要有两个:基于散列的HashPartitioner和基于采样范围的RangePartitioner,前者是默认实现。下面我们就以HashPartitioner为例来看看其具体实现(RangePartitioner太麻烦了)。

HashPartitioner

代码#19.7 - o.a.s.HashPartitioner类

class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

可见,在重写的getPartition()方法中,会取得键的hashCode值,对分区数numPartitions取模,返回其绝对值,这样就确保映射到的分区落在[0,numPartitions - 1]的区间内。为了判断两个HashPartitioner是否相等,也必须同时重写其equals()和hashCode()方法,判断标准自然就只有分区数了。

我们也可以很容易地想到,用户通过自己继承Partitioner类,能够自定义分区逻辑。下面就是一个简单的示例,它通过Key的长度来分区。由于它不属于Spark源码,就不编号了。

class KeyLengthPartitioner(partitions: Int) extends Partitioner {
  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = {
    return key.asInstanceOf[String].length() & (partitions - 1)
  }
}

总结

本文分别以Spark Core中的Dependency与Partitioner两个抽象类为起点,比较详细地讲解了Spark中RDD依赖关系与分区逻辑的具体设计。依赖与分区是RDD五要素中最重要的两个点,在今后的源码阅读过程中,会经常用到它们。

— THE END —

原文发布于微信公众号 - 暴走大数据(zhouqiantanxi)

原文发表时间:2019-08-14

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

扫码关注云+社区

领取腾讯云代金券