Spark常用的算子以及Scala函数总结

Spark与Scala

首先,介绍一下scala语言: Scala 是一种把面向对象和函数式编程理念加入到静态类型语言中的混血儿。

为什么学scala? spark提供了R、Python等语言的接口,为什么还要重新学一门新的语言呢? 1、spark本身就是用scala写的,采用与底层框架相同的语言有很多好处,例如以后你要看源码...... 2、性能开销小,scala可以直接编译运行在java的JVM上 3、能用上最新的版本。一般新版本都是最先支持scala,虽然现在python的接口也在不断的丰富 4、到了工作岗位,你的师父(都是有几年相关经验的),前期由于python的支持还没有像scala那样完善,因此会从scala开始使用spark的,你不学scala还让你师父转python啊!

新手学习Spark编程,在熟悉了Scala语言的基础上,首先需要对以下常用的Spark算子或者Scala函数比较熟悉,才能开始动手写能解决实际业务的代码。

简单来说,Spark 算子大致可以分为以下两类:

  1. Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。 Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。
  2. Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。 Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。

从小方向来说,Spark 算子大致可以分为以下三类:

  1. Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
  2. Key-Value数据类型的Transfromation算子,这种变换并不触发提交 作业,针对处理的数据项是Key-Value型的数据对。
  3. Action算子,这类算子会触发SparkContext提交Job作业

下面是我以前总结的一些常用的Spark算子以及Scala函数:

  1. map():将原来 RDD 的每个数据项通过 map 中的用户自定义函数 f 映射转变为一个新的元素。
  2. mapPartitions(function) :map()的输入函数是应用于RDD中每个元素,而mapPartitions()的输入函数是应用于每个分区。
  3. mapValues(function) :�该操作只会��改动value
  4. flatMap(function) :并将生成的 RDD 的每个集合中的元素合并为一个集合
  5. flatMapValues(function):通过上面的例子可知,该操作也是只操作value,不改变key。
  6. reduceByKey(func,numPartitions:用于对每个key对应的多个value进行merge操作
  7. groupByKey(numPartitions):将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
  8. sortByKey(accending,numPartitions)
  9. cogroup(otherDataSet,numPartitions)
  10. join(otherDataSet,numPartitions):找出左右相同同的记录
  11. LeftOutJoin(otherDataSet,numPartitions):以左边表为准,逐条去右边表找相同字段,如果有多条会依次列出
  12. RightOutJoin(otherDataSet, numPartitions)
  13. lookup():查询指定的key,u返回其对应的value。
  14. filter(): filter 函数功能是对元素进行过滤,对每个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤掉。
  15. full outer join()包括两个表的join结果,左边在右边中没找到的结果(NULL),右边在左边没找到的结果,FULL OUTER JOIN 关键字结合了 LEFT JOIN 和 RIGHT JOIN 的结果。
  16. collect():函数可以提取出所有rdd里的数据项:RDD——>数组(collect用于将一个RDD转换成数组。)
  17. reduce():根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。
  18. count():返回RDD内元素的个数
  19. first():返回RDD内的第一个元素,first相当于top(1)
  20. top:top可返回最大的k个元素。
  21. case:匹配,更多用于 PartialFunction(偏函数)中 {case …}
  22. saveAsTextFile:函数将数据输出,存储到 HDFS 的指定目录
  23. cache : cache 将 RDD 元素从磁盘缓存到内存,内部默认会调用persist(StorageLevel.MEMORY_ONLY),也就是说它无法自定义缓存级别的。
  24. persist():与cache一样都是将一个RDD进行缓存,在之后的使用过程汇总不需要重新的计算了。它比cache灵活,可以通过自定义 StorageLevel类型参数,来定义缓存的级别。
  25. coalesce():对RDD的分区进行�在分区,(用于分区数据分布不均匀的情况,利用HashPartitioner函数将数据重新分区)
  26. reparation:与coalesce功能一样,它只是coalesce中shuffle设置为true的简易实现。(数据不经过shuffle是无法将RDD的分区变多的)
  27. distinct():  distinct将RDD中的元素进行去重操作
  28. subtract():  subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。

基于SparkShell的交互式编程

1、map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

val a = sc.parallelize(1 to 9, 3)  
# x =>*2是一个函数,x是传入参数即RDD的每个元素,x*2是返回值
val b = a.map(x => x*2)
a.collect  
# 结果Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)  
b.collect  
# 结果Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)  

list/key--->key-value

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x, 1))
b.collect.foreach(println(_))

