一提到shuffle, 我们犹如“谈虎色变”。shuffle是大数据中的性能杀手,其来源于大数据中的元老级的组件Hadoop。
在Hadoop中,map被定义为数据的初次拆分获取解析阶段, reduce被定义为负责最终数据的收集汇总阶段,除了业务
逻辑的功能外,其他的核心数据处理都是由shuffle来支持。
在Hadoop组件中定义的Shuffle包括了什么呢? 为什么Shuffle是资源和时间开销比较大的阶段呢?
简单来说,Shuffle中有三次数据排序。
Spark 中的shuffle, 经历了Hash、Sort 和 Tungsten-Sort 3个重要阶段。
在1.1之前Spark采用Hash Shuffle, 1.1 之后引入Sort Shuffle, 1.6 时引入Tungsten-Sort Shuffle, 2.0 版本所有的Shuffle被统一到了Sort Shuffle中,3.2 时引入push-based shuffle; 当然还有未被合入社区,但在各大厂被开源使用Remote Shuffle Service;除此以外还有将向量计算引入shuffle计算的实现。
由上面的介绍可知, Shuffle 就是将map阶段的拆分数据,通过设置的聚合方式按照分区进行聚合后,由reduce进行处理的过程。下面我们来总的认识下Spark中的shuffle方式:
Hash Shuffle, 顾名思义,就是采取Hash的方式在Map的任务中为每个reduce端的任务生成一个文件。因此如果有M个map任务, R个reduce任务就会产生M x R个文件。巨量磁盘小文件而产生大量性能低下的Io操作,从而性能较低,因为其巨量的磁盘小文件还可能导致OOM,HashShuffle的合并机制通过重复利用buffer从而将磁盘小文件的数量降低到Core R个,但是当Reducer 端的并行任务或者是数据分片过多的时候,依然会产生大量的磁盘小文件。
开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个
磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘
文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少
个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。
Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的
shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会
写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效
将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升
shuffle write的性能。
Sort Shuffle 的引入是如何解决上述问题的呢?
首先,在Shuffle的map阶段会将所有数据进行排序,并将分区的数据写入同一个文件中,在创建数据文件的同时会产生索引文件,来记录分区的大小和偏移量。所以这里产生文件的数量和reduce分区就没有关系了,只会产生2 * M个临时文件。
下面我们先通过简单分析来对Spark Shuffle有个简单的了解,后面再详细介绍:
Spark 在启动时就会在SparkEnv中创建ShuffleManager来管理shuffle过程。
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames =Map(
"sort" ->classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" ->classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
从上面的代码可以看出,Spark目前只有唯一一种ShuffleManager的实现方式,就是SortShuffleManager。
下面我们可以看下ShuffleManager这个接口类:
private[spark] trait ShuffleManager {
/**
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
*向shuffleManager注册shuffle,并返回handle
*/
def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle
/** Get a writer for a given partition. Called on executors by map tasks. */
def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]
/**
* Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
* Called on executors by reduce tasks.
*/
def getReader[K, C](
handle: ShuffleHandle,
startPartition: Int,
endPartition: Int,
context: TaskContext): ShuffleReader[K, C]
...
}
我们都知道在Spark的DAG中,顶点是一个个 RDD,边则是 RDD 之间通过 dependencies 属性构成的父子关系。dependencies 又分为宽依赖和窄依赖,分别对应ShuffleDependency和NarrowDependency。当RDD间的依赖关系为ShuffleDependency时,RDD会通过其SparkEnv向ShuffleManager注册一个shuffle, 并返回当前处理当前shuffle所需要的句柄。
Spark有2个类型的Task: ShuffleMapTask和ResultTask,在Spark中stage是以Pipeline运行的, 除了最后一个Stage对应的是ResultStage,其余的Stage对应的都是ShuffleMapStage。
ShuffleMapStage中的每个Task,叫做ShuffleMapTask,getWriter()方法的调用主要是在ShuffleMapTask中进行。调用getWriter方法会返回一个ShuffleWriter的trait。
除了需要从外部存储读取数据和RDD已经做过cache或者checkpoint的Task,一般Task的开始都是从ShuffledRDD的调用getReader()。调用getReader()会返回一个ShuffleReader的trait。
微信截图_20220519215721.png
综上,Spark的Shuffle模块主要有ShuffleManager、ShuffleWriter和ShuffleReader。ShuffleManager目前在社区版中只有SortShuffleManager一种实现,ShuffleReader也只有BlockStoreShuffleReader一种实现,但是ShuffleWriter目前有BypassMergerSortShuffleWriter, SortShuffleWriter和UnsafeShuffleWriter三种实现。
push-based shuffle方案,会在mapper执行后并且会被自动合并数据,然后将数据移动到下游的reducer。目前只支持Yarn方式的实现。
在中大规模的Spark shuffle中,Shuffle依然是很多的性能问题:
push-based shuffle 利用共享的ESS服务,在map阶段时将溢写的数据文件,通过推送的方式推送到reduce对应的ESS节点,并在其中对小文件进行合并。其中push-based shuffle 实现了一个magnet shuffle服务,是一个增强的Spark ESS,它可以接受远程推送的shuffle block,它会将block合并到每一个唯一shuffle分区文件。
push merge shuffle采用的push-merge shuffle机制。Mapper生成的shuffle数据被推送到远程的magnet shuffle服务,并按照每个shuffle合并。Magnet在此期间可以将小的shuffle块的随机读取转换为MB大小的顺序读取。这个push操作与map任务完全解耦,所以无需添加到执行map任务的运行时中,一旦推送失败就会导致maptask失败。
尽可能地执行push,Magnet无需所有的shuffle都是完美的push成功。通过push-merge shuffle,Magnet复制shuffle数据,Reducer可以获取合并后的、或者是没有合并的shuffle数据作为任务输入。也就是,即使没有合并也可以读取。
那么Spark是如何选择Sort-based ShuffleWriter的具体实现方式呢?
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}
可以看出,根据条件的不同,会返回3种不同的handle,对应3种shuffle机制。从上到下来分析一下:
1. 检查是否符合SortShuffleWriter.shouldBypassMergeSort()方法的条件:
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
// We cannot bypass sorting if we need to do map-side aggregation.
if (dep.mapSideCombine) {
false
} else {
val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
dep.partitioner.numPartitions <= bypassMergeThreshold
}
}
判断是否符合bypassMergeSort的条件主要有以下两个:
spark.shuffle.sort.bypassMergeThreshold
规定的值(默认200)那么会返回BypassMergeSortShuffleHandle,启用bypass merge-sort shuffle机制。
2. 如果不启用上述bypass机制,那么继续检查是否符合canUseSerializedShuffle()方法的条件:
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(/*...*/)
false
} else if (dependency.aggregator.isDefined) {
log.debug(/*...*/)
false
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(/*...*/)
false
} else {
log.debug(/*...*/)
true
}
}
}
也就是说,如果同时满足以下三个条件:
MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE
的值(最大分区ID号+1,即2^24=16777216)那么会返回SerializedShuffleHandle,启用序列化sort shuffle机制(也就是tungsten-sort)。
3. 如果既不用bypass也不用tungsten-sort,那么就返回默认的BaseShuffleHandle,采用基本的sort shuffle机制。
Untitled.png
目前spark中只有一种Shuffle的实现方式,即sort-Shuffle, 但是它包括了使用Tungsten实现的unsafeSortShuffle, 不需要排序的BypassMergeSortShuffle和baseSortShuffle, 并通过实现ShuffleWriter的方式根据不通过的情况进行选择,可以更好的处理不同情况的数据任务。
总结,Spark为不同情况实现不同有趣的Shuffle机制。Spark的shuffle是通过将中间文件物化到spark.local.dir的本地临时文件中,来增强spark的容错性,但是也造成了shuffle时的性能压力。在传统的Hash shuffle中,可以直接将map端的数据shuffle归类为对应的reduce 分区的数据,但是也造成了产生MxN(M是map Task, N 是Reduce Task)数量级的中间文件, 即使通过重用buffer, 将不同批次的Task的写入文件进行重用,减少了一定量的数据文件,但是并不能从根本上减少文件的数量级。采用Sort-based Shuffle 主要是使用在数据量比较大的情况下,通过将map端的数据进行排序,并生成文件索引,那么就可以通过读取文件的偏移量来区别不同的reduce应该拉取那部分的数据,产生的中间文件数据也变成了2 * M 个, 大大的减少了处理的文件数量。但是随着数据服务压力增加,大量的中间小文件会造成随机io, io的压力也会导致fetchfail的发生几率的上升,push-based shuffle 主要是将map端的数据push到共享ESS进行合并,进一步的减少小文件的数量,将随机io变为顺序io, 同时减少中间文件的数量,提升集群的稳定性。
今天就先到这里,通过上面的介绍,我们也留下些面试题?