前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark中累加器的陷阱

Spark中累加器的陷阱

作者头像
俺也想起舞
发布2019-11-27 16:09:22
9140
发布2019-11-27 16:09:22
举报

Spark中在使用累加器时出的一些问题的记录

累加器(Accumulator)简介

累加器(Accumulator)是Spark提供的累加器,顾名思义,该变量只能够增加。由Driver端进行初始变量,Task再对声明的变量进行累加操作。

可以为Accumulator命名,这样就会在Spark web ui中看到每个节点的计数,以及累加后的值,可以帮助你了解程序运行的情况。

累加器使用的陷阱

在前段时间写项目时用累加器稽核数据量,结果发现稽核的数据输入量和输出量明显不同,此时要么是程序存在问题,要么是累加器使用有问题,从最终生成的结果文件中可以看出,是累加器的使用问题

下面来看一个Demo

代码语言:javascript
复制
val conf = new SparkConf()
      .setAppName("Accumulator Demo")
      .setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")
    val example = sc.longAccumulator("Example")

    val byKey = sc
      .parallelize(1 to 10)
      .map(x=>{
        if(x%2==1){
          example.add(-1)
          ("奇数",1)
        }else{
          ("偶数",1)
        }
      })

    byKey.foreach(println(_))
    println("累加后的值:"+example.value)
    println(byKey.count())
    println("累加后的值:"+example.value)

结果:

可以看出,如果一个算子在最终计算两次,则累加器也会同样增加两次

那我们如果将涉及到累加的算子缓存会怎么样呢,修改部分代码

代码语言:javascript
复制
val byKey = sc
      .parallelize(1 to 10)
      .map(x=>{
        if(x%2==1){
          example.add(1)
          ("奇数",1)
        }else{
          ("偶数",1)
        }
      }).persist() //将计算结果进行缓存

结果:

原因分析&解决方案

官方对这个问题的解释如下描述:

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware of that each task’s update may be applied more than once if tasks or job stages are re-executed.

我们都知道,spark中的一系列transform操作会构成一串长的任务链,此时需要通过一个action操作来触发,accumulator也是一样。因此在一个action操作之前,你调用value方法查看其数值,肯定是没有任何变化的。

所以在第一次foreach(action操作)之后,我们发现累加器的数值变成了5,是我们要的答案。

之后又对新产生的的byKey进行了一次count(action操作),其实这个时候又执行了一次map(transform)操作,所以累加器又增加了5。最终获得的结果变成了10。

既然已经知道了造成的原因,那就是使用累加器的过程中只能使用一次action的操作才能保证结果的准确性。当然也可以通过切断依赖关系,例如触发一次Shuffle,Spark 会自动缓存Shuffle后生成的RDD(使用的Spark2.1,其他版本暂时不清楚),当然也可以通过Cache()、Persist()进行切断

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 累加器(Accumulator)简介
  • 累加器使用的陷阱
  • 原因分析&解决方案
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档