首页
学习
活动
专区
圈层
工具
发布
50 篇文章
1
2021年大数据Spark(一):框架概述
2
2021年大数据Spark(二):四大特点
3
2021年大数据Spark(三):框架模块初步了解
4
2021年大数据Spark(四):三种常见的运行模式
5
2021年大数据Spark(五):大环境搭建本地模式 Local
6
2021年大数据Spark(六):环境搭建集群模式 Standalone
7
2021年大数据Spark(七):应用架构基本了解
8
2021年大数据Spark(八):环境搭建集群模式 Standalone HA
9
2021年大数据Spark(九):Spark On Yarn两种模式总结
10
2021年大数据Spark(十):环境搭建集群模式 Spark on YARN
11
2021年大数据Spark(十一):应用开发基于IDEA集成环境
12
2021年大数据Spark(十二):Spark Core的RDD详解
13
2021年大数据Spark(十三):Spark Core的RDD创建
14
2021年大数据Spark(十四):Spark Core的RDD操作
15
2021年大数据Spark(十五):Spark Core的RDD常用算子
16
2021年大数据Spark(十六):Spark Core的RDD算子练习
17
2021年大数据Spark(十七):Spark Core的RDD持久化
18
2021年大数据Spark(十八):Spark Core的RDD Checkpoint
19
2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量
20
2021年大数据Spark(二十):Spark Core外部数据源引入
21
2021年大数据Spark(二十一):Spark Core案例-SogouQ日志分析
22
2021年大数据Spark(二十二):内核原理
23
2021年大数据Spark(二十三):SparkSQL 概述
24
2021年大数据Spark(二十四):SparkSQL数据抽象
25
2021年大数据Spark(二十五):SparkSQL的RDD、DF、DS相关操作
26
2021年大数据Spark(二十六):SparkSQL数据处理分析
27
2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount
28
2021年大数据Spark(二十八):SparkSQL案例三电影评分数据分析
29
2021年大数据Spark(二十九):SparkSQL案例四开窗函数
30
2021年大数据Spark(三十):SparkSQL自定义UDF函数
31
2021年大数据Spark(三十一):Spark On Hive
32
2021年大数据Spark(三十二):SparkSQL的External DataSource
33
2021年大数据Spark(三十三):SparkSQL分布式SQL引擎
34
2021年大数据Spark(三十四):Spark Streaming概述
35
2021年大数据Spark(三十五):SparkStreaming数据抽象 DStream
36
2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount
37
2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey
38
2021年大数据Spark(三十八):SparkStreaming实战案例三 状态恢复 扩展
39
2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数
40
2021年大数据Spark(四十):SparkStreaming实战案例五 TopN-transform
41
2021年大数据Spark(四十一):SparkStreaming实战案例六 自定义输出 foreachRDD
42
2021年大数据Spark(四十二):SparkStreaming的Kafka快速回顾与整合说明
43
2021年大数据Spark(四十三):SparkStreaming整合Kafka 0.10 开发使用
44
2021年大数据Spark(四十四):Structured Streaming概述
45
2021年大数据Spark(四十五):Structured Streaming Sources 输入源
46
2021年大数据Spark(四十六):Structured Streaming Operations 操作
47
2021年大数据Spark(四十七):Structured Streaming Sink 输出
48
2021年大数据Spark(四十八):Structured Streaming 输出终端/位置
49
2021年大数据Spark(四十九):Structured Streaming 整合 Kafka
50
2021年大数据Spark(五十):Structured Streaming 案例一实时数据ETL架构

2021年大数据Spark(十九):Spark Core的​​​​​​​共享变量


共享变量

在默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。

为了满足这种需求,Spark提供了两种类型的变量:

 1)、广播变量Broadcast Variables

广播变量用来把变量在所有节点的内存之间进行共享,在每个机器上缓存一个只读的变量,而不是为机器上的每个任务都生成一个副本;

  2)、累加器Accumulators

累加器支持在所有不同节点之间进行累加计算(比如计数或者求和);

官方文档:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#shared-variables

​​​​​​​广播变量

广播变量允许开发人员在每个节点(Worker or Executor)缓存只读变量,而不是在Task之间传递这些变量。使用广播变量能够高效地在集群每个节点创建大数据集的副本。同时Spark还使用高效的广播算法分发这些变量,从而减少通信的开销。

可以通过调用sc.broadcast(v)创建一个广播变量,该广播变量的值封装在v变量中,可使用获取该变量value的方法进行访问。

​​​​​​​累加器

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,即确提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取Accumulator的值,只有Driver程序可以读取Accumulator的值。创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名。

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素

当内置的Accumulator无法满足要求时,可以继承AccumulatorV2实现自定义的累加器。实现自定义累加器的步骤:

 第一步、继承AccumulatorV2,实现相关方法;

 第二步、创建自定义Accumulator的实例,然后在SparkContext上注册它;

官方提供实例如下:

​​​​​​​案例演示

     以词频统计WordCount程序为例,假设处理的数据如下所示,包括非单词符合,统计数据词频时过滤非单词的特殊符号并且统计总的格式。

实现功能:

 第一、过滤特殊字符

非单词符合存储列表List中

使用广播变量广播列表

 第二、累计统计非单词符号出现次数

定义一个LongAccumulator累加器,进行计数

示例代码:

代码语言:javascript
复制
package cn.itcast.core

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}

/**
 * 基于Spark框架使用Scala语言编程实现词频统计WordCount程序,将符号数据过滤,并统计出现的次数
 * -a. 过滤标点符号数据
 * 使用广播变量
 * -b. 统计出标点符号数据出现次数
 * 使用累加器
 */
object SparkSharedVariableTest {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf()
      .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .setMaster("local[*]")
    val sc: SparkContext = new SparkContext(sparkConf)
    sc.setLogLevel("WARN")

    // 读取文件数据
    val datasRDD: RDD[String] = sc.textFile("data/input/words2.txt", minPartitions = 2)

    // 字典数据,只要有这些单词就过滤: 特殊字符存储列表List中
    val list: List[String] = List(",", ".", "!", "#", "$", "%")
    // 通过广播变量 将列表list广播到各个Executor内存中,便于多个Task使用
    val listBroadcast: Broadcast[List[String]] = sc.broadcast(list)

    // 定义累加器,记录单词为符号数据的个数
    val accumulator: LongAccumulator = sc.longAccumulator("mycounter")

    // 分割单词,过滤数据
    val wordsRDD = datasRDD
      // 1)、过滤数据,去除空行数据
      .filter(line => line != null && line.trim.length > 0)
      // 2)、分割单词
      .flatMap(_.trim.split("\\s+"))
      // 3)、过滤字典数据:符号数据
      .filter(word => {
        // 获取符合列表 ,从广播变量中获取列表list的值
        val listValue = listBroadcast.value
        // 判断单词是否为符号数据,如果是就过滤掉
        val isCharacter = listValue.contains(word)
        if (isCharacter) {
          // 如果单词为符号数据,累加器加1
          accumulator.add(1L)
        }
        !isCharacter
      })

    val resultRDD: RDD[(String, Int)] = wordsRDD
      // 转换为二元组
      .mapPartitions(iter => {
        iter.map((_, 1))
      })
      // 按照单词聚合统计
      .reduceByKey(_+_)

    resultRDD.foreach(println)
    println(s"过滤符合数据的个数:${accumulator.value}")

    // 应用程序运行结束,关闭资源
    sc.stop()
  }
}

也可以通过WEB UI查看累加器的值

下一篇
举报
领券