大数据 Shuffle 原理与实践
Shuffle概述
MapReduce
MapReduce是一个分布式运算程序的编程框架,是用户开发”基于hadoop的数据分析应用“的核心框架。
MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。
MapReduce存在Map、Shuffle、Reduce三个阶段
- Map阶段,是在单机上进行的针对一小块数据的计算过程
- Shuffle阶段,在map阶段的基础上,进行数据移动,为后续的reduce阶段做准备
- Reduce过程,reduce阶段,对移动后的数据进行处理,依然是在单机上处理一小份数据
MapReduce优缺点
优点:
- 易于编程:用户只关心业务逻辑,实现框架的接口
- 良好扩展性:可以动态增加服务器,解决计算资源不够问题。
- 高容错性:任何一台机器挂掉,可以将任务转移到其他节点。
- 适合海量数据计算(TB、PB)几千台服务器共同计算。
缺点:
- 不擅长实时计算:MR擅长处理分钟、小时级别任务
- 不擅长流式计算:Sparkstreaming、flink(擅长流式计算)
- 不擅长DAG有向无环图(spark擅长)
Shuffle算子
Shuffle算子分类
Spark中会产生shuffle的算子大概可以分为4类:
Spark中对shuffle的抽象
- 窄依赖:父RDD的每个分片至多被子RDD中的一个分片所依赖
- 宽依赖:父RDD中的分片可能被子RDD中的多个分片所依赖
AB是宽依赖,CDF是窄依赖。
算子内部的依赖关系
Shuffle Dependency构造 -Aggregator
- createCombiner:只有一个value的时候初始化方法
- mergeValue:合并一个value到Aggregator中
- mergeCombiners:合并两个Aggregator
Shuffle过程
Shuffle实现的发展历程
- Spark 0.8 及以前 Hash Based Shuffle
- Spark 0.8.1 为 Hash Based Shuffle 引入 File Consolidation 机制
- Spark 0.9 引入 ExternalAppendOnlyMap
- Spark 1.1 引入 Sort Based Shuffle ,但默认仍为 Hash Based Shuffle
- Spark 1.2 默认的 Shuffle 方式改为 Sort Based Shuffle
- Spark 1.4 引入 Tungsten-Sort Based Shuffle
- Spark 1.6 Tungsten-Sort Based Shuffle 并入 Sort Based Shuffle
- Spark 2.0 Hash Based Shuffle 退出历史舞台
Hash Shuffle 写数据
每个partition会映射到一个独立的文件。
Sort shuffle 写数据
每个 task 生成一个包含所有 partition 数据的文件。
Shuffle 读数据
每个 reduce task 分别获取所有 map task 生成属于自己的片段。
Shuffle 过程的触发流程
val text = sc.textFile("mytextfile.txt")
val counts = text
.flatMap(line => line.split(" "))
.map(word => (word,1))
.reduceByKey(_+_)
counts. collect
Shuffle Handle 的创建
Register Shuffle时做的最重要的事情是根据不同条件创建不同的 shuffle Handle
Shuffle优化使用的技术
Zero Copy
DMA(Direct Memory Access) :直接存储器存取,是指外部设备不通过CPU而直接与系统内存交换数据的接口技术。
Netty Zero Copy
- 可堆外内存,避免JVM堆内存到堆外内存的数据拷贝。
- CompositeByteBuf、Unpooled.wrappedBuffer. ByteBuf.slice , 可以合并、包装、切分数组,避免发生内存拷贝。
- Netty使用FileRegion实现文件传输FileRegion 底层封装了FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝。
Shuffle优化
//传统的join操作会导致shuffle操作。
//因为两个RDD中,相同的key 都需要通过网络拉取到一个节点上, 由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
//Broadcas t +map的join操作,不会导致shuffle操作。
//使用Broadcast将一个 数据量较小的RDD作为厂播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
//在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2所有数据
//然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行
//此时就可以根据自已需要的方式,将rdd1 当前数据与rdd2中可以连接的数据,拼接在一起
val rdd3 = rdd1.map(rdd2DataBroadcast...)
//注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
//因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
- 使用可以 map-side预聚合的算子
- Shuffle 参数优化
- spark.default.parallelism && spark.sql.shuffle.partitions
- spark.hadoopRDD.ignoreEmptySplits
- spark.hadoop.mapreduce.input.fileinputformat.split.minsize
- spark.sql.file.maxPartitionBytes
- spark.sql.adaptive.enabled &&
- spark.sql.adaptive.shuffle.targetPostShuffleInputSize
- spark.reducer.maxSizeInFlight
- spark.reducer.maxReqsInFlight
- spark.reducer.maxBlocksInFlightPerAddress
Shuffle 倾斜优化
绝大多数task执行得都非常快,但个别task执行极慢。这种情况称为数据倾斜。
数据倾斜的原因
| | |
---|
| | 分发到某一个或几个Reduce上的数据远高于平均值 |
| | |
| | |
| | |
- key分布不均匀
- 业务数据本身的特性
- 建表时考虑不周
- 某些SQL语句本身就有数据倾斜
解决方法
- 提高并行度
- Spark AQE Skew Join
- AQE根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join。
Push Shuffle
为什么需要 Push Shuffle
- Avg IO size太小,造成了大量的随机IO,严重影响磁盘的吞吐
- M * R次读请求,造成大量的网络联届,影响稳定性
企业的Push Shuffle实现
Facebook :CoSco Linkedln : magnet Uber : Zeus Alibaba : RSS Tencent : FireStorm Bytedance : CSS Spark3.2 : push based shuffle
Magnet实现原理
- Spark driver组件,协调整体的shuffle操作
- map任务的shuffle writer过程完成后,增加了一个额外的操作Push-merge,将数据复制一份推到远程shuffle服务上
- magnet shuffle service是一个强化版的ESS.将隶属于同一个shuffle partition的block,会在远程传输到magnet后被merge到一个文件中
- reduce任务从magnet shuffle service接收合并好的shuffle数据
- bitmap:存储已merge的mapper id,防止重复merge
- position offset:如果本次block没有正常merge,可以恢复到上一个block的位置
- currentMapId :标识当前正在append的block ,保证不同mapper的block能依次 append
Magnet可靠性
- 如果Map task输出的Block没有成功Push到magnet上,并且反复重试仍然失败,则reduce task直接从ESS.上拉取原始block数据
- 如果magnet上的block因为重复或者冲突等原因,没有正常完成merge的过程,则reduce task直接拉取未完成merge的block
- 如果reduce拉取已经merge好的block失败,则会直接拉取merge前的原始block
- 本质上,magnet中维护了两份shuffle数据的副本
Cloud Shuffle Service思想
Cloud Shuffle Service 架构
- Zookeeper WorkerList [服务发现]
- css Worker [Partitions / Disk I Hdfs]
- Spark Driver [集成启动 CSS Master]
- CSS Master [Shuffle 规划 / 统计]
- CSS ShuffleClient [Write / Read]
- Spark Executor [Mapper + Reducer]
Cloud Shuffle Service AQE
一个partition会最终对应到多个Epochfile,每个EPoch 目前设置时512MB