前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[SPARK][CORE] 面试问题之 SortShuffleWriter的实现详情

[SPARK][CORE] 面试问题之 SortShuffleWriter的实现详情

作者头像
Tim在路上
发布2022-05-27 08:06:06
3190
发布2022-05-27 08:06:06
举报

SortShuffleWriter 是最基础的ShuffleWriter, 当其他几个ShuffleWriter不满足条件,或存在mapSide的聚合时只能选择SortShuffleWriter,它是支持最全面的兜底ShuffleWriter。

SortShuffleWriter又是如何实现大数据量下的shuffleWriter过程呢?

SortShuffleWriter源码详解

sortShuffleWriter也是被ShuffleWriteProcessor 调用的,在ShuffleWriteProcessor 中实现了

ShuffleWriter的获取, RDD write的写入和mapStatus的返回。具体可以参考Bypass文章。

那我们详细介绍下sortShuffleWriter如何实现write的过程:

代码语言:javascript
复制
// sortShuffleWriter
override def write(records: Iterator[Product2[K, V]]): Unit = {
// [1] 首先创建基于JVM的外排器ExternalSorter, 如果是需要mapSide聚合的,封装进去aggregator和ordering
sorter= if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context,dep.aggregator,Some(dep.partitioner),dep.keyOrdering,dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None,Some(dep.partitioner), ordering = None,dep.serializer)
  }
// [2] mapTask的records全部insert到外部排序器
sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // [3] 创建处理mapTask所有分区数据commit提交writer
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
dep.shuffleId, mapId,dep.partitioner.numPartitions)
// [4] 将写入ExternalSorter中的所有数据写出到一个map output writer中
sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
// [5] 提交所有分区长度,生成索引文件
partitionLengths= mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths
mapStatus=MapStatus(blockManager.shuffleServerId,partitionLengths, mapId)
}

可以看到在sortShuffleWrite中主要有以下五个步骤:

  • [1] 首先创建基于JVM的外排器ExternalSorter, 如果是需要mapSide聚合的,封装进去aggregator和ordering
  • [2] mapTask的records全部insert到外部排序器
  • [3] 创建处理mapTask所有分区数据commit提交writer
  • [4] 将写入ExternalSorter中的所有数据写出到一个map output writer中
  • [5] 提交所有分区长度,生成索引文件

从这里可以看出完成排序和溢写文件的操作主要是在ExternalSorter外部排序器中。

在进一步的学习前,我们先来简单了解了ExternalSorter。

ExternalSorter是一个外部的排序器,它提供将map任务的输出存储到JVM堆中,同时在其内部封装了PartitionedAppendOnlyMapPartitionedPairBuffer 用于数据的buffer, 如果采用PartitionedAppendOnlyMap 可以提供数据的聚合。此外其中还封装了spill , keyComparator, mergeSort 等提供了,使用分区计算器将数据按Key分组到不同的分区,然后使用比较器对分区中的键值进行排序,将每个分区输出到单个文件中方便reduce端进行fetch。

代码语言:javascript
复制
// ExternalSorter
def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  //TODO: stop combining if we find that the reduction factor isn't high
val shouldCombine = aggregator.isDefined

  // [1] 是否需要在mapSide的聚合
  if (shouldCombine) {
    // [1.1] 通过aggregator获取mergeValue和createCombiner
    // Combine values in-memory first using our AppendOnlyMap
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
   // [2] 如果需要map端聚合,将数据写入map缓存中
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
map.changeValue((getPartition(kv._1), kv._1), update)
      maybeSpillCollection(usingMap = true)
    }
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
   // [2] 如果不需要map端聚合,将数据写入buffer缓存中
buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // [3] 判断是否需要溢写,并进行溢写
      maybeSpillCollection(usingMap = false)
    }
  }
}

