首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(十六):Spark Core的RDD算子练习


RDD算子练习

    RDD中的函数有很多,不同业务需求使用不同函数进行数据处理分析,下面仅仅展示出比较常用的函数使用,更多函数在实际中使用体会,多加练习理解。

map 算子

对RDD中的每一个元素进行操作并返回操作的结果。

代码语言:javascript
复制
//通过并行化生成rdd

val rdd1 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))  

//对rdd1里的每一个元素

rdd1.map(_ * 2).collect  //collect方法表示收集,是action操作

//res4: Array[Int] = Array(10, 12, 8, 14, 6, 16, 4, 18, 2, 20)

filter 算子

函数中返回True的被留下,返回False的被过滤掉。

代码语言:javascript
复制
val rdd2 = sc.parallelize(List(5, 6, 4, 7, 3, 8, 2, 9, 1, 10))

val rdd3 = rdd2.filter(_ >= 10) //大于等于10的留下

rdd3.collect //10

flatMap 算子

对RDD中的每一个元素进行先map再压扁,最后返回操作的结果。

对RDD中的每一个元素进行先map再压扁,最后返回操作的结果

代码语言:javascript
复制
val rdd1 = sc.parallelize(Array("a b c", "d e f", "h i j"))

//将rdd1里面的每一个元素先切分再压平

val rdd2 = rdd1.flatMap(_.split(' '))//_是每一个元素,如其中一个:"a b c"   

rdd2.collect

//Array[String] = Array(a, b, c, d, e, f, h, i, j)

​​​​​​​交集、并集、差集、笛卡尔积

类似Scala集合类Set中相关函数,注意类型要一致。

注意类型要一致

代码语言:javascript
复制
val rdd1 = sc.parallelize(List(5, 6, 4, 3))

val rdd2 = sc.parallelize(List(1, 2, 3, 4))

//union并集不会去重

val rdd3 = rdd1.union(rdd2) 

rdd3.collect//Array[Int] = Array(5, 6, 4, 3, 1, 2, 3, 4)

//去重

rdd3.distinct.collect

//求交集

val rdd4 = rdd1.intersection(rdd2)

rdd4.collect

//求差集

val rdd5 = rdd1.subtract(rdd2)

rdd5.collect

//笛卡尔积

val rdd1 = sc.parallelize(List("jack", "tom"))//学生

val rdd2 = sc.parallelize(List("java", "python", "scala"))//课程

val rdd3 = rdd1.cartesian(rdd2)

//可以表示所有学生的所有可能的选课情况

rdd3.collect//Array((jack,java), (jack,python), (jack,scala), (tom,java), (tom,python), (tom,scala))

​​​​​​​distinct 算子

对RDD中元素进行去重,与Scala集合中distinct类似。

代码语言:javascript
复制
val rdd = sc.parallelize(Array(1,2,3,4,5,5,6,7,8,1,2,3,4), 3)

rdd.distinct.collect

​​​​​​​​​​​​​​first、take、top 算子

从RDD中获取某些元素,比如first为第一个元素,take为前N个元素,top为最大的N个元素。

代码语言:javascript
复制
val rdd1 = sc.parallelize(List(3,6,1,2,4,5))

rdd1.top(2)// 6 5

//按照原来的顺序取前N个

rdd1.take(2) //3 6

//按照原来的顺序取前第一个

rdd1.first

​​​​​​​​​​​​​​keys、values 算子

针对RDD中数据类型为KeyValue对时,获取所有key和value的值,类似Scala中Map集合。

代码语言:javascript
复制
val rdd1 = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)

val rdd2 = rdd1.map(x => (x.length, x))

rdd2.collect

//Array[(Int, String)] = Array((3,dog), (5,tiger), (4,lion), (3,cat), (7,panther), (5,eagle))

rdd2.keys.collect

//Array[Int] = Array(3, 5, 4, 3, 7, 5)

rdd2.values.collect

//Array[String] = Array(dog, tiger, lion, cat, panther, eagle)   

​​​​​​​mapValues 算子

代码语言:javascript
复制
mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后。

mapValues表示对RDD中的元素进行操作,Key不变,Value变为操作之后

val rdd1 = sc.parallelize(List((1,10),(2,20),(3,30)))

val rdd2 = rdd1.mapValues(_*2).collect //_表示每一个value ,key不变,将函数作用于value

// Array[(Int, Int)] = Array((1,20), (2,40), (3,60))

​​​​​​​collectAsMap 算子

当RDD中数据类型为Key/Value对时,转换为Map集合。

代码语言:javascript
复制
val rdd = sc.parallelize(List(("a", 1), ("b", 2)))

rdd.collectAsMap

//scala.collection.Map[String,Int] = Map(b -> 2, a -> 1)/Map((b ,2), (a , 1)) //Scala中Map底层就是多个二元组

​​​​​​​mapPartitionsWithIndex 算子

取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的。

功能:取分区中对应的数据时,还可以将分区的编号取出来,这样就可以知道数据是属于哪个分区的

代码语言:javascript
复制
val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)

//该函数的功能是将对应分区中的数据取出来,并且带上分区编号

val func = (index: Int, iter: Iterator[Int]) => {

  iter.map(x => "[partID:" +  index + ", val: " + x + "]")

}



rdd1.mapPartitionsWithIndex(func).collect



//Array[String] = Array(

//[partID:0, val: 1], [partID:0, val: 2], [partID:0, val: 3],

//[partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6],

//[partID:2, val: 7], [partID:2, val: 8], [partID:2, val: 9]
下一篇
举报
领券