前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >为Spark Deep Learning 集成

为Spark Deep Learning 集成

作者头像
用户2936994
发布2018-08-27 14:37:36
3530
发布2018-08-27 14:37:36
举报
文章被收录于专栏:祝威廉祝威廉

前言

昨晚睡了12小时,早上起来神清气爽,索性把之前提的一个Issue:Is there any plan to port TensorframeOnSpark(From yahoo) 给尝试着集成进来。 前两天已经添加了一个 TFTextEstimator:为Spark Deep Learning 添加NLP处理实现,不过只能做hyper parameter tuning,做不了真正的分布式训练,所以正好把这个特性加到了这个Estimator里。

使用方法

建议看这篇文章之前,先看为Spark Deep Learning 添加NLP处理实现。 我给TFTextFileEstimator 添加了一个新的参数叫做 runningMode。目前只有两个值: Normal 和 TFoS。

代码语言:javascript
复制
# create a estimator to training where map_fun contains tensorflow's code
estimator = TFTextFileEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds",
                                fitParam=[{"epochs": 1, "cluster_size": 2, "batch_size": 1, "model": "/tmp/model"}],
                                runningMode="TFoS",
                                mapFnParam=map_fun)

如果使用TFoS model参数是必须的。并且 map_fun方法也需要做些改造。主要是tensorflow 分布式training 和 单机多device 还是有区别的。

原理

在TFTextEstimator里,通过参数runningMode控制:

代码语言:javascript
复制
        if self.getRunningMode() == "TFoS":
            return self._fitInCluster(dataset, paramMaps)
        else:
            return self._fitInParallel(dataset, paramMaps)

如果是,则走集群模式,否则走并行训练。 我们来看看_fitInCluster:

代码语言:javascript
复制
def _fitInCluster(self, dataset, paramMaps):
        sc = JVMAPI._curr_sc()

        temp_item = dataset.take(1)[0]
        vocab_s = temp_item["vocab_size"]
        embedding_size = temp_item["embedding_size"]

        baseParamMap = self.extractParamMap()
        baseParamDict = dict([(param.name, val) for param, val in baseParamMap.items()])

        args = self._clusterModelDefaultValue(sc, paramMaps[0])
        args["feature"] = self.getInputCol()
        args["label"] = self.getLabelCol()
        args["vacab_size"] = vocab_s
        args["embedding_size"] = embedding_size
        args["params"] = baseParamDict

        cluster = TFCluster.run(sc, self.getMapFnParam(), args, args['cluster_size'], args['num_ps'],
                                args['tensorboard'],
                                TFCluster.InputMode.SPARK)
        cluster.train(dataset.rdd, args["epochs"])
        cluster.shutdown()

很简单,创建 TFCluster对象,并且调用其train方法。 最核心的还是 map_fun函数,这里实现了所有的tf逻辑(除了数据以外)。我后面会单独一个篇幅来讲。在做实现的过程,发现两个问题:

  1. TFoS 最好一个批次的数据会丢失 ,我对应提了一个IssueWhen training, the data of last batch will not be trained
  2. TFoS 没有办法跑在Local模式,所以调试麻烦些,需要跑在spark standalone模式下。

可运行的实例代码在: TFoSTest.py

mapfun函数解析

TFoSTest.py 里的代码兼容单机和集群模式运行。

代码语言:javascript
复制
def map_fun(args={}, ctx=None, _read_data=None):

如果ctx为None,则是单机模式,否则为集群模式。如果是集群模式则直接使用

代码语言:javascript
复制
TFNode.DataFeed(ctx.mgr, True)

获取数据,否则使用 _read_data 获取数据。具体详细参看示例中代码。

结束语

这个只是Demo性质的,单机和集群模式的融合度还不够好,map_fun编写难度还有些。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.10.14 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 使用方法
  • 原理
    • mapfun函数解析
      • 结束语
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档