从coalesce算子发散开的

coalesce算子,相当绕口的一个英文单词,来闭上眼睛回忆一下编程手册,咋说的来着?

coalesce(numPartitions):

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

翻译一下: 把一个RDD的分区数降低到指定的分区个数(即numPartitions个),主要用途是在大数据集的过滤后,使得后续操作更加搞笑,啊,不是,是更加高效。解释一下:原始数据集非常的大,所以我们需要把原始数据集切的很细(partition个数非常多),这样就可以充分利用spark的分布式、高并发的特性,来加快数据的过滤。巴特,过滤完之后,可能数据集就非常小了,比如把10亿过滤成了1000条,以前10亿需要1000个partition并行处理来满足时间要求,但是1000条数据如果还是用1000个partition,那意味着神马?意味着资源的极大浪费!因为rdd的partition是和task对应的,一个partition就会启动一个task进行处理,1000条数据,1000个partition,1000个task,基本上一个task处理一条数据,那就真的是有点搞笑了。虽然在一个executor内,使用线程池来减少启动task的开销,但是浪费依然是存在的,这个task占用资源期间,其他job的task就得等待了。所以这时就需要减少partition的数量了,于是coalesce应运而生。

好了,这时又需要大家再闭上眼睛回忆一下,网上咋评价这个算子来着?对,这个算子和repartition相比,某些情况下不会带来shuffle的开销。大家再跟着我脑补一下,假如有1000个partition,有100台服务器,那么最理想的分配方式就是平均分配,每台服务器处理10个partition,那么如果在调用coalesce时,传入的numPartitions是100,那么我直接把每个机器上的10个partition合并成一个,不就可以达到减少partition数但是又没有在服务器间传输大量数据(shuffle)的目的了吗?这个过程也和coalesce的中文释义吻合,即“合并”,而非repartition的“重分发”。那我们来看看coalesce神器是不是这么做的。首先来看主入口方法:

要进行分区合并,spark提供了一个叫DefaultPartitionCoalescer分区合并器的类来完成这个工作,分区合并器中一个重要的概念是PartitionGroup,即分区组,一个分区组就对应了一个把多个父rdd分区合并后的子rdd的分区,即CoalescedRDD的分区,但是为什么叫分区组而不直接就叫CoalescedRDD分区呢,那是因为PartitionGroup还是一个中间状态,无法表达一个真正的RDD的partition。

我们来倒着进行源码分析,看看getPartitions这个方法,这是一个接口方法,driver端在执行调度时,会调用这个方法,获取到一个rdd有哪些partition,然后进行task的分配(就是把task分配到哪些executor机器上去执行):

看了上图的执行分析,我们就会想知道父rdd的partition是怎么被分配到一个PartitionGroup中去的,那么就来看看coalesce方法吧:

在setupGroups方法中,就涉及到一个我们本篇文章要发散的一个点了,那就是preferedLocation的概念,先说location,其实location就是一个ip地址或者主机名,用来标识一个partition要被分配到哪个服务器去处理。如果一个partition相关的数据在A节点,但是被分配到B节点执行,那存在两种情况,如果是HDFS这种共享磁盘的文件系统,那么就需要HDFS底层来把A节点上的数据拉到B节点,这就带来了磁盘读(从A节点读)磁盘写(在B节点写)以及网络开销;另外一种情况就是本地文件系统,那么就直接会报错了,因为不可能在B节点找到A节点上存储的文件数据。但是如果能让task直接就在A节点去执行,就可以解决上边的两种问题了,这就是传说中的“让计算去靠近存储”,而调度系统要完成这样一个优化,就需要RDD告诉调度系统,一个分区的preferedLocation具体是什么。 好了,回到上边setupGroups方法中,如果父partition都没有这个preferedLocation,那么所有的父partition根据其在数组中的位置,会被平均的分配到一个PartitionGroup中,进行CoalescedRDD分区的构造。如果父partition有preferedLocation,那么就要进行额外的处理了,我们来看throwBalls方法:

