首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >CDA数据分析师学习之路第3期 | Spark RDD的转换操作举例

CDA数据分析师学习之路第3期 | Spark RDD的转换操作举例

作者头像
CDA数据分析师
发布2018-02-11 17:17:31
5450
发布2018-02-11 17:17:31
举报
文章被收录于专栏:CDA数据分析师CDA数据分析师

Spark RDD中的操作非常丰富,有80多种针对数据的操作。其中最重要的是Transformation(转换操作)和Action(执行操作)两类。其中转换操作采用了惰性策略,转换操作只生成元数据,相当于对业务逻辑的一种抽象描述,并不会真正执行,只有提交行动操作,这时候才开始从头到尾依次计算。

最常用的转换操作有map, mapPartitions, mapValues, flatMapValues, filter等。

1. map

针对RDD中的每个元素,经过指定的函数,转换成新的元素,进而得到新RDD

val a =sc.parallelize(1 to 9, 3)
val b = a.map(x=> x*2)
a.collect
b.collect
res10:Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
res11:Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD

通过toDebugString方法来查看RDD间的依赖关系和转换过程

2. mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区

val a =sc.parallelize(1 to 9, 3)
defmyfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
var res =List[(T, T)]()
var pre =iter.next
while(iter.hasNext) {
val cur =iter.next; res .::= (pre, cur) pre = cur; } res.iterator }
a.mapPartitions(myfunc).collect
res0:Array[(Int, Int)] = Array((1,2), (2,3), (4,5), (5,6) , (7,8), (8,9))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。

3. mapValues

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。

val a =sc.parallelize(List("dog", "tiger", "lion","cat", "panther", " eagle"), 2)
val b = a.map(x=> (x.length, x))
b.mapValues("x"+ _ + "x").collect
res5:Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx),(7,xpantherx), (5,xeaglex))

4. flatMap

与map类似,区别是RDD中的元素经map处理后只能生成一个元素,而RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。

val a =sc.parallelize(1 to 4, 2)
val b =a.flatMap(x => 1 to x)
b.collect
res12:Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

5. flatMapValues

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

val a =sc.parallelize(List((1,2),(3,4),(3,6)))
val b =a.flatMapValues(x=>x.to(5))
b.collect
res3:Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

6. 代码演示reduceByKey

reduceByKey针对KV形式的RDD。顾名思义,他以Key作为元素的分组依据,然后对具有相同Key的Value进行相应的函数计算,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

val a =sc.parallelize(List((1,2),(3,4),(3,6)))
a.reduceByKey((x,y)=> x + y).collect
res7: Array[(Int,Int)] = Array((1,2), (3,10))

上述例子中,对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。

7. 代码演示filter

对于RDD中的每一个元素,使用指定的函数进行计算,对于返回值为true的元素,筛选出来作为新RDD的元素

valrdd7=sc.makeRDD(1 to 10).filter(_%3==0)
rdd7.collect
res7:Array[(Int, Int)] = Array(3,6,9)

以上就是Spark RDD中比较常见的几个转换操作。Spark中的转换操作是用户基于已有RDD创建新的RDD的一种重要方式。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2016-02-23,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CDA数据分析师 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档