Spark Shuffle Write阶段磁盘文件分析

前言

上篇写了 Spark Shuffle 内存分析 后,有不少人提出了疑问,大家也对如何落文件挺感兴趣的,所以这篇文章会详细介绍,Sort Based Shuffle Write 阶段是如何进行落磁盘的

流程分析

入口处:

org.apache.spark.scheduler.ShuffleMapTask.runTask

runTask对应的代码为:

val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](
                              dep.shuffleHandle, 
                              partitionId, 
                              context)
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get

这里manager 拿到的是

   org.apache.spark.shuffle.sort.SortShuffleWriter

我们看他是如何拿到可以写磁盘的那个sorter的。我们分析的线路假设需要做mapSideCombine

 sorter = if (dep.mapSideCombine) {  
 require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")  
 new ExternalSorter[K, V, C](
                 dep.aggregator, 
                 Some(dep.partitioner), 
                 dep.keyOrdering, de.serializer)

接着将map的输出放到sorter当中:

sorter.insertAll(records)

其中insertAll 的流程是这样的:

 while (records.hasNext) {  
 addElementsRead()  kv = records.next() 
 map.changeValue((getPartition(kv._1), kv._1), update)
 maybeSpillCollection(usingMap = true)}

里面的map 其实就是PartitionedAppendOnlyMap,这个是全内存的一个结构。当把这个写满了,才会触发spill操作。你可以看到maybeSpillCollection在PartitionedAppendOnlyMap每次更新后都会被调用。

一旦发生呢个spill后,产生的文件名称是:

    "temp_shuffle_" + id

逻辑在这:

val (blockId, file) = diskBlockManager.createTempShuffleBlock() 

  def createTempShuffleBlock(): (TempShuffleBlockId, File) = {  
  var blockId = new TempShuffleBlockId(UUID.randomUUID()) 
        while (getFile(blockId).exists()) {   
           blockId = new TempShuffleBlockId(UUID.randomUUID())  
        }  
  (blockId, getFile(blockId))
  }

产生的所有 spill文件被被记录在一个数组里:

  private val spills = new ArrayBuffer[SpilledFile]

迭代完一个task对应的partition数据后,会做merge操作,把磁盘上的spill文件和内存的,迭代处理,得到一个新的iterator,这个iterator的元素会是这个样子的:

 (p, mergeWithAggregation(  
             iterators, 
             aggregator.get.mergeCombiners, keyComparator,
             ordering.isDefined))

其中p 是reduce 对应的partitionId, p对应的所有数据都会在其对应的iterator中。

接着会获得最后的输出文件名:

val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

文件名格式会是这样的:

 "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

其中reduceId 是一个固定值NOOP_REDUCE_ID,默认为0。

然后开始真实写入文件

   val partitionLengths = sorter.writePartitionedFile(
     blockId, 
     context, 
     outputFile)

写入文件的过程过程是这样的:

for ((id, elements) <- this.partitionedIterator) { 
 if (elements.hasNext) {   
 
val writer = blockManager.getDiskWriter(blockId,
      outputFile, 
      serInstance,
      fileBufferSize,  
      context.taskMetrics.shuffleWriteMetrics.get)   

for (elem <- elements) {     
     writer.write(elem._1, elem._2)   
 }   
 
writer.commitAndClose()    
val segment = writer.fileSegment()   
lengths(id) = segment.length  
   }
}

刚刚我们说了,这个 this.partitionedIterator 其实内部元素是reduce partitionID -> 实际record 的 iterator,所以它其实是顺序写每个分区的记录,写完形成一个fileSegment,并且记录偏移量。这样后续每个的reduce就可以根据偏移量拿到自己需要的数据。对应的文件名,前面也提到了,是:

"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"

刚刚我们说偏移量,其实是存在内存里的,所以接着要持久化,通过下面的writeIndexFile来完成:

 shuffleBlockResolver.writeIndexFile(
           dep.shuffleId,
           mapId, 
          partitionLengths)

具体的文件名是:

  "shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"

至此,一个task的写入操作完成,对应一个文件。

最终结论

所以最后的结论是,一个Executor 最终对应的文件数应该是:

MapNum (注:不包含index文件)

同时持有并且会进行写入的文件数最多为::

 CoreNum

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏岑玉海

Spark源码系列(一)spark-submit提交作业过程

前言 折腾了很久,终于开始学习Spark的源码了,第一篇我打算讲一下Spark作业的提交过程。 ? 这个是Spark的App运行图,它通过一个Driver来和集...

52760
来自专栏chenssy

【死磕Sharding-jdbc】---读写分离

先执行 sharding-jdbc-example-config-spring-masterslave模块中的的SQL脚本 all_schema.sql,这里有...

18640
来自专栏芋道源码1024

分布式作业 Elastic-Job-Lite 源码分析 —— 作业数据存储

JobNodePath,作业节点路径类。作业节点是在普通的节点前加上作业名称的前缀。

13420
来自专栏架构师小秘圈

HBase极简教程

HBase 系统架构 HBase是Apache Hadoop的数据库,能够对大型数据提供随机、实时的读写访问。HBase的目标是存储并处理大型的数据。HBase...

51660
来自专栏公有云大数据平台弹性 MapReduce

腾讯云EMR使用说明: 配置工作流

本文将通过一个简单,并且具有典型代表的例子,描述如何使用EMR产品中的Hue组件创建工作流,并使该工作流每天定时执行。

10.2K130
来自专栏有趣的django

Django rest framework源码分析(3)----节流

添加节流 自定义节流的方法  限制60s内只能访问3次 (1)API文件夹下面新建throttle.py,代码如下: # utils/throttle.py ...

53680
来自专栏我是攻城师

如何给Apache Pig自定义UDF函数?

39260
来自专栏阿杜的世界

分布式系统学习资料(ing)一、理论知识二、数据库三、分布式服务调用四、消息系统

10920
来自专栏芋道源码1024

分布式作业 Elastic-Job-Lite 源码分析 —— 主节点选举

无中心化,意味着 Elastic-Job-Lite 不存在一个中心执行一些操作,例如:分配作业分片项。Elastic-Job-Lite 选举主节点,通过主节点进...

13520
来自专栏分布式系统进阶

Kafka源码分析-启动流程

使用getPropsFromArgs方法来获取各配置项, 然后将启动和停止动作全部代理给KafkaServerStartable类;

24400

扫码关注云+社区

领取腾讯云代金券