我有一个weka模型存储在S3中,大小约为400 of。现在,我有一些记录,我想运行模型并执行预测。
对于预测,我尝试的是,
有人知道如何在每个执行器上加载模型一次并缓存它,以便其他记录不再加载它吗?
发布于 2016-10-14 02:46:36
你有两个选择:
1.使用表示数据的惰性val创建一个单例对象:
object WekaModel {
lazy val data = {
// initialize data here. This will only happen once per JVM process
}
}
然后,您可以在map
函数中使用惰性val。lazy val
确保每个工作JVM初始化他们自己的数据实例。不会为data
执行序列化或广播。
elementsRDD.map { element =>
// use WekaModel.data here
}
优势
Disadvantages
2.在RDD上使用mapPartition
(或foreachPartition
)方法,而不是只使用map
。
这允许您初始化整个分区所需的任何内容。
elementsRDD.mapPartition { elements =>
val model = new WekaModel()
elements.map { element =>
// use model and element. there is a single instance of model per partition.
}
}
优势
Disadvantages
发布于 2017-05-18 06:24:43
以下是比惰性初始化器对我更有效的方法。我创建了一个初始化为null的对象级指针,并让每个执行器初始化它。在初始化块中,可以运行一次代码。请注意,每个处理批将重置局部变量,而不是对象级变量。
object Thing1 {
var bigObject : BigObject = null
def main(args: Array[String]) : Unit = {
val sc = <spark/scala magic here>
sc.textFile(infile).map(line => {
if (bigObject == null) {
// this takes a minute but runs just once
bigObject = new BigObject(parameters)
}
bigObject.transform(line)
})
}
}
这种方法为每个执行器创建一个大对象,而不是其他方法的每个分区创建一个大对象。
如果将var bigObject : BigObject = null放入主函数命名空间中,它的行为会有所不同。在这种情况下,它在每个分区的开头运行bigObject构造函数(即。(批次)如果您有内存泄漏,那么这将最终杀死执行者。垃圾收集还需要做更多的工作。
发布于 2019-12-20 22:59:11
以下是我们通常做的事情
https://stackoverflow.com/questions/40015777
复制相似问题