前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark累加器(Accumulator)

Spark累加器(Accumulator)

作者头像
用户1483438
发布2022-07-26 21:36:02
1.6K0
发布2022-07-26 21:36:02
举报
文章被收录于专栏:大数据共享大数据共享

什么是累加器

累加器:分布式共享只写变量。(Executor和Executor之间不能读数据) 累加器用来把Executor端变量信息聚合到Driver端。在Driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回Driver端进行merge。

案例演示

统计列表中的元素之和

代码语言:javascript
复制
  @Test
  def demo: Unit ={
    
    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //定义一个集合,分区为2;方便计算
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)
    
    // 统计元素之和
    var sum=0
    
    // 循环累加
    rdd1.foreach(e=>{
      sum =sum+e 
    })

    // 输出结果
   println(s"sum=$sum")
  }

此时 sum 结果为多少? 答案为0

代码语言:javascript
复制
sum=0

为什么是0呢?难道不应该是3+2+5+4+8+6=28吗? 原因很简单,foreach 属于Action算子;算子都是是Executor中执行的,算子外的都在是Driver中执行的。若算子中的若要引入外部变量的数据,就需要进行序列化。 具体的操作如图;

草图

虽然对sum进行累加,但只是作用于分区内而言,对于Driver而言,sum始终是没有改变的。 我们可以打印出来看看,task就是一个线程,使用Thread.currentThread().getName可以获取线程名称

代码语言:javascript
复制
    // 循环累加
    rdd1.foreach(e=>{
      sum =sum+e
      println(s"${Thread.currentThread().getName};sum=$sum, e=$e ")
    })

分区0

代码语言:javascript
复制
Executor task launch worker for task 0;sum=3, e=3 
Executor task launch worker for task 0;sum=5, e=2 
Executor task launch worker for task 0;sum=10, e=5 

分区1

代码语言:javascript
复制
Executor task launch worker for task 1;sum=4, e=4
Executor task launch worker for task 1;sum=12, e=8 
Executor task launch worker for task 1;sum=18, e=6 

当然你可以说,我不用foreach,用其他的算子不行吗?当然可以,比如使用reduce

代码语言:javascript
复制
  @Test
  def demo: Unit ={

    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //定义一个集合,分区为2
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)

    // 数据聚集
    val sum=rdd1.reduce(_+_)

    // 输出结果
    println(s"sum=$sum")

  }

输出结果,答案是28

代码语言:javascript
复制
sum=28

条条大路通罗马,实现方式多种多样。

在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。

累加器的使用

使用累加器需要使用SparkContext设置 如下:sumAccumulator=累加器取个名

代码语言:javascript
复制
val sumAccumulator=sc.longAccumulator("sumAccumulator")

内置累加器 内置的累加器有三种,LongAccumulator、DoubleAccumulator、CollectionAccumulator LongAccumulator: 数值型累加

代码语言:javascript
复制
LongAccumulator longAccumulator = sc.longAccumulator("long-account");

DoubleAccumulator: 小数型累加

代码语言:javascript
复制
DoubleAccumulator doubleAccumulator = sc.doubleAccumulator("double-account");

CollectionAccumulator:集合累加

代码语言:javascript
复制
CollectionAccumulator<Integer> collectionAccumulator = sc.collectionAccumulator("double-account");

案例演示:

代码语言:javascript
复制
  @Test
  def demo2(): Unit ={

    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)
    // 定义累加器
    val sumAccumulator=sc.longAccumulator("sumAccumulator")

    //定义一个集合,分区为2
    val rdd1: RDD[Int] = sc.parallelize(List(3, 2, 5, 4, 8, 6), 2)

    // 循环累加
    rdd1.foreach(e=>{
      sumAccumulator.add(e)
    })

    // 输出结果
    println(s"sum=${sumAccumulator.value}")

 }

结果

代码语言:javascript
复制
sum=28

其他两种也就不演示了,使用起来都是一样。 add:存放数据 value:获取结果


累加器的作用

