【Spark篇】---Spark中广播变量和累加器

一、前述

Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量。

累机器相当于统筹大变量,常用于计数,统计。

二、具体原理

1、广播变量

  • 广播变量理解图
  • 注意事项

1、能不能将一个RDD使用广播变量广播出去?

       不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。

2、 广播变量只能在Driver端定义,不能在Executor端定义。

3、 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。

5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

val conf = new SparkConf()
conf.setMaster("local").setAppName("brocast")
val sc = new SparkContext(conf)
val list = List("hello xasxt")
val broadCast = sc.broadcast(list)
val lineRDD = sc.textFile("./words.txt")
lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
sc.stop()

 2、累加器

  • 累加器理解图

 Scala代码:

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorOperator {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("accumulator")
    val sc = new SparkContext(conf)
    val accumulator = sc.accumulator(0)
    sc.textFile("./records.txt",2).foreach {//两个变量
      x =>{accumulator.add(1)
      println(accumulator)}}
    println(accumulator.value)
    sc.stop()
  }
}

 java代码:

package com.spark.spark.others;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
/**
 * 累加器在Driver端定义赋初始值和读取,在Executor端累加。
 * @author root
 *
 */
public class AccumulatorOperator {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local").setAppName("accumulator");
        JavaSparkContext sc = new JavaSparkContext(conf);
        final Accumulator<Integer> accumulator = sc.accumulator(0);
//        accumulator.setValue(1000);
        sc.textFile("./words.txt",2).foreach(new VoidFunction<String>() {
            
            /**
             * 
             */
            private static final long serialVersionUID = 1L;

            @Override
            public void call(String t) throws Exception {
                accumulator.add(1);
//                System.out.println(accumulator.value());
                System.out.println(accumulator);
            }
        });
        System.out.println(accumulator.value());
        sc.stop();
        
    }
}

 结果:

  • 注意事项

累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏个人分享

最最简单的~WordCount¬

步骤1:textFile先生成HadoopRDD,然后再通过map操作生成MappedRDD.

1061
来自专栏牛肉圆粉不加葱

Spark Task 的执行流程③ - 执行 task

创建、分发 Task一文中我们提到 TaskRunner(继承于 Runnable) 对象最终会被提交到 Executor 的线程池中去执行,本文就将对该执行过...

791
来自专栏岑玉海

Spark编程指南

1、在maven里面添加引用,spark和hdfs的客户端的。 groupId = org.apache.spark artifactId = spark-co...

3729
来自专栏一名叫大蕉的程序员

Spark你一定学得会(一)No.7

我是小蕉。 上一篇大家说没有干货,妈蛋回南天哪来的干货你告诉我!!!还好这几天天气还不错,干货来了。 首先祭上今天关键代码,要做的事情就是从Hive表中取得年龄...

1985
来自专栏知识分享

环形队列

 写完这篇文章想着以后尽量(应该说一定)使用现在正在使用的LPC系列的单片机写程序,其实内心感觉还是LPC做的相当完善,,,,,配置上没有32那么的繁琐.......

4187
来自专栏加米谷大数据

Spark RDD详解 -加米谷大数据

1、RDD是什么 RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这...

4469
来自专栏Albert陈凯

4.2 创建RDD

4.2 创建RDD 由于Spark一切都是基于RDD的,如何创建RDD就变得非常重要,除了可以直接从父RDD转换,还支持两种方式来创建RDD: 1)并行化一个...

3179
来自专栏伦少的博客

Spark性能优化:基于分区进行操作

基于分区对数据进行操作可以让我们避免为每个数据元素进行重复的配置工作。诸如打开数据库连接或创建随机数生成器等操作,都是我们应当尽量避免为每个元素都配置一次的工作...

2801
来自专栏xingoo, 一个梦想做发明家的程序员

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。...

1999
来自专栏浪淘沙

SparkStreaming_Kafka_Redis整合

2993

扫码关注云+社区

领取腾讯云代金券