在insertAll中主要将数据写入缓存中,如果需要map端聚合的写入PartitionedAppendOnlyMap 中,不需要map端聚合的写入PartitionedPairBuffer ,最后调用maybeSpillCollection进行溢写操作。

我们先看下两种数据结构的异同点:

  • PartitionedAppendOnlyMap中数据存储在父类AppendOnlyMap的data数组中,PartitionedPairBuffer数据就存在该类的data数组中;
  • PartitionedAppendOnlyMap间接的继承SizeTracker,PartitionedPairBuffer是直接继承SizeTracker,用来进行要记录数据采样大小,以便上层进行适时的申请内存以及溢写磁盘操作
  • AppendOnlyMap会对元素在内存中进行更新或聚合,而PartitionedPairBuffer不支持map端聚合操作,只起到数据缓冲的作用;
  • 两者都实现了WritablePartitionedPairCollection,可以根据partitonId排序,也可以根据partitionId+key进行排序操作返回排序后的迭代器数据。

SizeTrackingAppendOnlyMap是继承自AppendOnlyMap 类并实现了SizeTracker 接口,其中SizeTracker通过对数据结构的采样对缓存大小进行估算的一种实现。AppendOnlyMap 是类似于HashMap的数据接口。主要针对java中的map不能缓存null值的情况,实现了基于array[]数组实现的k-v键值对缓存接口。

