首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何在火花中对每个执行器执行一次操作

如何在火花中对每个执行器执行一次操作
EN

Stack Overflow用户
提问于 2016-10-13 08:20:18
回答 4查看 13.1K关注 0票数 31

我有一个weka模型存储在S3中,大小约为400 of。现在,我有一些记录,我想运行模型并执行预测。

对于预测,我尝试的是,

  1. 下载驱动程序上的模型并将其作为静态对象加载,并将其广播给所有执行者。对预测RDD执行映射操作。
  2. 下载并加载驱动程序上的模型作为静态对象,并在每次映射操作中将其发送给executor。
  3. 下载驱动程序上的模型,并将其加载到每个执行器上,并在那里缓存它。(不知道怎么做)

有人知道如何在每个执行器上加载模型一次并缓存它,以便其他记录不再加载它吗?

EN

回答 4

Stack Overflow用户

发布于 2016-10-14 02:46:36

你有两个选择:

1.使用表示数据的惰性val创建一个单例对象:

代码语言:javascript
运行
复制
    object WekaModel {
        lazy val data = {
            // initialize data here. This will only happen once per JVM process
        }
    }       

然后,您可以在map函数中使用惰性val。lazy val确保每个工作JVM初始化他们自己的数据实例。不会为data执行序列化或广播。

代码语言:javascript
运行
复制
    elementsRDD.map { element =>
        // use WekaModel.data here
    }

优势

  • 效率更高,因为它允许您在每个JVM实例中初始化一次数据。例如,当需要初始化数据库连接池时,这种方法是一个很好的选择。

Disadvantages

  • 对初始化的控制较少。例如,如果需要运行时参数,那么初始化对象就更困难了。
  • 如果需要,您不能真正释放或释放对象。通常,这是可以接受的,因为操作系统会在进程退出时释放资源。

2.在RDD上使用mapPartition (或foreachPartition)方法,而不是只使用map

这允许您初始化整个分区所需的任何内容。

代码语言:javascript
运行
复制
    elementsRDD.mapPartition { elements =>
        val model = new WekaModel()

        elements.map { element =>
            // use model and element. there is a single instance of model per partition.
        }
    }

优势

  • 在对象的初始化和重新初始化方面提供了更多的灵活性。

Disadvantages

  • 每个分区将创建并初始化对象的新实例。根据每个JVM实例有多少分区,它可能是一个问题,也可能不是一个问题。
票数 39
EN

Stack Overflow用户

发布于 2017-05-18 06:24:43

以下是比惰性初始化器对我更有效的方法。我创建了一个初始化为null的对象级指针,并让每个执行器初始化它。在初始化块中,可以运行一次代码。请注意,每个处理批将重置局部变量,而不是对象级变量。

代码语言:javascript
运行
复制
object Thing1 {
  var bigObject : BigObject = null

  def main(args: Array[String]) : Unit = {
    val sc = <spark/scala magic here>
    sc.textFile(infile).map(line => {
      if (bigObject == null) {
         // this takes a minute but runs just once
         bigObject = new BigObject(parameters)  
      }
      bigObject.transform(line)
    })
  }
}

这种方法为每个执行器创建一个大对象,而不是其他方法的每个分区创建一个大对象。

如果将var bigObject : BigObject = null放入主函数命名空间中,它的行为会有所不同。在这种情况下,它在每个分区的开头运行bigObject构造函数(即。(批次)如果您有内存泄漏,那么这将最终杀死执行者。垃圾收集还需要做更多的工作。

票数 2
EN

Stack Overflow用户

发布于 2019-12-20 22:59:11

以下是我们通常做的事情

  1. 定义一个单例客户端,以确保每个执行者中只有一个客户端。
  2. 有一个getorcreate方法来创建或获取客户端信息,让我们让您有一个通用的服务平台,您希望为多个不同的模型服务,然后我们可以使用类似的并发映射来确保线程安全和计算
  3. getorcreate方法将在RDD级别内调用,如transform或foreach分区,因此确保init发生在executor级别。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40015777

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档