前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Action行动算子

Action行动算子

作者头像
用户1483438
发布2022-05-09 15:42:21
8930
发布2022-05-09 15:42:21
举报
文章被收录于专栏:大数据共享大数据共享

前言

在spark中,有两种算子,Transformation转换算子和 Action行动算子。Transformation转换算子在整个job任务中,都是一个懒加载,只有真正执行了 Action行动算子的时候,整个job任务才会得到正在的运行。 可以把Transformation转换算子理解成工厂中的流水线, Action行动算子相当于总闸,只有拉下总闸,整条流水线便开始了运行。

行动算子有哪些?

  • reduce() 聚合
  • collect() 以数组的形式返回数据集
  • count() 返回RDD中元素个数
  • first() 返回RDD中的第一个元素
  • take() 返回由RDD前n个元素组成的数组
  • takeOrdered() 返回该RDD排序后前n个元素组成的数组
  • aggregate()
  • fold()
  • countByKey() 统计每种key的个数
  • save相关算子
    • saveAsTextFile(path) 保存成Text文件
    • saveAsSequenceFile(path) 保存成Sequencefile文件
    • saveAsObjectFile(path) 序列化成对象保存到文件
  • foreach(f) 遍历RDD中每一个元素

reduce()

根据聚合逻辑聚合数据集中的每个元素。(reduce里面需要具体的逻辑,根据里面的逻辑对相同分区的数据进行计算)

案例演示:计算集合中的总数

代码语言:javascript
复制
  @Test
  def reduce(): Unit ={
    // 创建 sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 创建一个集合
    val list=List(1,2,3,4,5,6,7,8)
    val rdd1: RDD[Int] = sc.parallelize(list, 4)
    // 统计集合中的总数
    val value: Int = rdd1.reduce((v1, v2) => v1 + v2)

    println(value)
    // 关闭资源
    sc.stop()
  }

结果

代码语言:javascript
复制
36

collect()

将计算结果回收到Driver端。当数据量很大时就不要回收了,会造成oom。 一般在使用过滤算子或者一些能返回少量数据集的算子后

案例演示:编写worldCount程序,使用collect收集结果

代码语言:javascript
复制
  @Test
  def collect(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)

    //获取数据结果
    val result: Array[(String, Int)] = rdd4.collect()

    // 输出结果
    result.foreach(e=>{
      e match {
        case (k,v)=>println(k,v)
      }
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

结果

代码语言:javascript
复制
(shell,4)
(wahaha,1)
(hello,2)
(python,1)
(java,5)

count()

返回数据集中的元素数。会在结果计算完成后回收到Driver端。返回行数

案例演示:编写worldCount程序,使用count统计数据总数

代码语言:javascript
复制
  @Test
  def count(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    // 计算单词个数
    val rdd4: RDD[(String, Int)] = rdd3.reduceByKey((v1, v2) => v1 + v2)

    //获取数据结果
    val count: Long = rdd4.count()

    println(count)

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

结果

代码语言:javascript
复制
5

first()

first=take(1) 返回数据集中的第一个元素。

代码语言:javascript
复制
val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))

案例演示:返回成绩第一名的学生信息

代码语言:javascript
复制
  @Test
  def first(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 学生信息
    val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))

    val rdd1: RDD[(String, String, String, Int)] = sc.parallelize(stuList, 4)

    // 排名 按照成绩排名
    val sortList: RDD[(String, String, String, Int)] = rdd1.sortBy(_._4,ascending = false)

    // 返回第一名学生信息
    val stuInfo: (String, String, String, Int) = sortList.first()

    println(stuInfo)

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

结果

代码语言:javascript
复制
(尼古拉斯,男,18,100)

take()

返回一个包含数据集前n个元素的集合。是一个(array)有几个partiotion 会有几个job触发

案例演示:返回前三名的学生信息

代码语言:javascript
复制
  @Test
  def take(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 学生信息
    val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))

    val rdd1: RDD[(String, String, String, Int)] = sc.parallelize(stuList, 4)

    // 排名 按照成绩排名
    val sortList: RDD[(String, String, String, Int)] = rdd1.sortBy(_._4,ascending = false)

    // 返回前三名学生信息
    val top3: Array[(String, String, String, Int)] = sortList.take(3)


    // 遍历打印
    for(e <- top3 ){
      e match {
        case (name,sex,age,score)=>println(name,sex,age,score)
      }
    }


    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

结果

代码语言:javascript
复制
(尼古拉斯,男,18,100)
(春娇,女,15,99)
(张三,男,16,97)

takeOrdered()

作用和 take 类似,takeOrdered取数据前会对数据进行排序,默认按照降序

案例演示:返回前三名的学生信息

