3个关键点,把你的TensorFlow代码重构为分布式!

对于机器学习模型,分布式大致分两类:模型分布式和数据分布式:

模型分布式非常复杂和灵活, 它把整个机器学习模型分割,分散在多个节点上,在每个节点上计算模型的各个部分, 最后把结果拼接起来。如果你造了一个并行性很高的深度网络,比如这个,那就更棒了。你只要在每个节点上,计算不同的层,最后把各个层的异步结果通过较为精妙的方式汇总起来。

而我们今天要手把手教大家的是数据分布式。模型把数据拷贝到多个节点上, 每次算Epoch迭代的时候,每个节点对于一个batch的梯度都会有一个计算值,一个batch结束后,所有节点把梯度值汇总起来(ps参数服务器的任务就是汇总所有参数更新),从而进行更新。这就会导致每个batch的计算都比非分布式方法精准。相对非分布式,并行方法下,同样的迭代次数,收敛较快。

如何把自己的单机TensorFlow代码变为分布式的代码?

本文将手把手告诉大家3个关键点,重构自己的TensorFlow代码为分布式代码(开始前请大家前用1分钟了解文末的参考文献,了解基本知识):

关键点1: 定义FLAGS全局变量,获得ps参数服务器,worker工作服务器等分布式全局信息。

# Define parameters
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.')
tf.app.flags.DEFINE_integer('steps_to_validate', 1000,
                            'Steps to validate and print loss')

# For distributed
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("issync", 0, "issync mode")

以上代码是从命令行获得变量的简单方式。使用TensorFlow自带的FLAGS命令行工具。

ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,

上述代码教你如何获得命令行变量到python变量。ps_hosts代表所有参数服务器,work_hosts是所有工作服务器。cluster组装一个分布式集群定义。server代表本地为任务分配的服务器。

关键点2: 在流图Graph定义阶段, 加入“参数服务器”和“工作服务器”的判断,重构Graph定义代码。

if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":
    with tf.device(tf.train.replica_device_setter(
                    worker_device="/job:worker/task:%d" % FLAGS.task_index,
                    cluster=cluster)):
        # 这里是各个worker工作服务器下的graph定义。

如果当前服务器是ps参数服务器,当前服务器就要执行join方法汇总更新的参数。

如果当前是工作服务器,构建deVice设备上下文,复制数据到各个设备,并且知道任务号,之后再定义原先的Graph。

关键点3: 最后,重构你原来的graph定义和TensorFlow Session训练的方式细节。

    grads_and_vars = optimizer.compute_gradients(cost)
    correct_prediction = tf.equal(y_pred_cls, y_true_cls)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    rep_op = tf.train.SyncReplicasOptimizer(optimizer,
                                            replicas_to_aggregate=len(
                                            worker_hosts),
                                            #replica_id=FLAGS.task_index,
                                            total_num_replicas=len(
                                            worker_hosts),
                                            use_locking=True)
    train_op = rep_op.apply_gradients(grads_and_vars,
                                global_step=global_step)
    init_token_op = rep_op.get_init_tokens_op()
    chief_queue_runner = rep_op.get_chief_queue_runner()

    init_op = tf.initialize_all_variables()
    saver = tf.train.Saver()
    train_batch_size = batch_size
    tf.summary.scalar('cost', cost)
    tf.summary.scalar('accuracy', accuracy)
    summary_op = tf.summary.merge_all()
    summary_writer = tf.summary.FileWriter('./summary_log/train')
    summary_writer_test = tf.summary.FileWriter('./summary_log/test')

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                            logdir="./checkpoint/",
                            init_op=init_op,
                            summary_op=None,
                            saver=saver,
                            global_step=global_step,
                            save_model_secs=60)
session = sv.prepare_or_wait_for_session(server.target)
sv.start_queue_runners(session, [chief_queue_runner])
session.run(init_token_op)

训练中稍有不同的是上面这段代码,graph定义完毕后,我们要用optimizer.compute_gradients方法计算梯度得到grads_and_vars对象。通过SyncReplicasOptimizer这个特殊的优化器,进行梯度的计算,即rep_op.apply_gradients(grads_and_vars, global_step=global_step)方法。

计算完毕得到的train_op对象就能在未来想用session.run()的地方使用了:

session.run([train_op, cost, global_step], feed_dict=feed_dict_train)

注意以上三个关键点, 你离TensorFlow并行化已经八九不离十了。

实际重构的例子,请看我github上识别猫狗的基本程序:

分布式版:

https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.distributed.py

单机版:

https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.py

参考文献

  1. https://github.com/thewintersun/distributeTensorflowExample
  2. https://www.tensorflow.org/deploy/distributed
  3. http://blog.csdn.net/luodongri/article/details/52596780
  4. https://www.slideshare.net/stanleywanguni/distributed-machine-learning
  5. https://www.quora.com/How-is-parallel-computing-used-in-machine-learning

本文转载自:David 9的博客 — 不怕"过拟合"

原文发布于微信公众号 - 进击的Coder(FightingCoder)

原文发表时间:2018-04-20

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏AI科技大本营的专栏

不会用Photoshop抠图?Mask R-CNN助你一键“除”人

翻译 | 林椿眄 编辑 | 费棋 【AI科技大本营导读】:看过英剧《黑镜》吗?圣诞特别版《白色圣诞节》中有这样一个场景:其中一个未来科技有自由屏蔽人像的功能,可...

4107
来自专栏kangvcar

手把手教你用1行代码实现人脸识别 -- Python Face_recognition

1353
来自专栏ATYUN订阅号

TensorFlow:使用Cloud TPU在30分钟内训练出实时移动对象检测器

是否能够更快地训练和提供对象检测模型?我们已经听到了这种的反馈,在今天我们很高兴地宣布支持训练Cloud TPU上的对象检测模型,模型量化以及并添加了包括Ret...

2325
来自专栏人工智能LeadAI

逻辑回归 | TensorFlow深度学习笔记

课程目标:学习简单的数据展示,训练一个Logistics Classifier,熟悉以后要使用的数据 Install Ipython NoteBook 可以参考...

3057
来自专栏iOSDevLog

用scikit-learn开始机器学习

原文:https://www.raywenderlich.com/174-beginning-machine-learning-with-scikit-lear...

1691
来自专栏AI科技大本营的专栏

实战 | 手把手教你用苹果CoreML实现iPhone的目标识别

在WWDC 2017上,苹果首次公布了机器学习方面的动作。iOS系统早已支持Machine Learning 和 Computer Vision ,但这次苹果提...

7068
来自专栏人工智能LeadAI

OpenCV人脸识别之二:模型训练

本系列人脸识别文章用的是opencv2,最新版的opencv3.2的代码请参考文章: OpenCV之识别自己的脸——C++源码放送(请在上一篇文章末尾查看) 在...

1.1K6
来自专栏aCloudDeveloper

Mobility Model and Routing Model about the ONE

ONE主要的功能是节点的移动,节点间的相遇情况,路由情况以及消息的处理机制。下面简要介绍下目前ONE自带的六种移动模型和六种路由模型。 Mobility Mod...

1809
来自专栏专知

【干货】快速上手图像识别:用TensorFlow API实现图像分类实例

【导读】1月17日,Arduino社区的编辑SAGAR SHARMA发布一篇基于TensorFlow API的图像识别实例教程。作者通过TensorFlow A...

7707
来自专栏生信技能树

hpv病毒基因研究调研

2015年有一篇文献中提到了hpv的研究现状 As of May 30, 2015, 201 different HPV types had been comp...

3045

扫码关注云+社区