从coalesce算子发散开的

coalesce算子,相当绕口的一个英文单词,来闭上眼睛回忆一下编程手册,咋说的来着?

coalesce(numPartitions):

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

翻译一下: 把一个RDD的分区数降低到指定的分区个数(即numPartitions个),主要用途是在大数据集的过滤后,使得后续操作更加搞笑,啊,不是,是更加高效。解释一下:原始数据集非常的大,所以我们需要把原始数据集切的很细(partition个数非常多),这样就可以充分利用spark的分布式、高并发的特性,来加快数据的过滤。巴特,过滤完之后,可能数据集就非常小了,比如把10亿过滤成了1000条,以前10亿需要1000个partition并行处理来满足时间要求,但是1000条数据如果还是用1000个partition,那意味着神马?意味着资源的极大浪费!因为rdd的partition是和task对应的,一个partition就会启动一个task进行处理,1000条数据,1000个partition,1000个task,基本上一个task处理一条数据,那就真的是有点搞笑了。虽然在一个executor内,使用线程池来减少启动task的开销,但是浪费依然是存在的,这个task占用资源期间,其他job的task就得等待了。所以这时就需要减少partition的数量了,于是coalesce应运而生。

好了,这时又需要大家再闭上眼睛回忆一下,网上咋评价这个算子来着?对,这个算子和repartition相比,某些情况下不会带来shuffle的开销。大家再跟着我脑补一下,假如有1000个partition,有100台服务器,那么最理想的分配方式就是平均分配,每台服务器处理10个partition,那么如果在调用coalesce时,传入的numPartitions是100,那么我直接把每个机器上的10个partition合并成一个,不就可以达到减少partition数但是又没有在服务器间传输大量数据(shuffle)的目的了吗?这个过程也和coalesce的中文释义吻合,即“合并”,而非repartition的“重分发”。那我们来看看coalesce神器是不是这么做的。首先来看主入口方法:

要进行分区合并,spark提供了一个叫DefaultPartitionCoalescer分区合并器的类来完成这个工作,分区合并器中一个重要的概念是PartitionGroup,即分区组,一个分区组就对应了一个把多个父rdd分区合并后的子rdd的分区,即CoalescedRDD的分区,但是为什么叫分区组而不直接就叫CoalescedRDD分区呢,那是因为PartitionGroup还是一个中间状态,无法表达一个真正的RDD的partition。

我们来倒着进行源码分析,看看getPartitions这个方法,这是一个接口方法,driver端在执行调度时,会调用这个方法,获取到一个rdd有哪些partition,然后进行task的分配(就是把task分配到哪些executor机器上去执行):

看了上图的执行分析,我们就会想知道父rdd的partition是怎么被分配到一个PartitionGroup中去的,那么就来看看coalesce方法吧:

在setupGroups方法中,就涉及到一个我们本篇文章要发散的一个点了,那就是preferedLocation的概念,先说location,其实location就是一个ip地址或者主机名,用来标识一个partition要被分配到哪个服务器去处理。如果一个partition相关的数据在A节点,但是被分配到B节点执行,那存在两种情况,如果是HDFS这种共享磁盘的文件系统,那么就需要HDFS底层来把A节点上的数据拉到B节点,这就带来了磁盘读(从A节点读)磁盘写(在B节点写)以及网络开销;另外一种情况就是本地文件系统,那么就直接会报错了,因为不可能在B节点找到A节点上存储的文件数据。但是如果能让task直接就在A节点去执行,就可以解决上边的两种问题了,这就是传说中的“让计算去靠近存储”,而调度系统要完成这样一个优化,就需要RDD告诉调度系统,一个分区的preferedLocation具体是什么。 好了,回到上边setupGroups方法中,如果父partition都没有这个preferedLocation,那么所有的父partition根据其在数组中的位置,会被平均的分配到一个PartitionGroup中,进行CoalescedRDD分区的构造。如果父partition有preferedLocation,那么就要进行额外的处理了,我们来看throwBalls方法:

