Spark的常用算子大总结

文章目录
  • 🥧 Transformation算子Value类型 🥧
    • 1 、map(func)案例
    • 2、mapPartitions(func) 案例
    • 3、mapPartitionsWithIndex(func) 案例
    • 4、map()和mapPartition()的区别
    • 5、flatMap(func) 案例
    • 6、sortBy(func,[ascending], [numTasks]) 案例
    • 7、 groupBy(func)案例
    • 8、filter(func) 案例
    • 9、sample(withReplacement, fraction, seed) 案例
    • 10、stinct([numTasks])) 案例
    • 11、coalesce(numPartitions) 案例
    • 12、repartition(numPartitions) 案例
  • 🍑 Action算子 🍑
    • 1、 reduce(func)案例
    • 2、countByKey()案例
    • 3、foreach(func)案例
    • 4、 sortByKey([ascending], [numTasks]) 案例
    • 5、collect()案例
    • 6、 count()案例
    • 7、 first()案例
    • 8、 take(n)案例

🥧 Transformation算子Value类型 🥧

1 、map(func)案例

作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 2. 需求:创建一个1-10数组的RDD,将所有元素2形成新的RDD (1)创建 scala> var source = sc.parallelize(1 to 10) source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at :24 (2)打印 scala> source.collect() res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)将所有元素2 scala> val mapadd = source.map(_ * 2) mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at :26 (4)打印最终结果 scala> mapadd.collect() res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

2、mapPartitions(func) 案例

1.作用:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func 的函数类型必须是Iterator[T] => Iterator[U]。假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。 2.需求:创建一个RDD,使每个元素*2组成新的RDD

(1)创建一个RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24 (2)使每个元素*2组成新的RDD scala> rdd.mapPartitions(x=>x.map(_*2)) res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at :27 (3)打印新的RDD scala> res3.collect res4: Array[Int] = Array(2, 4, 6, 8)

3、mapPartitionsWithIndex(func) 案例

1.作用:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD 上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]; 2.需求:创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD (1)创建一个RDD scala> val rdd = sc.parallelize(Array(1,2,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at :24 (2)使每个元素跟所在分区形成一个元组组成一个新的RDD scala> val indexRdd = rdd.mapPartitionsWithIndex((index,items)=>(items.map((index,_)))) indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at :26 (3)打 印 新 的 RDD scala> indexRdd.collect res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

4、map()和mapPartition()的区别

1.map():每次处理一条数据。 2.mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能 释放,可能导致OOM。 3.开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。

5、flatMap(func) 案例

1.作用:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个 序列,而不是单一元素) 2.需求:创建一个元素为1-5的RDD,运用flatMap创建一个新的RDD,新的RDD为原RDD的每个元 素的扩展(1->1,2->1,2……5->1,2,3,4,5) (1)创建 scala> val sourceFlat = sc.parallelize(1 to 5) sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at :24 (2)打印

scala> sourceFlat.collect() res11: Array[Int] = Array(1, 2, 3, 4, 5) (3)根据原RDD创建新RDD(1->1,2->1,2……5->1,2,3,4,5) scala> val flatMap = sourceFlat.flatMap(1 to _) flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at :26 (4)打 印 新 RDD scala> flatMap.collect() res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

6、sortBy(func,[ascending], [numTasks]) 案例

1.作用;使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。 2.需求:创建一个RDD,按照不同的规则进行排序 (1)创建一个RDD scala> val rdd = sc.parallelize(List(2,1,3,4)) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at :24 (2)按照自身大小排序 scala> rdd.sortBy(x => x).collect() res11: Array[Int] = Array(1, 2, 3, 4) (3)按照与3余数的大小排序 scala> rdd.sortBy(x => x%3).collect() res12: Array[Int] = Array(3, 4, 1, 2)

7、 groupBy(func)案例

1.作用:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。 2.需求:创建一个RDD,按照元素模以2的值进行分组。 (1)创建 scala> val rdd = sc.parallelize(1 to 4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at :24 (2)按照元素模以2的值进行分组 scala> val group = rdd.groupBy(_%2) group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at :26 (3)打印结果 scala> group.collect res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

8、filter(func) 案例

1.作用:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。 2.需求:创建一个RDD(由字符串组成),过滤出一个新RDD(包含”xiao”子串) (1)创建 scala> var sourceFilter = sc.parallelize(Array(“xiaoming”,“xiaojiang”,“xiaohe”,“dazhi”)) sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at :24 (2)打印 scala> sourceFilter.collect() res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi) (3)过滤出含” xiao”子串的形成一个新的RDD scala> val filter = sourceFilter.filter(_.contains(“xiao”)) filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at :26 (4)打 印 新 RDD scala> filter.collect() res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

9、sample(withReplacement, fraction, seed) 案例

1.作用:以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据 是否放回,true为有放回的抽样,false为无放回的抽样,seed用于指定随机数生成器种子。 2.需求:创建一个RDD(1-10),从中选择放回和不放回抽样 (1)创建RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at :24 (2)打印 scala> rdd.collect() res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) (3)放回抽样 scala> var sample1 = rdd.sample(true,0.4,2) sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at :26 (4)打印放回抽样结果 scala> sample1.collect() res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9) (5)不放回抽样 scala> var sample2 = rdd.sample(false,0.2,3) sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at :26 (6)打印不放回抽样结果

