Spark源码系列之foreach和foreachPartition的区别

一,基本使用

1,RDD分布式数据集的五大特性

1),A list of partitions(一系列的分区)

2),A function for computing each split(计算每个分片的方法)

3),A list of dependencies on other RDDs(一系列的依赖RDD)

4),Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)

(可选,对于key-value类型的RDD都会有一个分区器)

5),Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)(可选,最佳位置)

2,RDD的操作类型:

Transformations:转换操作,lazy型,不会触发计算

Action:触发job

Persist:缓存,也不会触发job,在第一次触发job之后才会真正进行缓存。

3,RDD的计算

RDD的计算实际上我们可以分为两个大部分:

1),Driver端的计算

主要是stage划分,task的封装,task调度执行

2),Executor端的计算

真正的计算开始,默认情况下每个cpu运行一个task。一个task实际上就是一个分区,我们的方法无论是转换算子里封装的,还是action算子里封装的都是此时在一个task里面计算一个分区的数据。

下面就那这两个例子,开始讲解吧,针对转换类型的操作可以类比查看。

jsonRDD.foreach(each=>{
 //连接数据库
  //插入数据库
  //关闭数据库连接
})

jsonRDD.foreachPartition(partition=>{
 //此处连接上数据库
 partition.foreach(each=>
 //插入数据
 })
 //关闭数据库连接
})

这两个算子里面,上述我说的”我们的方法是”,每个算子圆括号内部的所有内容。

二,源码相关

1,第一次封装

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
 val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
/**
 * Applies a function f to each partition of this RDD.
 */
def foreachPartition(f: Iterator[T] => Unit): Unit = withScope {
 val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
}

可以看到方法通过clean操作(清理闭包,为序列化和网络传输做准备),进行了一层匿名函数的封装,

针对foreach方法,是我们的方法被传入了迭代器的foreach(每个元素遍历执行一次函数),

而对于foreachpartiton方法是迭代器被传入了我们的方法(每个分区执行一次函数,我们获取迭代器后需要自行进行迭代处理,也即上述第二个demo的partition.foreach)。

2,第二次封装

这次很统一就在

/**
 * Run a job on a given set of partitions of an RDD, but take a function of type
 * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
 func: Iterator[T] => U,
 partitions: Seq[Int]): Array[U] = {
 val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

就是讲上述进一步封装的方法进一步按照匿名函数封装

(ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)

3,执行的时候

Spark的Task类型我们用到的也就两个

1),ShuffleMapTask

2),ResultTask

Action算子的方法执行是在ResultTask中执行的,也即ResultTask的runTask方法。

首先反序列化得到我们的方法(2步骤封装的)和RDD,然后执行。传入的是迭代器

val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
  ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))

三,总结

RDD.foreach(foreachFunction)

RDD.foreachPatition(foreachPartitionFunction)

经过第二步的分析我们可以理解,展开之后实际上就是

RDD的每个分区的iterator(集合):

iterator.foreach(foreachFunction)

foreachPartitionFunction(iterator)

这就很明显了,假如我们的Function中有数据库,网络TCP等IO链接,文件流等等的创建关闭操作,采用foreachPatition方法,针对每个分区集合进行计算,更能提高我们的性能。

本文分享自微信公众号 - Spark学习技巧(bigdatatip)

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2017-07-21

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark生态圈

[Spark SQL] 源码解析之Analyzer

Analyzer模块将Unresolved LogicalPlan结合元数据catalog进行绑定,最终转化为Resolved LogicalPlan。跟着代码...

15320
来自专栏岑玉海

Spark源码系列(九)Spark SQL初体验之解析过程详解

好久没更新博客了,之前学了一些R语言和机器学习的内容,做了一些笔记,之后也会放到博客上面来给大家共享。一个月前就打算更新Spark Sql的内容了,因为一些别的...

54150
来自专栏技术小黑屋

单例这种设计模式

随着我们编写代码的深入,我们或多或少都会接触到设计模式,其中单例(Singleton)模式应该是我们耳熟能详的一种模式。本文将比较特别的介绍一下Java设计模式...

13530
来自专栏Hongten

spring开发_使用p名称空间配置属性

http://www.cnblogs.com/hongten/gallery/image/112563.html

13120
来自专栏coolblog.xyz技术专栏

MyBatis 源码分析 - SQL 的执行过程

本篇文章较为详细的介绍了 MyBatis 执行 SQL 的过程。该过程本身比较复杂,牵涉到的技术点比较多。包括但不限于 Mapper 接口代理类的生成、接口方法...

1K20
来自专栏大数据智能实战

Spark 2.0 DataFrame map操作中Unable to find encoder for type stored in a Dataset.问题的分析与解决

随着新版本的spark已经逐渐稳定,最近拟将原有框架升级到spark 2.0。还是比较兴奋的,特别是SQL的速度真的快了许多。。 然而,在其中一个操作时却卡住了...

54690
来自专栏行者悟空

java实现Kafka生产者Producer工具类

96960
来自专栏扎心了老铁

java使用spark/spark-sql处理schema数据

1、spark是什么? Spark是基于内存计算的大数据并行计算框架。 1.1 Spark基于内存计算 相比于MapReduce基于IO计算,提高了在大数据环境...

37350
来自专栏码字搬砖

java如何指定外部的配置文件

工作当中很多时候都希望可以把配置文件外放,这样的话就可以做到配置与业务分离,其实有很多种放式,比如xml,properties,这里就说一下如何用propert...

53720
来自专栏听雨堂

从MapX到MapXtreme2004[10]-根据zoom值修改显示范围

        原来在Mapx中只需要修改zoom值即可,现在也是一样。虽然map对象有setview方法,但似乎不太好用,因为需要coordsys。     ...

19470

扫码关注云+社区

领取腾讯云代金券