前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark RDD 分布式弹性数据集

Spark RDD 分布式弹性数据集

作者头像
Tim在路上
发布2022-03-23 14:19:13
3560
发布2022-03-23 14:19:13
举报
文章被收录于专栏:后台技术底层理解

Spark RDD 分布式弹性数据集

rdd是一种弹性分布式的数据集,它代表着不可变的数据元素,可以被分区并行处理。

rdd是一个粗粒度的数据生成方式和流转迭代计算方式的描述。它可以通过稳定的存储器或者从其他RDD生成,它并不需要急着进行转换,只需要在特定的rdd进行一次性的数据的迭代流转。rdd记录着自己的依赖关系,以防在数据丢失时可以通过“血缘”关系再次生成数据。用户也可以自己选择在经常重用的rdd进行数据落地,放置丢失后重做。

rdd的特性总结:

  1. 显式抽象。将运算中的数据集进行显式抽象,定义了其接口和属性。由于数据集抽象的统一,从而可以将不同的计算过程组合起来进行统一的 DAG 调度。
  2. 基于内存。相较于 MapReduce 中间结果必须落盘,RDD 通过将结果保存在内存中,从而大大降低了单个算子计算延迟以及不同算子之间的加载延迟。
  3. 宽窄依赖。在进行 DAG 调度时,定义了宽窄依赖的概念,并以此进行阶段划分,优化调度计算。
  4. 谱系容错。主要依赖谱系图计算来进行错误恢复,而非进行冗余备份,因为内存实在是有限,只能以计算换存储了。
  5. 交互查询。修改了 Scala 的解释器,使得可以交互式的查询基于多机内存的大型数据集。进而支持类 SQL 等高阶查询语言。

RDD与共享内存的比较

分布式的共享内存是一种细粒度的读写,可以对每个存储单元进行读写,其一致性需要程序进行维护,其容错性需要设置检查点和程序回滚。但是RDD由于是不可变的粗粒度的读写,更适合于批量读写的任务,其可以使用“血缘”机制恢复数据,减少了设置检查点的开销。如果出现失败时,也只用重新计算分区中丢失的那一部分。另一方面,RDD的不可变性可以让系统可以像mapreduce一样采用后备任务的方式来代替运行缓慢的任务,不会出现相互影响的情况。

另外rdd也吸取了分布式共享内存的特性,rdd的批量操作可以根据数据所处的位置进行优化,提高性能。加载数据时,当内存不足时,rdd的性能下降是平稳的,不能载入内存的分区可以存储在磁盘上。

RDD 源码

代码语言:javascript
复制
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

// rdd维护这sparkContext上下文环境和rdd的依赖关系
private def sc: SparkContext = {
    if (_sc == null) {
      throw new SparkException(...)
    }
    _sc
}

/** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

// 1. 实现compute函数实现rdd每个分区的迭代计算
// Implemented by subclasses to compute a given partition.
 def compute(split: Partition, context: TaskContext): Iterator[T]

/**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   *
   * The partitions in this array must satisfy the following property:
   *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
   */
  // 2. 每个rdd都有一个或多个分区
  protected def getPartitions: Array[Partition]

/**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  // 3. rdd维护这自己的依赖关系
  protected def getDependencies: Seq[Dependency[_]] = deps

  /**
   * Optionally overridden by subclasses to specify placement preferences.
   */
  // 4. rdd可以根据数据的位置优化分区的计算位置,提高效率
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  // 5. rdd都有自己的分区器,表明他们分区的方式
  @transient val partitioner: Option[Partitioner] = None
}

上面的5点是rdd都会实现的接口,这也是rdd都具有的特性。

1. RDD 分区

如上源码所示,RDD提供了分区的抽象函数,即protected def getPartitions: Array[Partition],每个继承RDD抽象类的RDD都会有自己的getPartitions的实现。RDD分区的多少代表着计算时的并发粒度。

用户可以自己指定执行的分区数,如果用户不自己指定,则使用默认的分区数。

  • ParallelCollectionRDD

Untitled.png 从图中看出,通过sparkContext的parallelize从集合生成RDD, 生成的是ParallelCollectionRDD,而partitions.length 分区数为8。

Untitled.png 在生成RDD传入设置分区大小为3,最后生成的分区数为3。

代码语言:javascript
复制
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}