代码语言:javascript
复制
```

1d.png

代码语言:javascript
复制
   在`AppendOnlyMap` 中时将k-v依次放入到数组中缓存数据。在HASH冲突时,Java原生的`HashMap`是通过拉链法去解决hash冲突的,`AppendOnlyMap`是通过开放地址法–线性探测的方法进行解决冲突的,线性探测间隔总是固定的,通常为1。 `AppendOnlyMap`支持key为null的情况,使用一个变量`nullValue`保存对应的值,`haveNullValue`表示是否存在null的key,如果之前不存在,就将size+1,然后更新值;存在时候只需要更新值即可;另外一点和java的`HashMap`不同的是,`AppendOnlyMap`提供了聚合的方法,来应对[shuffle](<https://so.csdn.net/so/search?q=shuffle&spm=1001.2101.3001.7020>)过程中指定了map-side聚合的情况,使用者需要提供`updateFunc` 。

   由于PartitionedPairBuffer只是一个数据缓冲区,不需要对元素进行聚合操作等,所以添加元素直接将元素append到数组的back即可,不过需要先判断数据容量是否已经满了,满了则需要扩容。然后首先会将<partition, key>作为Tuple放在2*curSize位置上,然后相邻位置2*curSize+1放具体的value,添加完毕后需要进行重采样操作。

总而言之,AppendOnlyMap的行为更像map,元素以散列的方式放入data数组,而PartitionedPairBuffer的行为更像collection,元素都是从data数组的起始索引0和1开始连续放入的。

了解了map和buffer两种数据结果,那么接下来我们学习下它是如何进行溢出处理的?

代码语言:javascript
复制
// ExternalSorter
private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    // [1] 估算当前缓存数据结构的size
    estimatedSize =map.estimateSize()
    // [2] 判断是否需要溢写,如果执行溢写后,会重新创建缓存数据结构
    if (maybeSpill(map, estimatedSize)) {
map= new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize =buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
buffer= new PartitionedPairBuffer[K, C]
    }
  }
   // [3] 记录当前的峰值内存
  if (estimatedSize >_peakMemoryUsedBytes) {
_peakMemoryUsedBytes= estimatedSize
  }
}

判断是否需要溢出主要有以下三步:

  • [1] 估算当前缓存数据结构的size
  • [2] 判断是否需要溢写,如果执行溢写后,会重新创建缓存数据结构
  • [3] 记录当前的峰值内存

在执行spill前会先尝试申请内存,不满足才会进行溢出:

代码语言:javascript
复制
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // [1] 如果当前的记录数是32的倍数, 同时当前内存超过了门限,默认5M
  if (elementsRead % 32 == 0 && currentMemory >=myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    // [2] 尝试申请2倍当前内存,并将门限调整为两倍当前内存
    val amountToRequest = 2 * currentMemory -myMemoryThreshold
val granted = acquireMemory(amountToRequest)
myMemoryThreshold+= granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    // [3] 如果没申请下来,则应该进行spill, 或者当前写入的records数大于了强制spill门限,默认是整数的最大值
    shouldSpill = currentMemory >=myMemoryThreshold
}
  shouldSpill = shouldSpill ||_elementsRead>numElementsForceSpillThreshold
 // [4] 进行spill
// Actually spill
  if (shouldSpill) {
_spillCount+= 1
    logSpillage(currentMemory)
    spill(collection)
_elementsRead= 0
_memoryBytesSpilled+= currentMemory
    releaseMemory()
  }
  shouldSpill
}

在真正溢写数据之前,writer会先申请内存扩容,如果申请不到或者申请的过少,才会开始溢写。这符合Spark尽量充分地利用内存的中心思想。

另外需要注意的是,传入的currentMemory参数含义为“缓存的预估内存占用量”,而不是“缓存的当前占用量”。这是因为PartitionedAppendOnlyMap与PartitionedPairBuffer都能动态扩容,并且具有SizeTracker特征,能够通过采样估计其大小。

负责溢写数据的spill()方法是抽象方法,其实现仍然在ExternalSorter中。

代码语言:javascript
复制
// ExternalSorter
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
    //【根据指定的比较器comparator进行排序,返回排序结果的迭代器】
    //【如果细看的话,destructiveSortedWritablePartitionedIterator()方法最终采用TimSort算法来排序】
    val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
    //【将内存数据溢写到磁盘文件】
    val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
    //【用ArrayBuffer记录所有溢写的磁盘文件】
    spills += spillFile
  }

那么 sortShuffleWriter是如何将in-memory中的数据溢写到磁盘的?

代码语言:javascript
复制
/**
 * Spill contents of in-memory iterator to a temporary file on disk.
 */
private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator[K, C])
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  // [1] 创建临时的blockid和对应的file
  val (blockId, file) =diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  // [2] 创建个DiskBlockObjectWriter的写出流
  val writer: DiskBlockObjectWriter =
blockManager.getDiskWriter(blockId, file,serInstance,fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
_diskBytesSpilled+= segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    // [3] 遍历内存数据结构中的数据,在调用writeNext迭代器时会根据comparator按key排序,缓存中的key为(partitionId, key), 会先按分区排序,再按key排序。
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
require(partitionId >= 0 && partitionId <numPartitions,
        s"partition Id:${partitionId} should be in the range [0,${numPartitions})")
      inMemoryIterator.writeNext(writer)
      // [3.1] 记录每个分区的元数数
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1
      // [3.2] 默认每1000条生成一个fileSegement
      if (objectsWritten ==serializerBatchSize) {
        flush()
      }
    }
    if (objectsWritten > 0) {
      flush()
      writer.close()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (!success) {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting${file}")
        }
      }
    }
  }
// [4] 最终将溢写的文件封装为SpilledFile返回
SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}

实现溢写有四个步骤:

  • [1] 创建临时的blockid和对应的file
  • [2] 创建个DiskBlockObjectWriter的写出流
  • [3] 遍历内存数据结构中的数据,在调用writeNext迭代器时会根据comparator按key排序,缓存中的key为(partitionId, key), 会先按分区排序,再按key排序。
    • [3.1] 记录每个分区的元素数
    • [3.2] 默认每1000条,进行一次flush生成一个fileSegement
  • [4] 最终将溢写的文件封装为SpilledFile返回

从这里可以看出spillMemoryIteratorToDisk是真正的溢写类,其完成了数据的排序和溢写。从上面代码可以看出,这里只创建了一个临时文件,一个DiskBlockObjectWriter写出流。这相比于Bypass的为每个分区创建一个io流和临时文件, 是少了许多。这得益于其基于缓存的排序,首先按partitionid排序,然后按key排序,天然的将不同的分区聚集到了一起。

在溢写的过程中,如果满足溢写的条件就会溢写出一个SpilledFile,或产生很多文件,最终是如何汇总实现的呢?那我们看看sortShuffle是如何将写入ExternalSorter中的所有数据写出到一个map output writer中吧。

由于代码太长,我们跳过spills.isEmpty的情况,这种情况下我们不复杂就是将缓存中的数据排序写出就完成了,我们主要看下存在溢写的情况:

代码语言:javascript
复制
// ExternalSorter
def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Long,
    mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var nextPartitionId = 0
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined)mapelsebuffer
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
      ...
    }
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    // [1] 调用分区迭代器,将分区数据生成(id, elements)二元组
    for ((id, elements) <- this.partitionedIterator) {
      val blockId =ShuffleBlockId(shuffleId, mapId, id)
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally{
        // 每个分区打开的writer进行并发写入的优化,最终生成一个文件
        partitionWriter = mapOutputWriter.getPartitionWriter(id)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
serializerManager,
serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics,
          if (partitionChecksums.nonEmpty)partitionChecksums(id) else null)
        if (elements.hasNext) {
          for (elem <- elements) {
            partitionPairsWriter.write(elem._1, elem._2)
          }
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = id + 1
    }
  }

  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
}

分区迭代器的实现代码:

代码语言:javascript
复制
// ExternalSorter
def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap)mapelsebuffer
// [1] 如果没有溢写,直接groupByPartition
if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    if (ordering.isEmpty) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
   // [2] 存在溢写,需要先将在内存中和溢写文件中的数据封装为迭代器执行归并排序, 归并排序时通过最小堆实现的
    // Merge spilled and in-memory data
    merge(spills.toSeq, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}

从整个shuffle write流程可知,每一个ShuffleMapTask不管是否需要mapSide的聚合都会将数据写入到内存缓存中,如果申请不到内存或者达到强制溢出的条件,则会将缓存中的数据溢写到磁盘,在溢写前会使用TimSort对缓存中的数据进行排序,并将其封装为SpilledFile返回,此时溢写文件中的数据是可能存在多个分区的数据的。

在输出之前会将写入到ExternalSort中的数据写出到一个map output Writer中。写出时如果存在溢写,会分别从SpilledFile和缓存中获取对应分区的迭代器,交由归并排序实现数据的合并,这里的归并排序使用的是最小堆,然后在将其交由最终output Writer进行写出。最后提交文件和各分区长度,生成索引文件。

总之,通过SortShuffleWriter只会产生两个文件,一个分区的数据文件,一个索引文件。整个sortshuffleWriter过程只会产生2 * M 个中间文件。

今天就先到这里,通过上面的介绍,我们也留下些面试题:

  1. 如果数据全部写到缓存buffer中,如何实现最终的归并排序? 那么如果存在溢写数据,又如何实现归并排序? SortShuffleWriter 是如何实现的?
  2. SortShuffleWriter 中实现了数据排序,那么最终形成的结果是全局有序的吗?
  3. 一句话简单说写SortShufflerWriter的实现过程?
本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2022-05-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • SortShuffleWriter源码详解
相关产品与服务
数据保险箱
数据保险箱(Cloud Data Coffer Service,CDCS)为您提供更高安全系数的企业核心数据存储服务。您可以通过自定义过期天数的方法删除数据,避免误删带来的损害,还可以将数据跨地域存储,防止一些不可抗因素导致的数据丢失。数据保险箱支持通过控制台、API 等多样化方式快速简单接入,实现海量数据的存储管理。您可以使用数据保险箱对文件数据进行上传、下载,最终实现数据的安全存储和提取。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档