前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Spark的常用算子大总结

Spark的常用算子大总结

作者头像
Maynor
发布于 2021-12-07 01:25:58
发布于 2021-12-07 01:25:58
46200
代码可运行
举报
运行总次数:0
代码可运行

文章目录

Transformation算子Value类型

1 、map(func)案例

作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 2. 需求:创建一个1-10数组的RDD,将所有元素2形成新的RDD (1)创建 scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :24 (2)打印 scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)将所有元素2 scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at :26 (4)打印最终结果 scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

2、mapPartitions(func) 案例

1.作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func 的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。 2.需求:创建一个RDD,使每个元素*2组成新的RDD

(1)创建一个RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24 (2)使每个元素*2组成新的RDD scala> rdd.mapPartitions(x=>x.map(_*2)) res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at :27 (3)打印新的RDD scala> res3.collect res4: Array[Int] = Array(2, 4, 6, 8)

3、mapPartitionsWithIndex(func) 案例

1.作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD 上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]; 2.需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD (1)创建一个RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24 (2)使每个元素跟所在分区形成一个元组组成一个新的RDD scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))) indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at :26 (3)打 印 新 的 RDD scala> indexRdd.collect res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

4、map()和mapPartition()的区别

1.map():每次处理一条数据。 2.mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能 释放,可能导致OOM。 3.开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。

5、flatMap(func) 案例

1.作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个 序列,而不是单一元素) 2.需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元 素的扩展(1->1,2->1,2……5->1,2,3,4,5) (1)创建 scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at :24 (2)打印

scala> sourceFlat.collect() res11: Array[Int] = Array(1, 2, 3, 4, 5) (3)根据原RDD创建新RDD(1->1,2->1,2……5->1,2,3,4,5) scala> val flatMap = sourceFlat.flatMap(1 to _) flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at :26 (4)打 印 新 RDD scala> flatMap.collect() res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

6、sortBy(func,[ascending], [numTasks]) 案例

1.作用;使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。 2.需求:创建一个RDD,按照不同的规则进行排序 (1)创建一个RDD scala> val rdd = sc.parallelize(List(2,1,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at :24 (2)按照自身大小排序 scala> rdd.sortBy(x => x).collect() res11: Array[Int] = Array(1, 2, 3, 4) (3)按照与3余数的大小排序 scala> rdd.sortBy(x => x%3).collect() res12: Array[Int] = Array(3, 4, 1, 2)

7、 groupBy(func)案例

1.作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。 2.需求:创建一个RDD,按照元素模以2的值进行分组。 (1)创建 scala> val rdd = sc.parallelize(1 to 4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at :24 (2)按照元素模以2的值进行分组 scala> val group = rdd.groupBy(_%2) group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at :26 (3)打印结果 scala> group.collect res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

8、filter(func) 案例

1.作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。 2.需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串) (1)创建 scala> var sourceFilter = sc.parallelize(Array(“xiaoming”,“xiaojiang”,“xiaohe”,“dazhi”)) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at :24 (2)打印 scala> sourceFilter.collect() res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi) (3)过滤出含” xiao”子串的形成一个新的RDD scala> val filter = sourceFilter.filter(_.contains(“xiao”)) filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at :26 (4)打 印 新 RDD scala> filter.collect() res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

9、sample(withReplacement, fraction, seed) 案例

1.作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据 是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。 2.需求:创建一个RDD(1-10),从中选择放回和不放回抽样 (1)创建RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at :24 (2)打印 scala> rdd.collect() res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)放回抽样 scala> var sample1 = rdd.sample(true,0.4,2) sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at :26 (4)打印放回抽样结果 scala> sample1.collect() res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9) (5)不放回抽样 scala> var sample2 = rdd.sample(false,0.2,3) sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at :26 (6)打印不放回抽样结果

scala> sample2.collect() res17: Array[Int] = Array(1, 9)

10、stinct([numTasks])) 案例

1.作用:对源RDD进行去重后返回一个新的RDD。 2.需求:创建一个RDD,使用distinct()对其去重。 (1)创建一个RDD scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1)) distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at :24 (2)对RDD进行去重 scala> val unionRDD = distinctRdd.distinct() unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at :26 (3)打印去重后生成的新RDD scala> unionRDD.collect() res20: Array[Int] = Array(1, 9, 5, 6, 2) (4)对RDD scala> val unionRDD = distinctRdd.distinct(2) unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at :26 (5)打印去重后生成的新RDD scala> unionRDD.collect() res21: Array[Int] = Array(6, 2, 1, 9, 5)

11、coalesce(numPartitions) 案例

1.作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。 2.需求:创建一个4个分区的RDD,对其缩减分区 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at :24 (2)查看RDD的分区数 scala> rdd.partitions.size res20: Int = 4 (3)对RDD重新分区 scala> val coalesceRDD = rdd.coalesce(3) coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at :26 (4)查看新RDD的分区数 scala> coalesceRDD.partitions.size

res21: Int = 3