代码语言:javascript
复制
  @Test
  def takeOrdered(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 学生信息
    val stuList=List(("张三","男","16",97),("李四","男","16",89),("春娇","女","15",99),("尼古拉斯","男","18",100),("王富贵","男","17",70))

    val rdd1: RDD[(String, String, String, Int)] = sc.parallelize(stuList, 4)

    // 排名 按照成绩排名
    //val sortList: RDD[(String, String, String, Int)] = rdd1.sortBy(_._4,ascending = false)

    // 返回排完序后的前三条数据信息
    val top3: Array[(String, String, String, Int)] = rdd1.takeOrdered(3)


    // 遍历打印
    for(e <- top3 ){
      e match {
        case (name,sex,age,score)=>println(name,sex,age,score)
      }
    }

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

结果

代码语言:javascript
复制
(尼古拉斯,男,18,100)
(张三,男,16,97)
(春娇,女,15,99)

takeOrdered 源码

代码语言:javascript
复制
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
    if (num == 0) {
      Array.empty
    } else {
      val mapRDDs = mapPartitions { items =>
        // Priority keeps the largest elements, so let's reverse the ordering.
        val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
        queue ++= collectionUtils.takeOrdered(items, num)(ord)
        Iterator.single(queue)
      }
      if (mapRDDs.partitions.length == 0) {
        Array.empty
      } else {
        mapRDDs.reduce { (queue1, queue2) =>
          queue1 ++= queue2
          queue1
        }.toArray.sorted(ord)
      }
    }
  }

指定排序规则

代码语言:javascript
复制
  @Test
  def takeOrdered(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 学生信息
    val stuList=List(("张三","男",16,97),("李四","男",16,89),("春娇","女",15,99),("尼古拉斯","男",18,100),("王富贵","男",17,70))

    val rdd1: RDD[(String, String, Int, Int)] = sc.parallelize(stuList, 4)

    // 排名 按照成绩排名
    //val sortList: RDD[(String, String, Int, Int)] = rdd1.sortBy(_._4,ascending = false)

    // 按照年龄排名
    val top3: Array[(String, String, Int, Int)] = rdd1.takeOrdered(3)(Ordering.by(_._3))


    // 遍历打印
    for(e <- top3 ){
      e match {
        case (name,sex,age,score)=>println(name,sex,age,score)
      }
    }

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }
代码语言:javascript
复制
(春娇,女,15,99)
(李四,男,16,89)
(张三,男,16,97)

aggregate()

aggregate 源码

代码语言:javascript
复制
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

(zeroValue: U):表示设置一个初始值 (U, T):在分区内计算的逻辑;U表示初始值,T表示待处理的元素 (U, U) :在Driver内计算的逻辑;第一个U表示初始值,第二个U表示分区内的元素结果

案例说明:

代码语言:javascript
复制
  @Test
  def aggregate(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)
    // 六个元素
    val list=List(1,5,7,8,9,4)
    // 为了方便计算,分为两个区
    val rdd1: RDD[Int] = sc.parallelize(list, 2)

    // 计算
    val result=rdd1.aggregate(100)((u,t)=>u-t,(u1,u2)=>u1+u2)
    // 结果应该是多少
    println(result)

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

答案

代码语言:javascript
复制
266

266是怎么来的呢? 首先有两个元素

代码语言:javascript
复制
val list=List(1,5,7,8,9,4)

两个分区

代码语言:javascript
复制
val rdd1: RDD[Int] = sc.parallelize(list, 2)

假设 分区0的元素为

代码语言:javascript
复制
1,5,7

分区1的元素为

代码语言:javascript
复制
8,9,4

(U, T):是用于计算分区分区内的。 分区0的计算逻辑

代码语言:javascript
复制
第一次 U=100(初始值)T=1(待处理的元素)进行运算 100-1=99
第二次 U=99(上一次的运算结果)T=5(待处理的元素)进行运算 99-5=94
第二次 U=94(上一次的运算结果)T=7(待处理的元素)进行运算 94-7=87

分区0的最终结果为87

分区1的计算逻辑

代码语言:javascript
复制
第一次 U=100(初始值)T=8(待处理的元素)进行运算 100-8=92
第二次 U=92(上一次的运算结果)T=9(待处理的元素)进行运算 92-9=83
第二次 U=83(上一次的运算结果)T=4(待处理的元素)进行运算 93-4=79

分区1的最终结果为79

(U, U) :在Driver内计算的逻辑;这里开始对各个分区的结果进行汇总

代码语言:javascript
复制
第一次 U1=100(初始值)u2=87(待处理的分区元素)进行运算 100+87=187
第er次 U1=187(上一次的运算结果)u2=79(待处理的分区元素)进行运算 187+79=266

最终结果就是266

fold()

fold 源码

代码语言:javascript
复制
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

(zeroValue: T):表示设置一个初始值 (op: (T, T) => T):在Driver内计算的逻辑;第一个T表示初始值,第二个T表示待处理的元素

