CDA数据分析师学习之路第3期 | Spark RDD的转换操作举例

Spark RDD中的操作非常丰富,有80多种针对数据的操作。其中最重要的是Transformation(转换操作)和Action(执行操作)两类。其中转换操作采用了惰性策略,转换操作只生成元数据,相当于对业务逻辑的一种抽象描述,并不会真正执行,只有提交行动操作,这时候才开始从头到尾依次计算。

最常用的转换操作有map, mapPartitions, mapValues, flatMapValues, filter等。

1. map

针对RDD中的每个元素,经过指定的函数,转换成新的元素,进而得到新RDD

val a =sc.parallelize(1 to 9, 3)
val b = a.map(x=> x*2)
a.collect
b.collect
res10:Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
res11:Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD

通过toDebugString方法来查看RDD间的依赖关系和转换过程

2. mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区

val a =sc.parallelize(1 to 9, 3)
defmyfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res =List[(T, T)]()
var pre =iter.next
while(iter.hasNext) {
val cur =iter.next; res .::= (pre, cur) pre = cur; } res.iterator }
a.mapPartitions(myfunc).collect
res0:Array[(Int, Int)] = Array((1,2), (2,3), (4,5), (5,6) , (7,8), (8,9))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。

3. mapValues

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。

val a =sc.parallelize(List("dog", "tiger", "lion","cat", "panther", " eagle"), 2)
val b = a.map(x=> (x.length, x))
b.mapValues("x"+ _ + "x").collect
res5:Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx),(7,xpantherx), (5,xeaglex))

4. flatMap

与map类似,区别是RDD中的元素经map处理后只能生成一个元素,而RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。

val a =sc.parallelize(1 to 4, 2)
val b =a.flatMap(x => 1 to x)
b.collect
res12:Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

5. flatMapValues

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

val a =sc.parallelize(List((1,2),(3,4),(3,6)))
val b =a.flatMapValues(x=>x.to(5))
b.collect
res3:Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

6. 代码演示reduceByKey

reduceByKey针对KV形式的RDD。顾名思义,他以Key作为元素的分组依据,然后对具有相同Key的Value进行相应的函数计算,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

val a =sc.parallelize(List((1,2),(3,4),(3,6)))
a.reduceByKey((x,y)=> x + y).collect
res7: Array[(Int,Int)] = Array((1,2), (3,10))

上述例子中,对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。

7. 代码演示filter

对于RDD中的每一个元素,使用指定的函数进行计算,对于返回值为true的元素,筛选出来作为新RDD的元素

valrdd7=sc.makeRDD(1 to 10).filter(_%3==0)
rdd7.collect
res7:Array[(Int, Int)] = Array(3,6,9)

以上就是Spark RDD中比较常见的几个转换操作。Spark中的转换操作是用户基于已有RDD创建新的RDD的一种重要方式。

原文发布于微信公众号 - CDA数据分析师(cdacdacda)

原文发表时间:2016-02-23

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Spark学习技巧

一文详解scala泛型及类型限定

今天知识星球球友,微信问浪尖了一个spark源码阅读中的类型限定问题。这个在spark源码很多处出现,所以今天浪尖就整理一下scala类型限定的内容。希望对大家...

2012
来自专栏待你如初见

对象序列化与反序列化

921
来自专栏Jaycekon

Java 序列化与反序列化

1、什么是序列化?为什么要序列化?     Java 序列化就是指将对象转换为字节序列的过程,而反序列化则是只将字节序列转换成目标对象的过程。     我们都知...

4527
来自专栏LeetCode

LeetCode 191. Number of 1 Bits

Write a function that takes an unsigned integer and returns the number of '1' bi...

760
来自专栏积累沉淀

Spark 算子

RDD算子分类,大致可以分为两类,即: Transformation:转换算子,这类转换并不触发提交作业,完成作业中间过程处理。 Action:行动算子,这类算...

2535
来自专栏彭湖湾的编程世界

【javascript】详解javaScript的深拷贝

前言: 最开始意识到深拷贝的重要性是在我使用redux的时候(react + redux), redux的机制要求在reducer中必须返回一个新的对象,而不能...

1886
来自专栏nothing

在 C++ 中,你甚至可以 b()()()(); ——介绍 ref qualifier

在传统 C++ 中,成员函数通过 this 指针访问。可以处理所有需要左值的情况。

1552
来自专栏mukekeheart的iOS之旅

No.015 3Sum

15. 3Sum Total Accepted: 131800 Total Submissions: 675028 Difficulty: Medium   G...

2438
来自专栏前端知识分享

第67天:面向对象的声明、封装

793
来自专栏静默虚空的博客

Java 序列化

被序列化的类必须属于 Enum、Array 和 Serializable 类型其中的任何一种。

1140

扫码关注云+社区

领取腾讯云代金券