# /*
# (dog,1)
# (tiger,1)
# (lion,1)
# (cat,1)
# (panther,1)
# ( eagle,1)
# */

val l=sc.parallelize(List((1,'a'),(2,'b')))
var ll=l.map(x=>(x._1,"PV:"+x._2)).collect()
ll.foreach(println)
# (1,PVa)
# (2,PVb)
=================================================================
2、mapPartitions(function) 
map()的输入函数是应用于RDD中每个元素,而mapPartitions()的输入函数是应用于每个分区

package test
import scala.Iterator
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

object TestRdd {
  def sumOfEveryPartition(input: Iterator[Int]): Int = {
    var total = 0
    input.foreach { elem =>
      total += elem
    }
    total
  }
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Rdd Test")
    val spark = new SparkContext(conf)
    val input = spark.parallelize(List(1, 2, 3, 4, 5, 6), 2)//RDD有6个元素,分成2个partition
    val result = input.mapPartitions(
      partition => Iterator(sumOfEveryPartition(partition)))//partition是传入的参数,是个list,要求返回也是list,即Iterator(sumOfEveryPartition(partition))
    result.collect().foreach {
      println(_)
      # 6 15,分区计算和
    }
    spark.stop()
  }
}
=================================================================
3、mapValues(function) 
原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD

val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
val b = a.map(x => (x.length, x))
b.mapValues("x" + _ + "x").collect

# //结果 
# Array( 
# (3,xdogx), 
# (5,xtigerx), 
# (4,xlionx), 
# (3,xcatx), 
# (7,xpantherx), 
# (5,xeaglex) 
# )

# val grouped = mds.groupBy(md => md.matched)
# grouped.mapValues(x => x.size).foreach(println)



=================================================================
4、flatMap(function) 
与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素

val a = sc.parallelize(1 to 4, 2)
val b = a.flatMap(x => 1 to x)//每个元素扩展
b.collect
/*
结果    Array[Int] = Array( 1, 
                           1, 2, 
                           1, 2, 3, 
                           1, 2, 3, 4)
*/
===============================================
5、flatMapValues(function)
val a = sc.parallelize(List((1,2),(3,4),(5,6)))
val b = a.flatMapValues(x=>1 to x)
b.collect.foreach(println(_))
/*结果
(1,1)
(1,2)
(3,1)
(3,2)
(3,3)
(3,4)
(5,1)
(5,2)
(5,3)
(5,4)
(5,5)
(5,6)
*/

val list = List(("mobin",22),("kpop",20),("lufei",23))
val rdd = sc.parallelize(list)
val mapValuesRDD = rdd.flatMapValues(x => Seq(x,"male"))
mapValuesRDD.foreach(println)

输出:

(mobin,22)
(mobin,male)
(kpop,20)
(kpop,male)
(lufei,23)
(lufei,male)

如果是mapValues会输出:【对比区别】

(mobin,List(22, male))
(kpop,List(20, male))
(lufei,List(23, male))

=================================================================
6、reduceByKey(func,numPartitions):按Key进行分组,使用给定的func函数聚合value值, numPartitions设置分区数,提高作业并行度

val arr = List(("A",3),("A",2),("B",1),("B",3))
val rdd = sc.parallelize(arr)
val reduceByKeyRDD = rdd.reduceByKey(_ +_)
reduceByKeyRDD.foreach(println)
sc.stop

# (A,5)
# (A,4)
=================================================================
7、groupByKey(numPartitions):按Key进行分组,返回[K,Iterable[V]],numPartitions设置分区数,提高作业并行度【value并不是累加,而是变成一个数组】

//省略
val arr = List(("A",1),("B",2),("A",2),("B",3))
val rdd = sc.parallelize(arr)
val groupByKeyRDD = rdd.groupByKey()
groupByKeyRDD.foreach(println)
sc.stop

