前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >大数据 Shuffle 原理与实践 | 青训营笔记

大数据 Shuffle 原理与实践 | 青训营笔记

作者头像
鳄鱼儿
发布2024-05-21 16:59:21
1630
发布2024-05-21 16:59:21
举报
文章被收录于专栏:鳄鱼儿的技术分享

大数据 Shuffle 原理与实践

Shuffle概述

MapReduce

MapReduce是一个分布式运算程序的编程框架,是用户开发”基于hadoop的数据分析应用“的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上。

MapReduce存在MapShuffleReduce三个阶段

  • Map阶段,是在单机上进行的针对一小块数据的计算过程
  • Shuffle阶段,在map阶段的基础上,进行数据移动,为后续的reduce阶段做准备
  • Reduce过程,reduce阶段,对移动后的数据进行处理,依然是在单机上处理一小份数据
MapReduce优缺点

优点:

  • 易于编程:用户只关心业务逻辑,实现框架的接口
  • 良好扩展性:可以动态增加服务器,解决计算资源不够问题。
  • 高容错性:任何一台机器挂掉,可以将任务转移到其他节点。
  • 适合海量数据计算(TB、PB)几千台服务器共同计算。

缺点:

  • 不擅长实时计算:MR擅长处理分钟、小时级别任务
  • 不擅长流式计算:Sparkstreaming、flink(擅长流式计算)
  • 不擅长DAG有向无环图(spark擅长)

Shuffle算子

Shuffle算子分类

Spark中会产生shuffle的算子大概可以分为4类:

Spark中对shuffle的抽象
  • 窄依赖:父RDD的每个分片至多被子RDD中的一个分片所依赖
  • 宽依赖:父RDD中的分片可能被子RDD中的多个分片所依赖

AB是宽依赖,CDF是窄依赖。

算子内部的依赖关系

Shuffle Dependency构造 -Aggregator

  • createCombiner:只有一个value的时候初始化方法
  • mergeValue:合并一个value到Aggregator中
  • mergeCombiners:合并两个Aggregator

Shuffle过程

Shuffle实现的发展历程

  • Spark 0.8 及以前 Hash Based Shuffle
  • Spark 0.8.1 为 Hash Based Shuffle 引入 File Consolidation 机制
  • Spark 0.9 引入 ExternalAppendOnlyMap
  • Spark 1.1 引入 Sort Based Shuffle ,但默认仍为 Hash Based Shuffle
  • Spark 1.2 默认的 Shuffle 方式改为 Sort Based Shuffle
  • Spark 1.4 引入 Tungsten-Sort Based Shuffle
  • Spark 1.6 Tungsten-Sort Based Shuffle 并入 Sort Based Shuffle
  • Spark 2.0 Hash Based Shuffle 退出历史舞台

Hash Shuffle 写数据

每个partition会映射到一个独立的文件。

Sort shuffle 写数据

每个 task 生成一个包含所有 partition 数据的文件。

Shuffle 读数据

每个 reduce task 分别获取所有 map task 生成属于自己的片段。

Shuffle 过程的触发流程

代码语言:javascript
复制
val text = sc.textFile("mytextfile.txt")
val counts = text
  .flatMap(line => line.split(" "))
  .map(word => (word,1))
  .reduceByKey(_+_) 
counts. collect

Shuffle Handle 的创建

Register Shuffle时做的最重要的事情是根据不同条件创建不同的 shuffle Handle

Shuffle优化使用的技术

Zero Copy

DMA(Direct Memory Access) :直接存储器存取,是指外部设备不通过CPU而直接与系统内存交换数据的接口技术。

Netty Zero Copy

  • 可堆外内存,避免JVM堆内存到堆外内存的数据拷贝。
  • CompositeByteBuf、Unpooled.wrappedBuffer. ByteBuf.slice , 可以合并、包装、切分数组,避免发生内存拷贝。
  • Netty使用FileRegion实现文件传输FileRegion 底层封装了FileChannel#transferTo() 方法,可以将文件缓冲区的数据直接传输到目标Channel,避免内核缓冲区和用户态缓冲区之间的数据拷贝。

Shuffle优化

  • 避免shuffle
    • 使用 broadcast 替代 join