scala> sample2.collect() res17: Array[Int] = Array(1, 9)

10、stinct([numTasks])) 案例

1.作用:对源RDD进行去重后返回一个新的RDD。 2.需求:创建一个RDD,使用distinct()对其去重。 (1)创建一个RDD scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1)) distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at :24 (2)对RDD进行去重 scala> val unionRDD = distinctRdd.distinct() unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at :26 (3)打印去重后生成的新RDD scala> unionRDD.collect() res20: Array[Int] = Array(1, 9, 5, 6, 2) (4)对RDD scala> val unionRDD = distinctRdd.distinct(2) unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at :26 (5)打印去重后生成的新RDD scala> unionRDD.collect() res21: Array[Int] = Array(6, 2, 1, 9, 5)

11、coalesce(numPartitions) 案例

1.作用:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。 2.需求:创建一个4个分区的RDD,对其缩减分区 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at :24 (2)查看RDD的分区数 scala> rdd.partitions.size res20: Int = 4 (3)对RDD重新分区 scala> val coalesceRDD = rdd.coalesce(3) coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at :26 (4)查看新RDD的分区数 scala> coalesceRDD.partitions.size

res21: Int = 3

12、repartition(numPartitions) 案例

1.作用:根据分区数,重新通过网络随机洗牌所有数据。 2.需求:创建一个4个分区的RDD,对其重新分区 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 16,4) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at :24 (2)查看RDD的分区数scala> rdd.partitions.size res22: Int = 4 (3)对RDD重新分区 scala> val rerdd = rdd.repartition(2) rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at :26 (4)查看新RDD的分区数 scala> rerdd.partitions.size res23: Int = 2 coalesce和repartition的区别 1.coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。 2.repartition实际上是调用的coalesce,进行shuffle。源码如下:

🍑 Action算子 🍑

1、 reduce(func)案例

1.作用:通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。 2.需求:创建一个RDD,将所有元素聚合得到结果 (1)创建一个RDD[Int] scala> val rdd1 = sc.makeRDD(1 to 10,2) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at :24 (2)聚合RDD[Int]所有元素 scala> rdd1.reduce(+) res50: Int = 55 (3)创建一个RDD[String] scala> val rdd2 = sc.makeRDD(Array((“a”,1),(“a”,3),(“c”,3),(“d”,5))) rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at :24

(4)聚合RDD[String]所有数据 scala> rdd2.reduce((x,y)=>(x.1 + y.1,x.2 + y.2)) res51: (String, Int) = (adca,12)

2、countByKey()案例

1.作用:针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 2.需求:创建一个PairRDD,统计每种key的个数 (1)创建一个PairRDD scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3) rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at :24 (2)统计每种key的个数 scala> rdd.countByKey res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

3、foreach(func)案例

1.作用:在数据集的每一个元素上,运行函数func进行更新。 2.需求:创建一个RDD,对每个元素进行打印 (1)创建一个RDD scala> var rdd = sc.makeRDD(1 to 5,2) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at :24 (2)对该RDD每个元素进行打印

scala> rdd.foreach(println(_)) 3
4
5
1
2

4、 sortByKey([ascending], [numTasks]) 案例

1.作用:在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的 RDD 2.需求:创建一个pairRDD,按照key的正序和倒序进行排序 (1)创建一个pairRDD scala> val rdd = sc.parallelize(Array((3,“aa”),(6,“cc”),(2,“bb”),(1,“dd”))) rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at :24 (2)按照key的正序 scala> rdd.sortByKey(true).collect() res9: Array[(Int, String)] = Array((1,dd), (2,bb), (3,aa), (6,cc))

(3)按照key的倒序 scala> rdd.sortByKey(false).collect() res10: Array[(Int, String)] = Array((6,cc), (3,aa), (2,bb), (1,dd))

5、collect()案例

1.作用:在驱动程序中,以数组的形式返回数据集的所有元素。 2.需求:创建一个RDD,并将RDD内容收集到Driver端打印 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 (2)将结果收集到Driver端 scala> rdd.collect res0: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

6、 count()案例

1.作用:返回RDD中元素的个数 2.需求:创建一个RDD,统计该RDD的条数 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 (2)统计该RDD的条数 scala> rdd.count res1: Long = 10

7、 first()案例

1.作用:返回RDD中的第一个元素 2.需求:创建一个RDD,返回该RDD中的第一个元素 (1)创建一个RDD scala> val rdd = sc.parallelize(1 to 10) rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at :24 (2)统计该RDD的条数 scala> rdd.first res2: Int = 1