# (B,CompactBuffer(2, 3))
# (A,CompactBuffer(1, 2))


# 统计key后面的数组汇总元素的个数
scala> groupByKeyRDD.mapValues(x => x.size).foreach(println)
# (A,2)
# (B,2)
=================================================================
8、sortByKey(accending,numPartitions):返回以Key排序的(K,V)键值对组成的RDD,accending为true时表示升序,为false时表示降序,numPartitions设置分区数,提高作业并行度。

//省略sc
val arr = List(("A",1),("B",2),("A",2),("B",3))
val rdd = sc.parallelize(arr)
val sortByKeyRDD = rdd.sortByKey()
sortByKeyRDD.foreach(println)
sc.stop

# (A,1)
# (A,2)
# (B,2)
# (B,3)


# 统计单词的词频
val rdd = sc.textFile("/home/scipio/README.md")
val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
val wcsort = wordcount.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1))
wcsort.saveAsTextFile("/home/scipio/sort.txt")

# 升序的话,sortByKey(true)

=================================================================

9、cogroup(otherDataSet,numPartitions):对两个RDD(如:(K,V)和(K,W))相同Key的元素先分别做聚合,最后返回(K,Iterator<V>,Iterator<W>)形式的RDD,numPartitions设置分区数,提高作业并行度

val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd1 = sc.parallelize(arr, 3)
val rdd2 = sc.parallelize(arr1, 3)
val groupByKeyRDD = rdd1.cogroup(rdd2)
groupByKeyRDD.foreach(println)
sc.stop

# (B,(CompactBuffer(2, 3),CompactBuffer(B1, B2)))
# (A,(CompactBuffer(1, 2),CompactBuffer(A1, A2)))

=================================================================

10、join(otherDataSet,numPartitions):对两个RDD先进行cogroup操作形成新的RDD,再对每个Key下的元素进行笛卡尔积,numPartitions设置分区数,提高作业并行度

//省略
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val groupByKeyRDD = rdd.join(rdd1)
groupByKeyRDD.foreach(println)

# (B,(2,B1))
# (B,(2,B2))
# (B,(3,B1))
# (B,(3,B2))
 
# (A,(1,A1))
# (A,(1,A2))
# (A,(2,A1))
# (A,(2,A2

=================================================================
11、LeftOutJoin(otherDataSet,numPartitions):左外连接,包含左RDD的所有数据,如果右边没有与之匹配的用None表示,numPartitions设置分区数,提高作业并行度

//省略
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3),("C",1))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val leftOutJoinRDD = rdd.leftOuterJoin(rdd1)
leftOutJoinRDD .foreach(println)
sc.stop

# (B,(2,Some(B1)))
# (B,(2,Some(B2)))
# (B,(3,Some(B1)))
# (B,(3,Some(B2)))
# (C,(1,None))
# (A,(1,Some(A1)))
# (A,(1,Some(A2)))
# (A,(2,Some(A1)))
# (A,(2,Some(A2)))

=================================================================
12、RightOutJoin(otherDataSet, numPartitions):右外连接,包含右RDD的所有数据,如果左边没有与之匹配的用None表示,numPartitions设置分区数,提高作业并行度

//省略
val arr = List(("A", 1), ("B", 2), ("A", 2), ("B", 3))
val arr1 = List(("A", "A1"), ("B", "B1"), ("A", "A2"), ("B", "B2"),("C","C1"))
val rdd = sc.parallelize(arr, 3)
val rdd1 = sc.parallelize(arr1, 3)
val rightOutJoinRDD = rdd.rightOuterJoin(rdd1)
rightOutJoinRDD.foreach(println)
sc.stop

# (B,(Some(2),B1))
# (B,(Some(2),B2))
# (B,(Some(3),B1))
# (B,(Some(3),B2))
# (C,(None,C1))
# (A,(Some(1),A1))
# (A,(Some(1),A2))
# (A,(Some(2),A1))
# (A,(Some(2),A2))

=================================================================
13、lookup()
var rdd1=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
# rdd1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[81] at parallelize at 
rdd1.lookup(1)
# res34: Seq[String] = WrappedArray(a)