好了,我们知道了CoalescedRDD是如何选择分区的最优执行节点了,那么这个获取最优分区的过程会在哪里被调用呢?我们思考一下,spark的进程主要分为driver和executor,executor只需要根据driver发送过来的信息就行执行就行了,driver端才需要知道这些全局相关的信息,所以自然,RDD的preferredLocations方法会在调度阶段被调用,好了就不贴代码了,快去DAGScheduler中印证一下吧。 那么我们顺着这个思路来看看还有哪些子RDD实现了这个getPreferredLocations方法呢?

HadoopRDD

HadoopPartition中存储了一个partition的三个副本都在哪些节点上,调度系统尅选择其中的一个进行任务的分发。

UnionRDD

Union是把多个RDD合并成一个,但是其中每个RDD在物理上海市完全独立的,所以UnionRDD的partition在进行选择preferedlocation的原则就是,父rdd选择的是哪个,unionrdd这个儿子的partition的preferedlocation就是哪个节点。

ShuffledRDD

shuffle过程中,中间结果会写入到本地的内存或者本地磁盘,所以这里一定要明确的告诉调度系统,地洞shuffleread时,需要到哪个节点上去读shufflewrite已经写好的中间结果。

结语

通过coalesce算子源码的阅读,我们可以了解到父RDD的parititon子RDD的partition的对应关系,有利于我们理解整个spark job的计算流程。通过coalesceRDD在对父RDD的多个partition进行打包的过程,我们看到了preferedlocation的使用,这可以让我们在整体上调度系统的执行流程有一个了解。

更多spark学习资源和经验分享,加入spark技术学院,BAT一线工程师为你答疑解惑:

原文发布于微信公众号 - Spark学习技巧(bigdatatip)

原文发表时间:2018-06-17

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏冷冷

基于Redis实现分布式应用限流

限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务。 前几天在DD的公众号,看了一篇关于使用 ...

4508
来自专栏Spark学习技巧

Spark Structured Streaming高级特性

一,事件时间窗口操作 使用Structured Streaming基于事件时间的滑动窗口的聚合操作是很简单的,很像分组聚合。在一个分组聚合操作中,聚合值被唯一保...

4316
来自专栏携程技术中心

高性能Key/Value存储引擎SessionDB

简介 随着公司业务量的逐年成长,粘性会话(Sticky Session)越来越成为应用横向扩展(Scale Out)的瓶颈,为消除粘性会话,支持应用无状态(St...

22210
来自专栏美图数据技术团队

Spark Streaming VS Flink

本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理...

711
来自专栏数据和云

内存为王:DBIM RAC Share Nothing架构的挑战和解决方案

陈焕生 Oracle Real-World Performance Group 成员,senior performance engineer,专注于 OLTP...

2575
来自专栏吉浦迅科技

DAY7:阅读 CUDA C编程接口之CUDA C runtime

1193
来自专栏廖可知的专栏

Redis 4.0 PSYNC2中second_replid_offset探究

Redis 4.0起引入了PSYNC2同步方式,分析源码时我们注意到,server数据中增加了replid2、second_replid_offset两个成员。...

1496
来自专栏java 成神之路

java 成神之路

2463
来自专栏程序你好

Apache Spark大数据处理 - 性能分析(实例)

今天的任务是将伦敦自行车租赁数据分为两组,周末和工作日。将数据分组到更小的子集进行进一步处理是一种常见的业务需求,我们将看到Spark如何帮助我们完成这项任务。

1053
来自专栏数据和云

Oracle构造序列的方法分析对比

编辑手记:关于Oracle的序列,相信大家并不陌生,但很多人平时只用到connect by 的方式来构造序列,今天一起来学习更多的构造序列的方法及每个方法的优缺...

3407

扫码关注云+社区