对于机器学习模型,分布式大致分两类:模型分布式和数据分布式:
模型分布式非常复杂和灵活, 它把整个机器学习模型分割,分散在多个节点上,在每个节点上计算模型的各个部分, 最后把结果拼接起来。如果你造了一个并行性很高的深度网络,比如这个,那就更棒了。你只要在每个节点上,计算不同的层,最后把各个层的异步结果通过较为精妙的方式汇总起来。
而我们今天要手把手教大家的是数据分布式。模型把数据拷贝到多个节点上, 每次算Epoch迭代的时候,每个节点对于一个batch的梯度都会有一个计算值,一个batch结束后,所有节点把梯度值汇总起来(ps参数服务器的任务就是汇总所有参数更新),从而进行更新。这就会导致每个batch的计算都比非分布式方法精准。相对非分布式,并行方法下,同样的迭代次数,收敛较快。
如何把自己的单机TensorFlow代码变为分布式的代码?
本文将手把手告诉大家3个关键点,重构自己的TensorFlow代码为分布式代码(开始前请大家前用1分钟了解文末的参考文献,了解基本知识):
关键点1: 定义FLAGS全局变量,获得ps参数服务器,worker工作服务器等分布式全局信息。
# Define parameters
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_float('learning_rate', 0.00003, 'Initial learning rate.')
tf.app.flags.DEFINE_integer('steps_to_validate', 1000,
'Steps to validate and print loss')
# For distributed
tf.app.flags.DEFINE_string("ps_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "",
"Comma-separated list of hostname:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps', 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("issync", 0, "issync mode")
以上代码是从命令行获得变量的简单方式。使用TensorFlow自带的FLAGS命令行工具。
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
server = tf.train.Server(
cluster,
job_name=FLAGS.job_name,
上述代码教你如何获得命令行变量到python变量。ps_hosts代表所有参数服务器,work_hosts是所有工作服务器。cluster组装一个分布式集群定义。server代表本地为任务分配的服务器。
关键点2: 在流图Graph定义阶段, 加入“参数服务器”和“工作服务器”的判断,重构Graph定义代码。
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# 这里是各个worker工作服务器下的graph定义。
如果当前服务器是ps参数服务器,当前服务器就要执行join方法汇总更新的参数。
如果当前是工作服务器,构建deVice设备上下文,复制数据到各个设备,并且知道任务号,之后再定义原先的Graph。
关键点3: 最后,重构你原来的graph定义和TensorFlow Session训练的方式细节。
grads_and_vars = optimizer.compute_gradients(cost)
correct_prediction = tf.equal(y_pred_cls, y_true_cls)
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
rep_op = tf.train.SyncReplicasOptimizer(optimizer,
replicas_to_aggregate=len(
worker_hosts),
#replica_id=FLAGS.task_index,
total_num_replicas=len(
worker_hosts),
use_locking=True)
train_op = rep_op.apply_gradients(grads_and_vars,
global_step=global_step)
init_token_op = rep_op.get_init_tokens_op()
chief_queue_runner = rep_op.get_chief_queue_runner()
init_op = tf.initialize_all_variables()
saver = tf.train.Saver()
train_batch_size = batch_size
tf.summary.scalar('cost', cost)
tf.summary.scalar('accuracy', accuracy)
summary_op = tf.summary.merge_all()
summary_writer = tf.summary.FileWriter('./summary_log/train')
summary_writer_test = tf.summary.FileWriter('./summary_log/test')
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
logdir="./checkpoint/",
init_op=init_op,
summary_op=None,
saver=saver,
global_step=global_step,
save_model_secs=60)
session = sv.prepare_or_wait_for_session(server.target)
sv.start_queue_runners(session, [chief_queue_runner])
session.run(init_token_op)
训练中稍有不同的是上面这段代码,graph定义完毕后,我们要用optimizer.compute_gradients方法计算梯度得到grads_and_vars对象。通过SyncReplicasOptimizer这个特殊的优化器,进行梯度的计算,即rep_op.apply_gradients(grads_and_vars, global_step=global_step)方法。
计算完毕得到的train_op对象就能在未来想用session.run()的地方使用了:
session.run([train_op, cost, global_step], feed_dict=feed_dict_train)
注意以上三个关键点, 你离TensorFlow并行化已经八九不离十了。
实际重构的例子,请看我github上识别猫狗的基本程序:
分布式版:
https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.distributed.py
单机版:
https://github.com/yanchao727/tensorflow_kaggle_cat_dog/blob/master/cnn.py
本文转载自:David 9的博客 — 不怕"过拟合"