8、 take(n)案例

1.作用:返回一个由RDD的前n个元素组成的数组 2.需求:创建一个RDD,统计该RDD的条数 (1)创建一个RDD scala> val rdd = sc.parallelize(Array(2,5,4,6,8,3))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at :24 (2)统计该RDD的条数 scala> rdd.take(3) res10: Array[Int] = Array(2, 5, 4)

本文参与 腾讯云自媒体分享计划 ,欢迎热爱写作的你一起参与!
本文分享自作者个人站点/博客:https://blog.csdn.net/xianyu120复制
如有侵权,请联系 cloudcommunity@tencent.com 删除。
登录 后参与评论
0 条评论

相关文章

  • Spark的常用算子大总结

    作用:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 2. 需求:创建一个1-10数组的RDD,将所有元素2形成新的RDD (1)创...

    ChinaManor
  • Spark常用的算子以及Scala函数总结

    上海站 | 高性能计算之GPU CUDA培训 4月13-15日 ? 三天密集式学习 快速带你晋级 阅读全文 > 正文共11264个字,7张图,预计阅读时间28...

    用户1332428
  • Spark常用的算子以及Scala函数总结

    首先,介绍一下scala语言: Scala 是一种把面向对象和函数式编程理念加入到静态类型语言中的混血儿。

    流川枫
  • Spark算子总结

    由于计算过程是在内存进行,然后spill出来,每到达一个checkpoint就会将内存中的数据写入到磁盘,这个功能就是手动使其到达checkpoint

    vincentbbli
  • Spark常用Actions算子

    介绍以下Actions算子: foreach foreachPatition reduce collect count first take t...

    CoderJed
  • 2021年大数据Spark(十五):Spark Core的RDD常用算子

    RDD中包含很多函数,主要可以分为两类:Transformation转换函数和Action函数。

    Lanson
  • Spark常用Transformations算子(二)

    介绍以下Transformations算子: aggregateByKey join cogroup cartesian pipe repartit...

    CoderJed
  • Spark常用Transformations算子(一)

    介绍以下Transformations算子: map flatMap mapPartitions mapPartitionsWithIndex fil...

    CoderJed
  • 2021年大数据Spark(十六):Spark Core的RDD算子练习

        RDD中的函数有很多,不同业务需求使用不同函数进行数据处理分析,下面仅仅展示出比较常用的函数使用,更多函数在实际中使用体会,多加练习理解。

    Lanson
  • Spark性能调优九之常用算子调优

            前面介绍了很多关于Spark性能的调优手段,今天来介绍一下Spark性能调优的最后一个点,就是关于Spark中常用算子的调优。废话不多说,直接进...

    z小赵
  • Spark系列课程-0030Spark 简单的算子

    我们这节课开始讲Spark的算子, 我们说transformation类算子是转换算子 我们称Action类算子是行动算子 ? image.png 我们回顾...

    Albert陈凯
  • 2021年大数据Flink(三十九):​​​​​​​Table与SQL ​​​​​​总结 Flink-SQL常用算子

    SELECT 用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。

    Lanson
  • Hadoop与Spark常用配置参数总结

    背景 MapReduce和Spark对外提供了上百个配置参数,用户可以为作业定制这些参数以更快,更稳定的运行应用程序。本文梳理了最常用的一些MapReduce和...

    小小科
  • python常用函数大总结

       repr(obj) 得到obj的表示字符串,可以利用这个字符串eval重建该对象的一个拷贝

    用户4962466
  • spark RDD算子(九)之基本的Action操作

    各元素在 RDD 中出现的次数 返回{(key1,次数),(key2,次数),…(keyn,次数)} scala

    天涯泪小武
  • 2021年大数据Spark(九):Spark On Yarn两种模式总结

    包含两个部分:应用管理者AppMaster和运行应用进程Process(如MapReduce程序MapTask和ReduceTask任务),如下图所示:

    Lanson
  • 5 大常用机器学习模型类型总结

    本文介绍了 5 大常用机器学习模型类型:集合学习算法,解释型算法,聚类算法,降维算法,相似性算法,并简要介绍了每种类型中最广泛使用的算法模型。我们希望本文可以做...

    张俊红
  • Spark中普通集合与RDD算子的sortBy()有什么区别

    如图所示,我这调用的sortby()是List集合的方法了,不是算子,所以不能加false参数指定降序排,只能默认的升序排了,但是用reverse()反转就...

    孙晨c
  • 统计学 常用的数据分析方法大总结!

    描述统计是通过图表或数学方法,对数据资料进行整理、分析,并对数据的分布状态、数字特征和随机变量之间关系进行估计和描述的方法。描述统计分为集中趋势分析和离中趋势分...

    石晓文

扫码关注腾讯云开发者

领取腾讯云代金券