前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark常用Actions算子

Spark常用Actions算子

作者头像
CoderJed
发布2018-09-13 10:42:23
5060
发布2018-09-13 10:42:23
举报
文章被收录于专栏:Jed的技术阶梯

介绍以下Actions算子: foreach foreachPatition reduce collect count first take takeSample top takeOrdered saveAsTextFile saveAsSequenceFile saveAsObjectFile countByKey countByValue aggregate


(1) foreach、foreachPatition

  • foreach:遍历RDD中的元素
  • foreachPatition:按照分区遍历RDD中的元素
代码语言:javascript
复制
val arr = Array(1,2,3,4,5,6)
val rdd = sc.makeRDD(arr,2)

rdd.foreach(x => {
  println("===========")
  println(x)
})
/*
===========
1
===========
2
===========
3
===========
4
===========
5
===========
6
 */

rdd.foreachPartition(x => {
  println("===========")
  while(x.hasNext) {
    println(x.next())
  }
})
/*
===========
1
2
3
===========
4
5
6
 */

}

(2) reduce:按照指定规则聚合RDD中的元素

代码语言:javascript
复制
val numArr = Array(1,2,3,4,5)
val rdd = sc.parallelize(numArr)
val sum = rdd.reduce(_+_)
println(sum)
/*
15
*/

(3) collect:计算结果拉取回Driver端

代码语言:javascript
复制
val numArr = Array((1,1),(1,2),(1,3),(2,1),(2,2),(2,3))
val rdd = sc.parallelize(numArr)
val sum = rdd.reduceByKey(_+_)

sum.collect().foreach(println)
/*
(1,6)
(2,6)
 */

(4) count、countByKey、countByValue

count:统计RDD中元素个数 countByKey:统计每个Key中的元素的个数 countByValue:统计每个value的个数

代码语言:javascript
复制
// -- count
val arr = Array("Tom","Jack","Tony","Bob","Kate")
val rdd = sc.makeRDD(arr)
println(rdd.count())
/*
5
 */

// -- countByKey
val rdd = sc.parallelize(Array(
        ("销售部","Tom"), ("销售部","Jack"),("销售部","Bob"),("销售部","Terry"),
        ("后勤部","Jack"),("后勤部","Selina"),("后勤部","Hebe"),
        ("人力部","Ella"),("人力部","Harry"),
        ("开发部","Allen")
    ))
val result = rdd.countByKey();
result.foreach(println)
/*
(后勤部,3)
(开发部,1)
(销售部,4)
(人力部,2)

// -- countByValue
val rdd = sc.parallelize(Array(
      "Tom","Jed","Tom",
      "Tom","Jed","Jed",
      "Tom","Tony","Jed"
    ))
val result = rdd.countByValue();
result.foreach(println)
/*
(Tom,4)
(Tony,1)
(Jed,4)
*/

(5) first、take、takeSample

take(n):取RDD中前n条数据 first:= take(1) takeSample(withReplacement,num,[seed]):随机抽取RDD中的元素

代码语言:javascript
复制
withReplacement : 是否是放回式抽样  
    true代表如果抽中A元素,之后还可以抽取A元素
    false代表如果抽中了A元素,之后都不在抽取A元素  
fraction : 抽样的比例  
seed : 抽样算法的随机数种子,不同的数值代表不同的抽样规则,可以手动设置,默认为long的随机数
代码语言:javascript
复制
val arr = Array(("Tom",88),("Bob",92),("Allen",86),("Kate",100),("Sandy",97))
val rdd = sc.makeRDD(arr)

// 排序后去前三个
rdd.sortBy(_._2,false).take(3).foreach(println)
/*
(Kate,100)
(Sandy,97)
(Bob,92)
 */

// 排序后取top1
rdd.sortBy(_._2,false).take(1).foreach(println) // (Kate,100)
println(rdd.sortBy(_._2,false).first()) // (Kate,100)

// 随机抽取2个元素
rdd.takeSample(false, 2).foreach(println)

(6) top、takeOrdered

top(n):从RDD中,按照默认(降序)或者指定的排序规则,返回前n个元素 takeOrdered(n):从RDD中,按照默认(升序)或者指定的排序规则,返回前n个元素

代码语言:javascript
复制
var rdd = sc.makeRDD(Array(10, 4, 2, 12, 3))

rdd.top(3).foreach(println) // 12 10 4(降序取)

rdd.takeOrdered(3).foreach(println) // 2 3 4(升序取)

(7) saveAsTextFile、saveAsSequenceFile 、saveAsObjectFile

  • saveAsTextFile:把结果文件保存为textFile
  • saveAsSequenceFile:把结果文件保存为SequenceFile
  • saveAsObjectFile:把结果文件保存为ObjectFile
代码语言:javascript
复制
val line = sc.textFile("hdfs://repo:9000/user/spark/wordcount/input/wordcount.txt")
line.flatMap(_.split(" "))
  .map((_,1))
  .reduceByKey(_+_)
  .sortBy(_._2,false)
  // .foreach(t => println(t._1 + " " + t._2))
  .saveAsTextFile("hdfs://repo:9000/user/spark/wordcount/output/")

(8) aggregate 搞清楚原理在再补充吧...

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2018.01.12 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • (1) foreach、foreachPatition
  • (2) reduce:按照指定规则聚合RDD中的元素
  • (3) collect:计算结果拉取回Driver端
  • (4) count、countByKey、countByValue
  • (5) first、take、takeSample
  • (6) top、takeOrdered
  • (7) saveAsTextFile、saveAsSequenceFile 、saveAsObjectFile
  • (8) aggregate 搞清楚原理在再补充吧...
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档