代码语言:javascript
复制
  @Test
  def fold(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    val list=List(1,5,7,8,9,4)
    val rdd1: RDD[Int] = sc.parallelize(list, 2)

    // 计算
    val result=rdd1.fold(100)((t1,t2)=>t1+t2)
    // 结果
    println(result)

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

我最开始的以为过程是这样的。

代码语言:javascript
复制
第一次 T1=100(初始值)T2=1(待处理的元素)进行运算 100+1=101
第二次 T1=101(初始值)T2=5(待处理的分区元素)进行运算 101+5=106
第三次 T1=106(初始值)T2=7(待处理的分区元素)进行运算 106+7=113
第四次 T1=113(初始值)T2=8(待处理的分区元素)进行运算 113+8=121
第五次 T1=121(初始值)T2=9(待处理的分区元素)进行运算 121+9=130
第六次 T1=130(初始值)T2=4(待处理的分区元素)进行运算 130+4=134

最终结果为:134 ;但是这个答案是错误的(哭脸)。


其实fold运算的逻辑就是aggregate的运算逻辑

代码语言:javascript
复制
val result=rdd1.fold(100)((t1,t2)=>t1+t2)

这两句代码是等价的。

代码语言:javascript
复制
val result=rdd1.aggregate(100)((u,t)=>u+t,(u1,u2)=>u1+u2)

foldaggregate区别在于 aggregate 分区的逻辑 可以和Driver内的运算逻辑不一致。 fold分区的逻辑 与Driver内的运算逻辑是一致的。

运算流程就复制上面的改改把,原理都是一样的。 假设 分区0的元素为

代码语言:javascript
复制
1,5,7

分区1的元素为

代码语言:javascript
复制
8,9,4

(T, T):是用于计算分区分区内的。 分区0的计算逻辑

代码语言:javascript
复制
第一次 T1=100(初始值)T2=1(待处理的元素)进行运算 100+1=101
第二次 T1=101(上一次的运算结果)T2=5(待处理的元素)进行运算 101+5=106
第二次 T1=106(上一次的运算结果)T2=7(待处理的元素)进行运算 106+7=113

分区0的最终结果为113

分区1的计算逻辑

代码语言:javascript
复制
第一次 T1=100(初始值)T2=8(待处理的元素)进行运算 100+8=108
第二次 T1=108(上一次的运算结果)T2=9(待处理的元素)进行运算 108+9=117
第二次 T1=117(上一次的运算结果)T2=4(待处理的元素)进行运算 117+4=121

分区1的最终结果为121

(T, T) :在Driver内计算的逻辑;这里开始对各个分区的结果进行汇总

代码语言:javascript
复制
第一次 T1=100(初始值)T2=108(待处理的分区元素)进行运算 100+113=213
第er次 T1=213(上一次的运算结果)T2=121(待处理的分区元素)进行运算 213+121=334

最终结果就是334

countByKey()

作用到K,V格式的RDD上,根据Key计数相同Key的数据集元素。(也就是个数)

上面使用collect做一个WroldCount程序,这里使用countByKey的方式实现(主要是懒,写得太累了)

代码语言:javascript
复制
  @Test
  def countByKey(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 读取文件
    val rdd1: RDD[String] =sc.textFile(path = "file:///C:/Users/123456/Desktop/worldCount.txt",minPartitions = 4)

    // 数据扁平化,
    val rdd2: RDD[String] =rdd1.flatMap(e=>e.split(" "))

    // 映射
    val rdd3: RDD[(String, Int)] =rdd2.map(e=>(e,1))

    //获取数据结果
    val result: collection.Map[String, Long] = rdd3.countByKey()

    // 输出结果
    result.foreach(e=>{
      e match {
        case (k,v)=>println(k,v)
      }
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()
  }

结果

代码语言:javascript
复制
(shell,4)
(wahaha,1)
(java,5)
(python,1)
(hello,2)

save相关算子

可以看看我之前写的数据读取与保存,里面有简书sava相关算子的操作或用法。

foreach(f)

循环遍历数据集中的每个元素,运行相应的逻辑。

案例演示:

代码语言:javascript
复制
  @Test
  def foreach(): Unit ={
    // 创建sc
    val conf =new SparkConf().setMaster("local[4]").setAppName("test")
    val sc=new SparkContext(conf)

    // 学生信息
    val stuList=List(("张三","男",16,97),("李四","男",16,89),("春娇","女",15,99),("尼古拉斯","男",18,100),("王富贵","男",17,70))

    val rdd1: RDD[(String, String, Int, Int)] = sc.parallelize(stuList, 4)

    //遍历
    rdd1.foreach(stu=>{
      stu match {
        case (name,sex,age,score)=>println(name,sex,age,score)
      }
    })

    // 关闭资源;养成良好编码习惯
    sc.stop()

  }

结果

代码语言:javascript
复制
(李四,男,16,89)
(春娇,女,15,99)
(尼古拉斯,男,18,100)
(张三,男,16,97)
(王富贵,男,17,70)

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 行动算子有哪些?
  • reduce()
  • collect()
  • count()
  • first()
  • take()
  • takeOrdered()
  • aggregate()
  • fold()
  • countByKey()
  • save相关算子
  • foreach(f)
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档