累加器:分布式只写变量(Executor端的task不能互相访问累加器的值)。 累加器对信息进行聚合。向Spark传递函数时,通常可以使用Driver端定义的变量,但是在Executor端使用此变量时,每个task中使用的都是此变量的副本。如果变量的值发生了变化,Driver端的变量值却不会改变。 我们可以通过累加器实现分片处理,同时更新变量值 原文链接:https://blog.csdn.net/FlatTiger/article/details/115133641 可以不用,但是不能不会。

自定义累加器

自定义累加器步骤

  1. 定义 1.定义class继承AccumulatorV2 2.重写抽象方法
  2. 使用 1.初始化累加器对象 2.注册累加器 3.在分区中累加数据 4.获取最终结果

案例: 使用累加器实现WroldCount功能

  1. 定义一个class 继承AccumulatorV2 AccumulatorV2需要我们指定两个类型, INT:表示输入的数据类型 OUT:表示返回结果的数据类型。
代码语言:javascript
复制
abstract class AccumulatorV2[IN, OUT]

不太理解没有关系,我们可以看看longAccumulator累加器中 INOUT 指定是什么? 传进去的是一个Long ,返回的也是一个Long;

代码语言:javascript
复制
class LongAccumulator extends AccumulatorV2[jl.Long, jl.Long] {

我们在哪里传入的呢? add 就是传进去的参数(int 可以自动转为long)

代码语言:javascript
复制
// 循环累加
rdd1.foreach(e=>{
  sumAccumulator.add(e)
})

我的思考方式应该是,我们应该给add传入什么类型的数据,该数据类型不就是IN吗? 既然是单词出现的个数,能否指定为String?若只是单纯的指定为String好像不太好计算。

代码语言:javascript
复制
List("python","java","python","java","spark")

我们可以给每个单词分配一个值 1;

代码语言:javascript
复制
List(("python",1),("java",1),("python",1),("java",1),("spark",1))

这样IN 的参数类型就明确了,首先是一个元组,元组类型为(String,Int) 那么OUT的类型呢?看下面的代码片段思考出了什么吗?

代码语言:javascript
复制
// 输出结果
println(s"sum=${sumAccumulator.value}")

value 返回是不是最终的结果?WorldCount程序数据结果是什么? 是否就是这个?

代码语言:javascript
复制
List(("python",2),("java",2),("spark",1))

OUT的类型,我们可以指定成一个List ,里面的元素类型,还是一个元组(String,Int)

还需要重写里面的方法。

代码语言:javascript
复制
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  
  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = ???
  
  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???
  
  /**
   * 重置累加器
   */
  override def reset(): Unit = ???
  
  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = ???
  
  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???
  
  /**
   * 获取Driver汇总结果
   */
  override def value: List[(String, Int)] = ???
}

先不着急写里面的实现,先调用,这样方便理解。

代码语言:javascript
复制
  @Test
  def demo3(): Unit ={


    val conf=new SparkConf().setMaster("local[2]").setAppName("test")
    val sc =new SparkContext(conf)

    //初始化累加器
    val acc = new CustomAccumulator

    //注册累加器
    sc.register(acc,"CustomAccumulator")

    //读取文件
    val lines=sc.textFile("file:///C:/Users/123456/Desktop/worldCount.txt",2)

    // 列裁剪,数据扁平化
    val value: RDD[String] = lines.flatMap(_.split(" "))

    // 转换成我们需要的数据结构
    val mapList: RDD[(String, Int)] = value.map(e => (e, 1))

    // 循环累加
    mapList.foreach(e=>{
      acc.add(e)
    })

    // 输出结果
    println(s"sum=${acc.value}")

  }

worldCount.txt 内容

代码语言:javascript
复制
hello java shell
python java java
wahaha java shell
hello java shell shell

每一个元素都会交给add,就先完成add函数

代码语言:javascript
复制
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()
代码语言:javascript
复制
  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素

    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0)+v._2
    // 覆盖原来的key
    this.result.put(v._1,sum) 
  }

不太理解也没关系,下面有完整的代码。

value 返回的结果不就是result的结果吗?所以直接maplist

代码语言:javascript
复制
  /**
   * 获取Driver汇总结果
   */
  override def value: List[(String, Int)] = this.result.toList

目前完成代码