TaskSchedulerImpl类:
override def defaultParallelism(): Int = backend.defaultParallelism()
CoarseGrainedSchedulerBackend类:
override def defaultParallelism(): Int = {
    conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}

从源码中可以看出,如果不传入分区数,则默认分区数为defaultParallelism,而defaultParallelism=math.max(totalCoreCount.get(), 2)所以最小是2,最大是主机核数。

  • HadoopRDD

HadoopRDD是读取hdfs文件的rdd。HadoopRDD使用的是MapReduce API。

spark.sparkContext.textFile("hdfs://user/local/admin.text") 中textFile是读取hdfs文件的方法。其中会调用HadoopRDD。

代码语言:javascript
复制
override def getPartitions: Array[Partition] = {
    val jobConf = getJobConf()
    // add the credentials here as this can be called before SparkContext initialized
    SparkHadoopUtil.get.addCredentials(jobConf)
    try {
      // 从这里可以看出allInputSplits来自getSplits
      val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
      val inputSplits = if (ignoreEmptySplits) {
        allInputSplits.filter(_.getLength > 0)
      } else {
        allInputSplits
      }
      // 分区数为inputSplits.size
      val array = new Array[Partition](inputSplits.size)
      for (i <- 0 until inputSplits.size) {
        array(i) = new HadoopPartition(id, i, inputSplits(i))
      }
      array
    } catch {
       ...
    }
  }

FileInputFormat类:
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
        ...
        long totalSize = 0L;
        FileStatus[] arr$ = files;
        int len$ = files.length;
        // totalSize 为所有文件的总字节数
        for(int i$ = 0; i$ < len$; ++i$) {
            FileStatus file = arr$[i$];
            if (file.isDirectory()) {
                throw new IOException("Not a file: " + file.getPath());
            }

            totalSize += file.getLen();
        }
        // 对totalSize切分后的字节数goalSize,numSplits默认为min(2,totalCores)=2
        long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
        // minSize为配置的mapreduce切分值
        long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
        ....
        for(int i$ = 0; i$ < len$; ++i$) {
        ....
            else {
                    long blockSize = file.getBlockSize();
                    long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);

                    long bytesRemaining;
                    String[][] splitHosts;
                    // 如果在切片大小1.1倍内的都不会被切分
                    for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                        splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
                        splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
                    }

                    if (bytesRemaining != 0L) {
                        splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
                        splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
                    }
                }
            }
        }

        sw.stop();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.elapsedMillis());
        }

        return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
    }

protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }

textFile 是从HDFS分布式文件系统的所有节点上读取数据,返回Strings的RDD。

总结下HadoopRDD分区规则:

1.如果textFile指定分区数量为0或者1的话,defaultMinPartitions值为1,则有多少个文件,就会有多少个分区。

2.如果不指定默认分区数量,则默认分区数量为2,则会根据所有文件字节大小totalSize除以分区数量,得到的值goalSize,然后比较goalSize和hdfs指定分块大小(这里是128M)作比较,以较小的最为goalSize作为切分大小,对每个文件进行切分,若文件大于大于goalSize,则会生成该(文件大小/goalSize)个分区,如果文件内的数据不能除尽则分区数会+1,则为(fileSize/goalSize)+1。

3.如果指定分区数量大于等于2,则默认分区数量为指定值,生成实际分区数量规则任然同2中的规则一致。

总之:文件总大小除以分区数,大于分块大小,则与分块大小相关,否则以得到的商相关。

2. RDD 优先位置

rdd优先位置返回的是每一个分区的位置信息,按照移动计算的思路,将计算尽量分配到数据所在的机器上。

代码语言:javascript
复制
override def getPreferredLocations(split: Partition): Seq[String] = {
  val hsplit: InputSplit = split.asInstanceOf[HadoopPartition].inputSplit.value
  val locs = hsplit match {
    case lsplit: InputSplitWithLocationInfo =>
      HadoopRDD.convertSplitLocationInfo(lsplit.getLocationInfo)
    case _ => None
  }
  // 返回ip不为localhost的列表
  locs.getOrElse(hsplit.getLocations.filter(_ != "localhost"))
}

3. RDD 依赖关系

