前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Spark Core快速入门系列(12) | 变量与累加器问题

Spark Core快速入门系列(12) | 变量与累加器问题

作者头像
不温卜火
发布2020-10-28 16:56:52
5060
发布2020-10-28 16:56:52
举报
文章被收录于专栏:不温卜火不温卜火

一. 共享变量

  • 1.代码
package Demo
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 **
@author 不温卜火
 **
 * @create 2020-08-01 12:18
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object AccDemo1 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val p1 = Person(10)
    // 将来会把对象序列化之后传递到每个节点上
    val rdd1 = sc.parallelize(Array(p1))
    val rdd2: RDD[Person] = rdd1.map(p => {p.age = 100; p})

    rdd2.count()
    // 仍然是 10
    println(p1.age)
  }

}
case class Person(var age:Int)
  • 2.结果:
3
3

  正常情况下, 传递给 Spark 算子(比如: map, reduce 等)的函数都是在远程的集群节点上执行, 函数中用到的所有变量都是独立的拷贝.

  这些变量被拷贝到集群上的每个节点上, 都这些变量的更改不会传递回驱动程序.

支持跨 task 之间共享变量通常是低效的, 但是 Spark 对共享变量也提供了两种支持:

  1. 累加器
  2. 广播变量

二. 累加器

  累加器用来对信息进行聚合,通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,所以更新这些副本的值不会影响驱动器中的对应变量。

  如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

  累加器是一种变量, 仅仅支持“add”, 支持并发. 累加器用于去实现计数器或者求和. Spark 内部已经支持数字类型的累加器, 开发者可以添加其他类型的支持.

2.1 内置累加器

需求:计算文件中空行的数量

  • 1. 代码
package Demo
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

/**
 **
@author 不温卜火
 **
 * @create 2020-08-01 12:22
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object AccDemo2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("D:/words.txt")
    // 得到一个 Long 类型的累加器.  将从 0 开始累加
    val emptyLineCount: LongAccumulator = sc.longAccumulator
    rdd.foreach(s => if (s.trim.length == 0) emptyLineCount.add(1))
    println(emptyLineCount.value)
  }

}
  • 2. 运行结果
4
4
5
5
  • 3. 说明
  1. 在驱动程序中通过sc.longAccumulator得到Long类型的累加器, 还有Double类型的
  2. 可以通过value来访问累加器的值.(与sum等价). avg得到平均值
  3. 只能通过add来添加值.
  4. 累加器的更新操作最好放在action中, Spark 可以保证每个 task 只执行一次. 如果放在 transformations 操作中则不能保证只更新一次.有可能会被重复执行.

2.2 自定义累加器

通过继承类AccumulatorV2来自定义累加器.

  下面这个累加器可以用于在程序运行过程中收集一些文本类信息,最终以List[String]的形式返回。

  • 1. 累加器
package Demo
import java.util
import java.util.{ArrayList, Collections}

import org.apache.spark.util.AccumulatorV2

/**
 **
@author 不温卜火
 **
 * @create 2020-08-01 12:56
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object MyAccDemo {
  def main(args: Array[String]): Unit = {

  }
}

class MyAcc extends AccumulatorV2[String, java.util.List[String]] {
  private val _list: java.util.List[String] = Collections.synchronizedList(new ArrayList[String]())
  override def isZero: Boolean = _list.isEmpty

  override def copy(): AccumulatorV2[String, util.List[String]] = {
    val newAcc = new MyAcc
    _list.synchronized {
      newAcc._list.addAll(_list)
    }
    newAcc
  }

  override def reset(): Unit = _list.clear()

  override def add(v: String): Unit = _list.add(v)

  override def merge(other: AccumulatorV2[String, util.List[String]]): Unit =other match {
    case o: MyAcc => _list.addAll(o.value)
    case _ => throw new UnsupportedOperationException(
      s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
  }

  override def value: util.List[String] = java.util.Collections.unmodifiableList(new util.ArrayList[String](_list))
}
  • 2. 测试
package Demo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


/**
 **
 *
*@author 不温卜火
 **
 * @create 2020-08-01 12:57
 **
 *         MyCSDN :  https://buwenbuhuo.blog.csdn.net/
 *
 */
object MyAccDemo1 {
  def main(args: Array[String]): Unit = {
    val pattern = """^\d+$"""
    val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // 统计出来非纯数字, 并计算纯数字元素的和
    val rdd1 = sc.parallelize(Array("abc", "a30b", "aaabb2", "60", "20"))

    val acc = new MyAcc
    sc.register(acc)
    val rdd2: RDD[Int] = rdd1.filter(x => {
      val flag: Boolean = x.matches(pattern)
      if (!flag) acc.add(x)
      flag
    }).map(_.toInt)
    println(rdd2.reduce(_ + _))
    println(acc.value)
  }

}

注意:   在使用自定义累加器的不要忘记注册sc.register(acc)

  • 3. 运行结果
6
6

三. 广播变量

  广播变量在每个节点上保存一个只读的变量的缓存, 而不用给每个 task 来传送一个 copy.

  例如, 给每个节点一个比较大的输入数据集是一个比较高效的方法. Spark 也会用该对象的广播逻辑去分发广播变量来降低通讯的成本.

  广播变量通过调用SparkContext.broadcast(v)来创建. 广播变量是对v的包装, 通过调用广播变量的 value方法可以访问.

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)
  • 说明
  1. 通过对一个类型T的对象调用SparkContext.broadcast创建出一个Broadcast[T]对象。任何可序列化的类型都可以这么实现。
  2. 通过value属性访问该对象的值(在Java中为value()方法)。
  3. 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)。

  本次的分享就到这里了

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-08-02 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. 共享变量
  • 二. 累加器
    • 2.1 内置累加器
      • 2.2 自定义累加器
      • 三. 广播变量
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档