单例模式是一种常用的设计模式,但是在集群模式下的 Spark 中使用单例模式会引发一些错误。我们用下面代码作例子,解读在 Spark 中使用单例模式遇到的问题。
object Example{
var instance:Example = new Example("default_name");
def getInstance():Example = {
return instance
}
def init(name:String){
instance = new Example(name)
}
}
class Example private(name1:String) extends Serializable{
var name = name1
}
object Main{
def main(args:Array[String]) = {
Example.init("To create happiness with money")
val sc = new SparkContext(new SparkConf().setAppName("test"))
val rdd = sc.parallelize(1 to 10, 3)
rdd.map(x=>{
x + "_"+ Example.getInstance().name
}).collect.foreach(println)
}
}
我们预期结果是数字和腾讯游戏座右铭,然后实际的结果确实数字和默认名字,如下所示
就像 Example.init(“To create happiness with money”) 没有执行一样。在 Stackoverflow 上,有不少人也碰到这个错误,比如 问题1、问题2和问题3。
这是由什么原因导致的呢?Spark 执行算子之前,会将算子需要东西准备好并打包(这就是闭包的概念),分发到不同的 executor,但这里不包括类。类存在 jar 包中,随着 jar 包分发到不同的 executors 中。当不同的 executors 执行算子需要类时,直接从分发的 jar 包取得。这时候在 driver 上对类的静态变量进行改变,并不能影响 executors 中的类。拿上面的程序做例子,jar 包存的 Example.instance = new Example(“default_name”),分发到不同的 executors。这时候不同 executors 中 Example.getInstance().name 等于 “default_name”。
这个部分涉及到 Spark 底层原理,很难堂堂正正地解决,只能采取取巧的办法。不能再 executors 使用类,那么我们可以用对象嘛。我们可以把 Example 的实例对象塞进算子的闭包,随着闭包分发到不同的 executors。修改之后的代码如下所示。
object Main{
def main(args:Array[String]) = {
Example.init(""To create happiness with money"")
val sc = new SparkContext(new SparkConf().setAppName("test"))
val instance = Example.getInstance()
val rdd = sc.parallelize(1 to 10, 3)
rdd.map(x=>{
x + "_"+ instance.name
}).collect.foreach(println)
}
}
上面代码在集群模式下的 Spark 运行结果是数字和腾讯游戏座右铭。