RDD的操作是粗粒度的操作,RDD进行转换会形成新的RDD。形成的RDD和原RDD形成依赖关系,RDD通过这种“血缘”关系来维护数据的容错性。RDD的依赖关系可以分为宽依赖和窄依赖两种。

  • 窄依赖:父RDD的每一个分区都只被一个子RDD的一个分区依赖。即是一对一的过程,当然也可以是多对一的过程(coalesce() 缩减分区数量)。总之,就是只要不发生shuffle过程,就可以归结为窄依赖的关系。窄依赖的RDD直接可以直接归结为一个pipeline, 分区内的计算可以发生在一台机器上,多个分区可以并发的执行,上一个rdd的分区计算完成后,将结果缓存在内存中,子RDD可以直接使用。其次,窄依赖的明确依赖关系,可以在数据发生错误后,只重新计算发生错误的一个分区。
  • 宽依赖:父RDD的分区被子RDD的多个分区依赖。即多对多的关系,其中由于一个父RDD需要将数据分发到子RDD的多个分区中,(不同分区可能在不同的机器上)所以需要发生数据的读写(shuffle过程)。宽依赖反生数据错误后,需要重新计算多个分区。
代码语言:javascript
复制
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val listRDD = spark.sparkContext.parallelize(1 until n, slices)
val mapRDD = listRDD.map(i => (i, i * i))
val groupRDD = mapRDD.groupByKey()

Untitled.png

从中可以看出mapRDD是OneToOneDependency依赖,其父RDD为ParallelCollectionRDD。

Untitled.png

从中可以看出groupRDD的依赖是ShuffleDependency依赖,其父依赖是MapPartitionsRDD。而groupbykey是需要进行shuffle的算子,属于宽依赖。

Spark通过创建的类来表明,RDD间的依赖关系的类型,NarrowDependency属于窄依赖,ShuffleDenpendency属于宽依赖。之后会通过一节来具体介绍其中的细节。

Untitled.png

4. RDD 分区计算

从上面的RDD源码可以发现,每个RDD中都存在一个compute()的函数,这个函数的作用就是为实现RDD具体的分区计算。

def compute(split: Partition, context: TaskContext): Iterator[T]

compute的返回值是分区的迭代器,每一个分区都会调用这个函数。只有到action算子才会真正的执行计算。

5. RDD 分区函数

partitioner指的是Spark的分区函数,目前最常用的有两种,HashPartitioner和RangePartitioner, 其次还有缩减分区数的分区函数CoalescedPartitioner。分区这个概念,只存在于(K,V)键值对的RDD中,非键值对的RDD中partitioner为None。

Untitled.png

分区函数即决定了RDD本身分区的数量,也决定了Shuffle中MapOut输出中每个分区进行切割的依据。

Untitled.png

从上图可以看出,非k-v RDD的分区器为None, k-v RDD的分区函数默认为HashPartitioner。

代码语言:javascript
复制
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
 ...
  // 如果存在配置并发配置,将并发配置作为默认分区数,否则上个rdd的最大值
  val defaultNumPartitions = if (rdd.context.conf.contains("spark.default.parallelism")) {
    rdd.context.defaultParallelism
  } else {
    rdds.map(_.partitions.length).max
  }

  // If the existing max partitioner is an eligible one, or its partitions number is larger
  // than the default number of partitions, use the existing partitioner.
  if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) ||
      defaultNumPartitions < hasMaxPartitioner.get.getNumPartitions)) {
    hasMaxPartitioner.get.partitioner.get
  } else {
    // 默认返回HashPartitioner
    new HashPartitioner(defaultNumPartitions)
  }
}

HashPartitioner会对数据的key进行 key.hascode%numpartitions 计算,得到的数值会放到对应的分区中,这样能较为平衡的分配数据到partition。

RangePartitioner:它是在排序算子中会用到的分区器,比如sortbykey、sortby、orderby等。该分区器先对输入的数据的key做采样,来估算Key的分布,然后按照指定的排序切分range,尽量让每个partition对应的range里的key分布均匀。

RDD 基本转换

rdd中的算子可以分为两种,一个是transformation, 一个是action算子。

1. Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。

2. Action:行动算子,这类算子会触发SparkContext提交Job作业。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2022.02.13 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • Spark RDD 分布式弹性数据集
    • RDD与共享内存的比较
      • RDD 源码
        • 1. RDD 分区
        • 2. RDD 优先位置
        • 3. RDD 依赖关系
        • 4. RDD 分区计算
        • 5. RDD 分区函数
      • RDD 基本转换
      相关产品与服务
      大数据
      全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档