=================================================================
14、filter()
val filterRdd = sc.parallelize(List(1,2,3,4,5)).map(_*2).filter(_>5)
filterRdd.collect
# res5: Array[Int] = Array(6, 8, 10)

=================================================================
16、collect()
scala> var rdd1 = sc.makeRDD(1 to 10,2)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
 
scala> rdd1.collect
# res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

=================================================================
17、reduce()

scala> var rdd1 = sc.makeRDD(1 to 10,2)
# rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
# 求和
scala> rdd1.reduce(_ + _)
# res18: Int = 55
 
scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
# rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
# 分项求和
scala> rdd2.reduce((x,y) => {
     |       (x._1 + y._1,x._2 + y._2)
     |     })
res21: (String, Int) = (CBBAA,6)

=================================================================
18、count()
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
# rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
 
scala> rdd1.count
# res15: Long = 3


=================================================================
19、first()
scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
# rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
 
scala> rdd1.first
# res14: (String, String) = (A,1)

=================================================================
21、case

scala> val aa=List(1,2,3,"asa")
# aa: List[Any] = List(1, 2, 3, asa)
scala> aa. map {
     |   case i: Int => i + 1
     |   case s: String => s.length
     | }
# res16: List[Int] = List(2, 3, 4, 3)

补充:reduceByKeyt与groupByKey的区别?

[优化代码的最基本思路]

(1)当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:

ReduceByKey

(2)当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:

groupByKey

因此,在对大数据进行复杂计算时,reduceByKey优于groupByKey。 另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :   (1)combineByKey 组合数据,但是组合之后的数据类型与输入时值的类型不一样。   (2)foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏菩提树下的杨过

java:POI导出excel

POI是一个开源项目,专用于java平台上操作MS OFFICE,企业应用开发中可用它方便导出Excel. 下面是使用示例: 1、maven中先添加依赖项 1 ...

3255
来自专栏数据之美

Spark函数讲解: combineByKey

1、背景 在数据分析中,处理Key,Value的Pair数据是极为常见的场景,例如我们可以针对这样的数据进行分组、聚合或者将两个包含Pair数据的RDD根据ke...

2656
来自专栏Java开发者杂谈

java序列化反序列化深入探究

When---什么时候需要序列化和反序列化: 简单的写一个hello world程序,用不到序列化和反序列化。写一个排序算法也用不到序列化和反序列化。但是当你想...

3689
来自专栏从零开始学自动化测试

Selenium2+python自动化42-判断元素(expected_conditions)

前言 经常有小伙伴问,如何判断一个元素是否存在,如何判断alert弹窗出来了,如何判断动态的元素等等一系列的判断,在selenium的expected_cond...

4537
来自专栏SeanCheney的专栏

《Pandas Cookbook》第07章 分组聚合、过滤、转换1. 定义聚合2. 用多个列和函数进行分组和聚合3. 分组后去除多级索引4. 自定义聚合函数5. 用 *args 和 **kwargs

第01章 Pandas基础 第02章 DataFrame运算 第03章 数据分析入门 第04章 选取数据子集 第05章 布尔索引 第06章 索引对齐 ...

4232
来自专栏二进制文集

Thrift 对象序列化、反序列化-字节数组分析

本篇博客仅分析Thrift对象的序列化、反序列化的字节数组,以及Thrift对象的序列化、反序列化原理。其他源码分析会另开章节~

2392
来自专栏用户2442861的专栏

Java的 transient关键字

哎,虽然自己最熟的是Java,但很多Java基础知识都不知道,比如transient关键字以前都没用到过,所以不知道它的作用是什么,今天做笔试题时发现有一题是...

922
来自专栏Spark学习技巧

Java transient关键字使用小记

1. transient的作用及使用方法 我们都知道一个对象只要实现了Serilizable接口,这个对象就可以被序列化,java的这种序列化模式为开发者提供了...

3176
来自专栏chenssy

【死磕 Spring】—– IOC 之构造函数实例化 bean

createBeanInstance() 用于实例化 bean,它会根据不同情况选择不同的实例化策略来完成 bean 的初始化,主要包括:

1384
来自专栏jeremy的技术点滴

py3_cookbook_notes_01

3338

扫码关注云+社区

领取腾讯云代金券