前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >3个关键点,把你的TensorFlow代码重构为分布式!

3个关键点,把你的TensorFlow代码重构为分布式!

作者头像
崔庆才
发布2018-06-25 12:07:58
6790
发布2018-06-25 12:07:58
举报
文章被收录于专栏:进击的Coder

对于机器学习模型,分布式大致分两类:模型分布式和数据分布式:

模型分布式非常复杂和灵活, 它把整个机器学习模型分割,分散在多个节点上,在每个节点上计算模型的各个部分, 最后把结果拼接起来。如果你造了一个并行性很高的深度网络,比如这个,那就更棒了。你只要在每个节点上,计算不同的层,最后把各个层的异步结果通过较为精妙的方式汇总起来。

而我们今天要手把手教大家的是数据分布式。模型把数据拷贝到多个节点上, 每次算Epoch迭代的时候,每个节点对于一个batch的梯度都会有一个计算值,一个batch结束后,所有节点把梯度值汇总起来(ps参数服务器的任务就是汇总所有参数更新),从而进行更新。这就会导致每个batch的计算都比非分布式方法精准。相对非分布式,并行方法下,同样的迭代次数,收敛较快。

如何把自己的单机TensorFlow代码变为分布式的代码?

本文将手把手告诉大家3个关键点,重构自己的TensorFlow代码为分布式代码(开始前请大家前用1分钟了解文末的参考文献,了解基本知识):

关键点1: 定义FLAGS全局变量,获得ps参数服务器,worker工作服务器等分布式全局信息。

代码语言:javascript
复制
# Define parameters
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.')
tf.app.flags.DEFINE_integer('steps_to_validate', 1000,
                            'Steps to validate and print loss')

# For distributed
tf.app.flags.DEFINE_string("ps_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
                           "Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("issync", 0, "issync mode")

以上代码是从命令行获得变量的简单方式。使用TensorFlow自带的FLAGS命令行工具。

代码语言:javascript
复制
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(
    cluster,
    job_name=FLAGS.job_name,

上述代码教你如何获得命令行变量到python变量。ps_hosts代表所有参数服务器,work_hosts是所有工作服务器。cluster组装一个分布式集群定义。server代表本地为任务分配的服务器。

关键点2: 在流图Graph定义阶段, 加入“参数服务器”和“工作服务器”的判断,重构Graph定义代码。

代码语言:javascript
复制
if FLAGS.job_name == "ps":
    server.join()
elif FLAGS.job_name == "worker":
    with tf.device(tf.train.replica_device_setter(
                    worker_device="/job:worker/task:%d" % FLAGS.task_index,
                    cluster=cluster)):
        # 这里是各个worker工作服务器下的graph定义。

如果当前服务器是ps参数服务器,当前服务器就要执行join方法汇总更新的参数。

如果当前是工作服务器,构建deVice设备上下文,复制数据到各个设备,并且知道任务号,之后再定义原先的Graph。

关键点3: 最后,重构你原来的graph定义和TensorFlow Session训练的方式细节。

代码语言:javascript
复制
    grads_and_vars = optimizer.compute_gradients(cost)
    correct_prediction = tf.equal(y_pred_cls, y_true_cls)
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

    rep_op = tf.train.SyncReplicasOptimizer(optimizer,
                                            replicas_to_aggregate=len(
                                            worker_hosts),
                                            #replica_id=FLAGS.task_index,
                                            total_num_replicas=len(
                                            worker_hosts),
                                            use_locking=True)
    train_op = rep_op.apply_gradients(grads_and_vars,
                                global_step=global_step)
    init_token_op = rep_op.get_init_tokens_op()
    chief_queue_runner = rep_op.get_chief_queue_runner()

    init_op = tf.initialize_all_variables()
    saver = tf.train.Saver()
    train_batch_size = batch_size
    tf.summary.scalar('cost', cost)
    tf.summary.scalar('accuracy', accuracy)
    summary_op = tf.summary.merge_all()
    summary_writer = tf.summary.FileWriter('./summary_log/train')
    summary_writer_test = tf.summary.FileWriter('./summary_log/test')

sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
                            logdir="./checkpoint/",
                            init_op=init_op,
                            summary_op=None,
                            saver=saver,
                            global_step=global_step,
                            save_model_secs=60)
session = sv.prepare_or_wait_for_session(server.target)
sv.start_queue_runners(session, [chief_queue_runner])
session.run(init_token_op)

训练中稍有不同的是上面这段代码,graph定义完毕后,我们要用optimizer.compute_gradients方法计算梯度得到grads_and_vars对象。通过SyncReplicasOptimizer这个特殊的优化器,进行梯度的计算,即rep_op.apply_gradients(grads_and_vars, global_step=global_step)方法。

计算完毕得到的train_op对象就能在未来想用session.run()的地方使用了:

代码语言:javascript
复制
session.run([train_op, cost, global_step], feed_dict=feed_dict_train)

注意以上三个关键点, 你离TensorFlow并行化已经八九不离十了。

实际重构的例子,请看我github上识别猫狗的基本程序:

分布式版:

https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.distributed.py

单机版:

https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.py

参考文献

  1. https://github.com/thewintersun/distributeTensorflowExample
  2. https://www.tensorflow.org/deploy/distributed
  3. http://blog.csdn.net/luodongri/article/details/52596780
  4. https://www.slideshare.net/stanleywanguni/distributed-machine-learning
  5. https://www.quora.com/How-is-parallel-computing-used-in-machine-learning

本文转载自:David 9的博客 — 不怕"过拟合"

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2018-04-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 进击的Coder 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考文献
相关产品与服务
命令行工具
腾讯云命令行工具 TCCLI 是管理腾讯云资源的统一工具。使用腾讯云命令行工具,您可以快速调用腾讯云 API 来管理您的腾讯云资源。此外,您还可以基于腾讯云的命令行工具来做自动化和脚本处理,以更多样的方式进行组合和重用。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档