12、repartition(numPartitions) 案例

1.作用:根据分区数,重新通过网络随机洗牌所有数据。 2.需求:创建一个4个分区的RDD,对其重新分区 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at :24 (2)查看RDD的分区数scala> rdd.partitions.size res22: Int = 4 (3)对RDD重新分区 scala> val rerdd = rdd.repartition(2) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at :26 (4)查看新RDD的分区数 scala> rerdd.partitions.size res23: Int = 2 coalesce和repartition的区别 1.coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 2.repartition实际上是调用的coalesce,进行shuffle。源码如下:

Action算子

1、 reduce(func)案例

1.作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。 2.需求:创建一个RDD,将所有元素聚合得到结果 (1)创建一个RDD[Int] scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at :24 (2)聚合RDD[Int]所有元素 scala> rdd1.reduce(+) res50: Int = 55 (3)创建一个RDD[String] scala> val rdd2 = sc.makeRDD(Array((“a”,1),(“a”,3),(“c”,3),(“d”,5))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at :24

(4)聚合RDD[String]所有数据 scala> rdd2.reduce((x,y)=>(x.1 + y.1,x.2 + y.2)) res51: (String, Int) = (adca,12)

2、countByKey()案例

1.作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 2.需求:创建一个PairRDD,统计每种key的个数 (1)创建一个PairRDD scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at :24 (2)统计每种key的个数 scala> rdd.countByKey res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

3、foreach(func)案例

1.作用:在数据集的每一个元素上,运行函数func进行更新。 2.需求:创建一个RDD,对每个元素进行打印 (1)创建一个RDD scala> var rdd = sc.makeRDD(1 to 5,2) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at :24 (2)对该RDD每个元素进行打印

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
scala> rdd.foreach(println(_)) 3
4
5
1
2

4、 sortByKey([ascending], [numTasks]) 案例

1.作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的 RDD 2.需求:创建一个pairRDD,按照key的正序和倒序进行排序 (1)创建一个pairRDD scala> val rdd = sc.parallelize(Array((3,“aa”),(6,“cc”),(2,“bb”),(1,“dd”))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at :24 (2)按照key的正序 scala> rdd.sortByKey(true).collect() res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

(3)按照key的倒序 scala> rdd.sortByKey(false).collect() res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

5、collect()案例

1.作用:在驱动程序中,以数组的形式返回数据集的所有元素。 2.需求:创建一个RDD,并将RDD内容收集到Driver端打印 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 (2)将结果收集到Driver端 scala> rdd.collect res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

6、 count()案例

1.作用:返回RDD中元素的个数 2.需求:创建一个RDD,统计该RDD的条数 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 (2)统计该RDD的条数 scala> rdd.count res1: Long = 10

7、 first()案例

1.作用:返回RDD中的第一个元素 2.需求:创建一个RDD,返回该RDD中的第一个元素 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 (2)统计该RDD的条数 scala> rdd.first res2: Int = 1

8、 take(n)案例

1.作用:返回一个由RDD的前n个元素组成的数组 2.需求:创建一个RDD,统计该RDD的条数 (1)创建一个RDD scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :24 (2)统计该RDD的条数 scala> rdd.take(3) res10: Array[Int] = Array(2, 5, 4)

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021/09/10 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Spark之【RDD编程】详细讲解(No2)——《Transformation转换算子》
本篇博客是Spark之【RDD编程】系列第二篇,为大家带来的是RDD的转换的内容。
大数据梦想家
2021/01/27
2K0
Spark之【RDD编程】详细讲解(No2)——《Transformation转换算子》
Spark Core快速入门系列(3) | <Transformation>转换算子
  从一个已知的 RDD 中创建出来一个新的 RDD 例如: map就是一个transformation.
不温卜火
2020/10/28
1.9K0
Spark Core快速入门系列(3) | <Transformation>转换算子
Spark Core快速入门系列(4) | <Action> 行动算子转换算子
  返回一个由RDD的前n个元素组成的数组   take 的数据也会拉到 driver 端, 应该只对小数据集使用
不温卜火
2020/10/28
5040
Spark之【RDD编程】详细讲解(No3)——《Action行动算子》
本篇博客是Spark之【RDD编程】系列第三篇,为大家带来的是Action的内容。
大数据梦想家
2021/01/27
3300
Spark之【RDD编程】详细讲解(No3)——《Action行动算子》
大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Cor
  我们需要一个效率非常快,且能够支持迭代计算和有效数据共享的模型,Spark 应运而生。RDD 是基于工作集的工作模式,更多的是面向工作流。   但是无论是 MR 还是 RDD 都应该具有类似位置感知、容错和负载均衡等特性。
黑泽君
2019/05/10
2.5K0
大数据技术之_19_Spark学习_02_Spark Core 应用解析+ RDD 概念 + RDD 编程 + 键值对 RDD + 数据读取与保存主要方式 + RDD 编程进阶 + Spark Cor
Spark Core入门2【RDD的实质与RDD编程API】
所以RDD不过是对一个函数的封装,当一个函数对数据处理完成后,我们就得到一个RDD的数据集(是一个虚拟的,后续会解释)。
Java架构师必看
2021/05/14
1.1K0
Spark Core入门2【RDD的实质与RDD编程API】
[大数据之Spark]——Transformations转换入门经典实例
Spark相比于Mapreduce的一大优势就是提供了很多的方法,可以直接使用;另一个优势就是执行速度快,这要得益于DAG的调度,想要理解这个调度规则,还要理解函数之间的依赖关系。 本篇就着重描述
用户1154259
2018/01/17
1.1K0
[大数据之Spark]——Transformations转换入门经典实例
Spark常用的算子以及Scala函数总结
上海站 | 高性能计算之GPU CUDA培训 4月13-15日 三天密集式学习 快速带你晋级 阅读全文 > 正文共11264个字,7张图,预计阅读时间28分钟。 Spark与Scala 首先,介绍一
用户1332428
2018/04/17
1.9K0
Spark常用的算子以及Scala函数总结
Spark RDD篇
RDD是一个抽象,会记录一些信息,他并不是一个真正的集合,但可以像集合一样操作,降低了开发难度。
算法之名
2019/08/20
8910
Spark Core 学习笔记
1:Spark Core:内核,也是Spark中最重要的部分,相当于Mapreduce                 SparkCore 和 Mapreduce都是进行离线数据分析                 SparkCore的核心:RDD(弹性分布式数据集),由分区组成 2:Spark Sql:相当于Hive                 支持Sql和DSL语句 -》Spark任务(RDD)-》运行
曼路
2018/10/18
2.2K0
Spark算子总结
由于计算过程是在内存进行,然后spill出来,每到达一个checkpoint就会将内存中的数据写入到磁盘,这个功能就是手动使其到达checkpoint
vincentbbli
2021/08/18
8940
Spark的RDDs相关内容
通常使用parallelize()函数可以创建一个简单的RDD,测试用(为了方便观察结果)。
ZONGLYN
2019/08/08
5650
Spark的RDDs相关内容
Spark Core快速入门系列(2) | Spark Core中编程模型的理解与RDD的创建
  在 Spark 中,RDD 被表示为对象,通过对象上的方法调用来对 RDD 进行转换。   经过一系列的transformations定义 RDD 之后,就可以调用 actions 触发 RDD 的计算   action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。   在Spark中,只有遇到action,才会执行 RDD 的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。   要使用 Spark,开发者需要编写一个 Driver 程序,它被提交到集群以调度运行 Worker   Driver 中定义了一个或多个 RDD,并调用 RDD 上的 action,Worker 则执行 RDD 分区计算任务。
不温卜火
2020/10/28
6690
Spark Core快速入门系列(2) | Spark Core中编程模型的理解与RDD的创建
Spark之【RDD编程】详细讲解(No6)——《RDD缓存与CheckPoint》
本篇博客是Spark之【RDD编程】系列第六篇,为大家介绍的是RDD缓存与CheckPoint。
大数据梦想家
2021/01/27
7120
Spark之【RDD编程】详细讲解(No6)——《RDD缓存与CheckPoint》
RDD 编程
spark 遇到 RDD action 时才会真正的开始执行,遇到转换的时候,只是记录下来,并不真正执行
Michael阿明
2021/09/06
4580
[大数据之Spark]——Actions算子操作入门实例
Actions reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. 这个方法会传入两个参数,计算这两个参数返回一个结果。
用户1154259
2018/01/17
7160
Spark2.x学习笔记:3、 Spark核心概念RDD
程裕强
2018/01/02
1.4K0
Spark2.x学习笔记:3、 Spark核心概念RDD
Spark之【RDD编程】详细讲解(No1)——《编程模型的理解与RDD的创建》
上一篇博客《什么是RDD?带你快速了解Spark中RDD的概念!》为大家带来了RDD的概述之后。本篇博客,博主将继续前进,为大家带来RDD编程系列。 该系列第一篇,为大家带来的是编程模
大数据梦想家
2021/01/27
6300
Spark之【RDD编程】详细讲解(No1)——《编程模型的理解与RDD的创建》
RDD操作—— 行动(Action)操作
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
羊羽shine
2019/08/20
1.5K0
Spark函数讲解: combineByKey
1、背景 在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组、聚合或者将两个包含Pair数据的RDD根据key进行join。从函数的抽象层面看,这些操作具有共同的特征,都是将类型为RDD[(K,V)]的数据处理为RDD[(K,C)]。这里的V和C可以是相同类型,也可以是不同类型。这种数据处理操作并非单纯的对Pair的value进行map,而是针对不同的key值对原有的value进行联合(Combine)。因而,不仅类型可能不同,元素个数也可能不同。 com
用户1177713
2018/02/24
3.4K0
推荐阅读
相关推荐
Spark之【RDD编程】详细讲解(No2)——《Transformation转换算子》
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
查看详情【社区公告】 技术创作特训营有奖征文
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验