代码语言:javascript
复制
//传统的join操作会导致shuffle操作。
//因为两个RDD中,相同的key 都需要通过网络拉取到一个节点上, 由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
//Broadcas t +map的join操作,不会导致shuffle操作。
//使用Broadcast将一个 数据量较小的RDD作为厂播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
//在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2所有数据
//然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行
//此时就可以根据自已需要的方式,将rdd1 当前数据与rdd2中可以连接的数据,拼接在一起
val rdd3 = rdd1.map(rdd2DataBroadcast...)
//注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
//因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
  • 使用可以 map-side预聚合的算子
  • Shuffle 参数优化
    • spark.default.parallelism && spark.sql.shuffle.partitions
    • spark.hadoopRDD.ignoreEmptySplits
    • spark.hadoop.mapreduce.input.fileinputformat.split.minsize
    • spark.sql.file.maxPartitionBytes
    • spark.sql.adaptive.enabled &&
    • spark.sql.adaptive.shuffle.targetPostShuffleInputSize
    • spark.reducer.maxSizeInFlight
    • spark.reducer.maxReqsInFlight
    • spark.reducer.maxBlocksInFlightPerAddress

Shuffle 倾斜优化

绝大多数task执行得都非常快,但个别task执行极慢。这种情况称为数据倾斜。

数据倾斜的原因

关键词

情形

后果

Join

其中一个表较小,但是key集中

分发到某一个或几个Reduce上的数据远高于平均值

大表与大表,但是分桶的判断字段0值或空值过多

这些空值都由一个reduce处理,灰常慢

group by

group by 维度过小,某值的数量过多

处理某值的reduce灰常耗时

Count Distinct

某特殊值过多

处理此特殊值的reduce耗时

  • key分布不均匀
  • 业务数据本身的特性
  • 建表时考虑不周
  • 某些SQL语句本身就有数据倾斜
解决方法
  • 提高并行度
    • 优点:足够简单
    • 缺点:只缓解、不根治
  • Spark AQE Skew Join
    • AQE根据shuffle文件统计数据自动检测倾斜数据,将那些倾斜的分区打散成小的子分区,然后各自进行join。

Push Shuffle

为什么需要 Push Shuffle

  • Avg IO size太小,造成了大量的随机IO,严重影响磁盘的吞吐
  • M * R次读请求,造成大量的网络联届,影响稳定性

企业的Push Shuffle实现

Facebook :CoSco Linkedln : magnet Uber : Zeus Alibaba : RSS Tencent : FireStorm Bytedance : CSS Spark3.2 : push based shuffle

Magnet实现原理
  • Spark driver组件,协调整体的shuffle操作
  • map任务的shuffle writer过程完成后,增加了一个额外的操作Push-merge,将数据复制一份推到远程shuffle服务上
  • magnet shuffle service是一个强化版的ESS.将隶属于同一个shuffle partition的block,会在远程传输到magnet后被merge到一个文件中
  • reduce任务从magnet shuffle service接收合并好的shuffle数据
  • bitmap:存储已merge的mapper id,防止重复merge
  • position offset:如果本次block没有正常merge,可以恢复到上一个block的位置
  • currentMapId :标识当前正在append的block ,保证不同mapper的block能依次 append

Magnet可靠性

  • 如果Map task输出的Block没有成功Push到magnet上,并且反复重试仍然失败,则reduce task直接从ESS.上拉取原始block数据
  • 如果magnet上的block因为重复或者冲突等原因,没有正常完成merge的过程,则reduce task直接拉取未完成merge的block
  • 如果reduce拉取已经merge好的block失败,则会直接拉取merge前的原始block
  • 本质上,magnet中维护了两份shuffle数据的副本

Cloud Shuffle Service思想

Cloud Shuffle Service 架构

  • Zookeeper WorkerList [服务发现]
  • css Worker [Partitions / Disk I Hdfs]
  • Spark Driver [集成启动 CSS Master]
  • CSS Master [Shuffle 规划 / 统计]
  • CSS ShuffleClient [Write / Read]
  • Spark Executor [Mapper + Reducer]
Cloud Shuffle Service AQE

一个partition会最终对应到多个Epochfile,每个EPoch 目前设置时512MB

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2024-05-21,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 大数据 Shuffle 原理与实践
    • Shuffle概述
      • MapReduce
    • Shuffle算子
      • Shuffle算子分类
    • Shuffle过程
      • Shuffle实现的发展历程
      • Hash Shuffle 写数据
      • Sort shuffle 写数据
      • Shuffle 读数据
      • Shuffle 过程的触发流程
      • Shuffle Handle 的创建
      • Shuffle优化使用的技术
      • Shuffle优化
      • Shuffle 倾斜优化
    • Push Shuffle
      • 为什么需要 Push Shuffle
      • 企业的Push Shuffle实现
      • Magnet可靠性
      • Cloud Shuffle Service思想
      • Cloud Shuffle Service 架构
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档