代码语言:javascript
复制
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()


  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = ???

  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] = ???

  /**
   * 重置累加器
   */
  override def reset(): Unit = ???

  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素

    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0)+v._2
    // 覆盖原来的key
    this.result.put(v._1,sum)

  }

  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = ???

  /**
   * 获取Driver汇总结果
   */
    override def value: List[(String, Int)] = this.result.toList
}

当前累加器的数据都是在result中,所以直接判断 result是否为空即可

代码语言:javascript
复制
/**
   * 累加器是否为空
   */
  override def isZero: Boolean = result.isEmpty

复制累加器;理解起来有点抽象,new CustomAccumulator定义在Driver中,但是整个计算是在每个分区中,所以我们需要创建一个新的累加器给他(后面会有画图,理解起来就不会那么抽象了)。

代码语言:javascript
复制
  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =  new CustomAccumulator()

重置累加器 : 就是清空数据

代码语言:javascript
复制
  /**
   * 重置累加器
   */
  override def reset(): Unit = this.result.clear()

上面说了,计算都在分区中进行的,所以需要对每个分区的数据进行汇总

代码语言:javascript
复制
  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
    // 获取其他分区的累加器数据结果
    val value: List[(String, Int)] = other.value

    //与result数据合并
    val list: List[(String, Int)] = result.toList
    // 此时 newList 中肯定有重复数据
    val newList: List[(String, Int)] =list++value

    // 分组,聚合
    val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
    println(groupList)


    // e._1 单词
    // e._2 依然还是一个列表
    // e._2.map(_._2).sum  获取里面的单词数
    val newResult: Map[String, Int] =groupList.map(e=>{
      val sum = e._2.map(_._2).sum
      (e._1,sum)
    })
    // 合并map
    result.++=(newResult)

  }

完整代码

代码语言:javascript
复制
class CustomAccumulator extends AccumulatorV2[(String,Int),List[(String,Int)]]{
  import scala.collection.mutable
  // 定义一个可变map 存储add 传入进来的元素
  val result=mutable.Map[String,Int]()


  /**
   * 累加器是否为空
   */
  override def isZero: Boolean = result.isEmpty

  /**
   * 复制累加器
   */
  override def copy(): AccumulatorV2[(String, Int), List[(String, Int)]] =new CustomAccumulator()

  /**
   * 重置累加器
   */
  override def reset(): Unit = this.result.clear()

  /**
   * 累加元素 [在每个task中累加]
   */
  override def add(v: (String, Int)): Unit = {
    // 传入进来的元素存到哪里?可以定义一个可变Map,存储每一个元素


    // 根据key找到map中的元素,修改原来的总数
    val sum=this.result.getOrElse(v._1,0)+v._2
    // 覆盖原来的key
    this.result.put(v._1,sum)

  }

  /**
   * 合并每个task的累加结果【在Driver中合并】
   */
  override def merge(other: AccumulatorV2[(String, Int), List[(String, Int)]]): Unit = {
    // 获取其他分区的累加器数据结果
    val value: List[(String, Int)] = other.value

    //与result数据合并
    val list: List[(String, Int)] = result.toList
    // 此时 newList 中肯定有重复数据
    val newList: List[(String, Int)] =list++value

    // 分组,聚合
    val groupList: Map[String, List[(String, Int)]] = newList.groupBy(e => e._1)
    println(groupList)


    // e._1 单词
    // e._2 依然还是一个列表
    // e._2.map(_._2).sum  获取里面的单词数
    val newResult: Map[String, Int] =groupList.map(e=>{
      val sum = e._2.map(_._2).sum
      (e._1,sum)
    })
    // 合并map
    result.++=(newResult)

  }

  /**
   * 获取Driver汇总结果
   */
    override def value: List[(String, Int)] = this.result.toList
}

数据结果

代码语言:javascript
复制
sum=List((wahaha,1), (java,5), (shell,4), (hello,2), (python,1))

分区二与分区一合并的数据。

代码语言:javascript
复制
Map(shell -> List((shell,2), (shell,2)), wahaha -> List((wahaha,1)), java -> List((java,1), (java,4)), python -> List((python,1)), hello -> List((hello,1), (hello,1)))

本文系转载,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系转载前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 什么是累加器
  • 案例演示
  • 累加器的使用
  • 累加器的作用
  • 自定义累加器
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档