在 Spark 中实现单例模式的技巧

单例模式是一种常用的设计模式,但是在集群模式下的 Spark 中使用单例模式会引发一些错误。我们用下面代码作例子,解读在 Spark 中使用单例模式遇到的问题。

object Example{
  var instance:Example = new Example("default_name");
  def getInstance():Example = {
    return instance
  }
  def init(name:String){
    instance = new Example(name)
  }
}
class Example private(name1:String) extends  Serializable{
  var name = name1
}

object Main{
  def main(args:Array[String]) = {
    Example.init("To create happiness with money")
    val sc =  new SparkContext(new SparkConf().setAppName("test"))

    val rdd = sc.parallelize(1 to 10, 3)
    rdd.map(x=>{
      x + "_"+ Example.getInstance().name
    }).collect.foreach(println)
  }
}

我们预期结果是数字和腾讯游戏座右铭,然后实际的结果确实数字和默认名字,如下所示

就像 Example.init(“To create happiness with money”) 没有执行一样。在 Stackoverflow 上,有不少人也碰到这个错误,比如 问题1问题2问题3

这是由什么原因导致的呢?Spark 执行算子之前,会将算子需要东西准备好并打包(这就是闭包的概念),分发到不同的 executor,但这里不包括类。类存在 jar 包中,随着 jar 包分发到不同的 executors 中。当不同的 executors 执行算子需要类时,直接从分发的 jar 包取得。这时候在 driver 上对类的静态变量进行改变,并不能影响 executors 中的类。拿上面的程序做例子,jar 包存的 Example.instance = new Example(“default_name”),分发到不同的 executors。这时候不同 executors 中 Example.getInstance().name 等于 “default_name”。

这个部分涉及到 Spark 底层原理,很难堂堂正正地解决,只能采取取巧的办法。不能再 executors 使用类,那么我们可以用对象嘛。我们可以把 Example 的实例对象塞进算子的闭包,随着闭包分发到不同的 executors。修改之后的代码如下所示。

object Main{
  def main(args:Array[String]) = {
    Example.init(""To create happiness with money"")
    val sc =  new SparkContext(new SparkConf().setAppName("test"))
    
    val instance = Example.getInstance()
    val rdd = sc.parallelize(1 to 10, 3)
    rdd.map(x=>{
      x + "_"+ instance.name
    }).collect.foreach(println)
  }
}

上面代码在集群模式下的 Spark 运行结果是数字和腾讯游戏座右铭。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Linyb极客之路

java 中的序列化是什么意思?有什么好处?

简单说就是为了保存在内存中的各种对象的状态,并且可以把保存的对象状态再读出来。虽然你可以用你自己的各种各样的方法来保存Object States,但是Java给...

753
来自专栏牛肉圆粉不加葱

[源码剖析]Spark读取配置Spark读取配置

我们知道,有一些配置可以在多个地方配置。以配置executor的memory为例,有以下三种方式:

973
来自专栏cloudskyme

jbpm5.1介绍(5)

看几个jbpm5中带的示例程序吧,包括了很多我们在日常生活中的场景 循环示例 本示例是一个在外部传入的变量,通过传入的变量来判断循环次数的演示程序,看一下流程定...

3537
来自专栏大内老A

ASP.NET MVC Model元数据及其定制: Model元数据的定制

在《上篇》我们已经提到过了,Model元数据的定制是通过在作为Model的数据类型极其属性成员上应用相应的特性来实现,这些用于声明式元数据定义的特性大都定义在S...

3284
来自专栏SpringBoot

modal类(JavaBean)什么时候用到序列化(Serializable)

版权声明:本文为博主原创文章,未经博主允许不得转载。

1162
来自专栏加米谷大数据

Redis数据存储优化机制详解

将一个对象存储在hash类型中会占用更少的内存,并且可以更方便的存取整个对象。省内存的原因是新建一个hash对象时开始是用zipmap来存储的。这个zipmap...

982
来自专栏牛肉圆粉不加葱

[Spark源码剖析] JobWaiter

来创建容纳job结果的数据,数组的每个元素对应与之下标相等的partition的计算结果;并将结果处理函数(index, res) => results(ind...

622
来自专栏Jerry的SAP技术分享

什么是Java Marker Interface(标记接口)

先看看什么是标记接口?标记接口有时也叫标签接口(Tag interface),即接口不包含任何方法。在Java里很容易找到标记接口的例子,比如JDK里的Seri...

1305
来自专栏Flutter入门到实战

推特开源的Serial,轻量级,快速的json解析框架

你还在用Gson,fastjson吗?最近几个月推特开源了她们的json解析和序列化框架 Serial,这是一个轻量级框架,操作起来也很简单。下面简单的介绍一下...

1381
来自专栏Spark生态圈

[spark streaming] 状态管理 updateStateByKey&mapWithState

SparkStreaming 7*24 小时不间断的运行,有时需要管理一些状态,比如wordCount,每个batch的数据不是独立的而是需要累加的,这时就需要...

1752

扫码关注云+社区