好了,我们知道了CoalescedRDD是如何选择分区的最优执行节点了,那么这个获取最优分区的过程会在哪里被调用呢?我们思考一下,spark的进程主要分为driver和executor,executor只需要根据driver发送过来的信息就行执行就行了,driver端才需要知道这些全局相关的信息,所以自然,RDD的preferredLocations方法会在调度阶段被调用,好了就不贴代码了,快去DAGScheduler中印证一下吧。 那么我们顺着这个思路来看看还有哪些子RDD实现了这个getPreferredLocations方法呢?

HadoopRDD

HadoopPartition中存储了一个partition的三个副本都在哪些节点上,调度系统尅选择其中的一个进行任务的分发。

UnionRDD

Union是把多个RDD合并成一个,但是其中每个RDD在物理上海市完全独立的,所以UnionRDD的partition在进行选择preferedlocation的原则就是,父rdd选择的是哪个,unionrdd这个儿子的partition的preferedlocation就是哪个节点。

ShuffledRDD

shuffle过程中,中间结果会写入到本地的内存或者本地磁盘,所以这里一定要明确的告诉调度系统,地洞shuffleread时,需要到哪个节点上去读shufflewrite已经写好的中间结果。

结语

通过coalesce算子源码的阅读,我们可以了解到父RDD的parititon子RDD的partition的对应关系,有利于我们理解整个spark job的计算流程。通过coalesceRDD在对父RDD的多个partition进行打包的过程,我们看到了preferedlocation的使用,这可以让我们在整体上调度系统的执行流程有一个了解。

更多spark学习资源和经验分享,加入spark技术学院,BAT一线工程师为你答疑解惑:

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-06-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏点滴积累

geotrellis使用(三十九)COG 写入更新

前言 前面介绍过了如何在 ETL 的时候更新 Layer,使得能够在大数据量的时候完成 ETL 操作,同时前两篇文章也介绍了 COG 以及如何在 Geotrel...

40312
来自专栏喔家ArchiSelf

MCU上的代码执行时间

在许多实时应用程序中,二八原则并不生效,CPU 可以花费95%(或更多)的时间在不到5% 的代码上。电动机控制、引擎控制、无线通信以及其他许多对时间敏感的应用程...

852
来自专栏Java成长之路

java中的异步处理和Feature接口(一)

想象这样一个场景:你可能希望为你的法国客户提供指定主题的热点报道。为实现这一功能,你需要向 谷歌或者Twitter的API请求所有语言中针对该主题最热门的评论,...

1932
来自专栏安恒网络空间安全讲武堂

翻译 | python利用shodan搜集信息

文中提及的部分技术、工具可能带有一定的攻击性、仅供安全学习和教学用途,禁止非法使用! 安装 为了开始使用Shodan的Python库,首先要确保你已经收到了AP...

43410
来自专栏一个技术人的金融之路

简讲LSM树(Log-Structured Merge Tree)

前言:最近在了解大数据实时分析技术druid,究其原理时发现用到了类LSM树思想以实现高效的数据插入,于是展开了对LSM的了解,了解之后感觉这东西虽然也并没有很...

4727
来自专栏owent

对atbus的小数据包的优化

atbus是我按之前的思路写得服务器消息通信中间件,目标是简化服务器通信的流程,能够自动选择最优路线,自动的断线重连和通信通道维护。能够跨平台并且高效。

952
来自专栏H2Cloud

Future Pattern

Started: 俗话说一年之计在于春,一天之计在于晨,当我起床的时候,看见表正指向九点钟,十一点下班,十点上班,这是我现在的工作节奏。来北京马上就一个月了,近...

3535
来自专栏大数据智能实战

Spark Hbase读取操作的一些总结与测试

Spark连接HBase实现查询的操作有好多种步骤,其中常用的是直接调用Hbase本身提供的写入和读出的接口。 然而不少人在此基础上进行了各种封装,有的支持sp...

2487
来自专栏吉浦迅科技

DAY40:阅读Memory Fence Functions

The CUDA programming model assumes a device with a weakly-ordered memory model, ...

744
来自专栏大数据学习笔记

Hadoop基础教程-第14章 大数据面试笔试题汇总(持续更新)

第14章 大数据面试笔试题汇总(持续更新) 注意:大部分题目来自互联网,部分题目来自同事口述 14.1 Zookeeper (1)Zookeeper是什...

3716

扫码关注云+社区