首页
学习
活动
专区
工具
TVP
发布
社区首页 >问答首页 >TensorFlow:图表已完成,无法修改?

TensorFlow:图表已完成,无法修改?

提问于 2020-06-25 23:06:18
回答 0关注 0查看 157

我在进行TensorFlow的分布式训练,想通过筛选梯度的方式,来实现较少通信时长的目的。这是我的代码:

import time

import tensorflow as tf

import numpy as np

from tensorflow.examples.tutorials.mnist import input_data # 数据的获取不是本章重点,这里直接导入

FLAGS = tf.app.flags.FLAGS

tf.app.flags.DEFINE_integer('thread_steps', 0, 'Steps run before sync gradients.')

tf.app.flags.DEFINE_string('data_dir', '/tmp/mnist-data', 'Directory for storing mnist data')

tf.app.flags.DEFINE_string("job_name", "worker", "ps or worker")

tf.app.flags.DEFINE_integer("task_id", 0, "Task ID of the worker/ps running the train")

tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps机")

tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker机,用逗号隔开")

全局变量

MODEL_DIR = "./distribute_model_ckpt/"

DATA_DIR = "./data/mnist/"

BATCH_SIZE = 32

THREAD_STEPS = FLAGS.thread_steps

main函数

def main(self):

代码语言:txt
复制
# ==========  STEP1: 读取数据  ========== #
代码语言:txt
复制
mnist = input\_data.read\_data\_sets(FLAGS.data\_dir, one\_hot=True)    # 读取数据

代码语言:txt
复制
# ==========  STEP2: 声明集群  ========== #
代码语言:txt
复制
# 构建集群ClusterSpec和服务声明
代码语言:txt
复制
ps\_hosts = FLAGS.ps\_hosts.split(",")
代码语言:txt
复制
worker\_hosts = FLAGS.worker\_hosts.split(",")
代码语言:txt
复制
cluster = tf.train.ClusterSpec({"ps":ps\_hosts, "worker":worker\_hosts})    # 构建集群名单
代码语言:txt
复制
server = tf.train.Server(cluster, job\_name=FLAGS.job\_name, task\_index=FLAGS.task\_id)    # 声明服务
代码语言:txt
复制
n\_workers = len(worker\_hosts)    # worker机的数量

代码语言:txt
复制
# ==========  STEP3: ps机内容  ========== #
代码语言:txt
复制
# 分工,对于ps机器不需要执行训练过程,只需要管理变量。server.join()会一直停在这条语句上。
代码语言:txt
复制
if FLAGS.job\_name == "ps":
代码语言:txt
复制
    with tf.device("/cpu:0"):
代码语言:txt
复制
        server.join()

代码语言:txt
复制
# ==========  STEP4: worker机内容  ========== #
代码语言:txt
复制
# 下面定义worker机需要进行的操作
代码语言:txt
复制
is\_chief = (FLAGS.task\_id == 0)    # 选取task\_id=0的worker机作为chief

代码语言:txt
复制
# 通过replica\_device\_setter函数来指定每一个运算的设备。
代码语言:txt
复制
# replica\_device\_setter会自动将所有参数分配到参数服务器上,将计算分配到当前的worker机上。
代码语言:txt
复制
device\_setter = tf.train.replica\_device\_setter(
代码语言:txt
复制
    worker\_device="/job:worker/task:%d" % FLAGS.task\_id,
代码语言:txt
复制
    cluster=cluster)

代码语言:txt
复制
# 这一台worker机器需要做的计算内容
代码语言:txt
复制
with tf.device(device\_setter):
代码语言:txt
复制
    # 输入数据
代码语言:txt
复制
    x = tf.placeholder(name="x-input",shape=[None,28\*28],dtype=tf.float32)    # 输入样本像素为28\*28
代码语言:txt
复制
    # x\_shape = x.get\_shape().as\_list()
代码语言:txt
复制
    # length = x\_shape[1]
代码语言:txt
复制
    # x\_reshaped = tf.reshape(x, [-1,length])
代码语言:txt
复制
    y\_ = tf.placeholder(name="y-input", shape=[None,10],dtype=tf.float32)      # MNIST是十分类
代码语言:txt
复制
    # 第一层(隐藏层)
代码语言:txt
复制
    with tf.variable\_scope("layer1"):
代码语言:txt
复制
        weight1 = tf.get\_variable(name="weight1", shape=[28\*28, 10], initializer=tf.glorot\_normal\_initializer())
代码语言:txt
复制
        biases1 = tf.get\_variable(name="biases1", shape=[10], initializer=tf.glorot\_normal\_initializer())
代码语言:txt
复制
        layer1 = tf.nn.relu(tf.matmul(x, weight1) + biases1, name="layer1")
代码语言:txt
复制
    # 第二层(输出层)
代码语言:txt
复制
    with tf.variable\_scope("layer2"):
代码语言:txt
复制
        weight2 = tf.get\_variable(name="weight2", shape=[10, 10], initializer=tf.glorot\_normal\_initializer())
代码语言:txt
复制
        biases2 = tf.get\_variable(name="biases2", shape=[10], initializer=tf.glorot\_normal\_initializer())
代码语言:txt
复制
        y = tf.add(tf.matmul(layer1, weight2), biases2, name="y")
代码语言:txt
复制
    pred = tf.argmax(y, axis=1, name="pred")
代码语言:txt
复制
    global\_step = tf.contrib.framework.get\_or\_create\_global\_step()    # 必须手动声明global\_step否则会报错
代码语言:txt
复制
    # 损失和优化
代码语言:txt
复制
    cross\_entropy = tf.nn.sparse\_softmax\_cross\_entropy\_with\_logits(logits=y, labels=tf.argmax(y\_, axis=1))
代码语言:txt
复制
    loss = tf.reduce\_mean(cross\_entropy)
代码语言:txt
复制
    with tf.name\_scope('train'):
代码语言:txt
复制
        optimizer = tf.train.GradientDescentOptimizer(0.01)
代码语言:txt
复制
    with tf.name\_scope('gradient'):
代码语言:txt
复制
        gradient\_all = optimizer.compute\_gradients(loss,weight2)
代码语言:txt
复制
        gradients\_node=tf.gradients(loss,weight2)
代码语言:txt
复制
        grads\_holder = [(tf.placeholder(tf.float32,shape=g.get\_shape()), v) 
代码语言:txt
复制
                        for (g, v) in gradient\_all]
代码语言:txt
复制
    # \*\*通过tf.train.SyncReplicasOptimizer函数实现函数同步更新\*\*
代码语言:txt
复制
    opt = tf.train.SyncReplicasOptimizer(
代码语言:txt
复制
        tf.train.GradientDescentOptimizer(0.01),
代码语言:txt
复制
        replicas\_to\_aggregate=n\_workers,
代码语言:txt
复制
        total\_num\_replicas=n\_workers
代码语言:txt
复制
    )
代码语言:txt
复制
    sync\_replicas\_hook = opt.make\_session\_run\_hook(is\_chief)
代码语言:txt
复制
    train\_op = opt.apply\_gradients(grads\_holder, global\_step=global\_step)
代码语言:txt
复制
    if is\_chief:
代码语言:txt
复制
        train\_op = tf.no\_op()
代码语言:txt
复制
    hooks = [sync\_replicas\_hook, tf.train.StopAtStepHook(last\_step=10000)]    # 把同步更新的hook加进来
代码语言:txt
复制
    config = tf.ConfigProto(
代码语言:txt
复制
        allow\_soft\_placement=True,    # 设置成True,那么当运行设备不满足要求时,会自动分配GPU或者CPU。
代码语言:txt
复制
        log\_device\_placement=False,   # 设置为True时,会打印出TensorFlow使用了哪种操作
代码语言:txt
复制
    )

代码语言:txt
复制
    # ==========  STEP5: 打开会话  ========== #
代码语言:txt
复制
    # 对于分布式训练,打开会话时不采用tf.Session(),而采用tf.train.MonitoredTrainingSession()
代码语言:txt
复制
    # 详情参考:https://www.cnblogs.com/estragon/p/10034511.html
代码语言:txt
复制
    with tf.train.MonitoredTrainingSession(
代码语言:txt
复制
            master=server.target,
代码语言:txt
复制
            is\_chief=is\_chief,
代码语言:txt
复制
            # checkpoint\_dir=MODEL\_DIR,
代码语言:txt
复制
            hooks=hooks,
代码语言:txt
复制
            # save\_checkpoint\_secs=10,
代码语言:txt
复制
            config=config) as sess:
代码语言:txt
复制
        print("session started!")
代码语言:txt
复制
        start\_time = time.time()
代码语言:txt
复制
        step = 0
代码语言:txt
复制
        while not sess.should\_stop(): 
代码语言:txt
复制
            # for step in range(THREAD\_STEPS):   
代码语言:txt
复制
            xs,ys= mnist.train.next\_batch(BATCH\_SIZE)    # batch\_size=32
代码语言:txt
复制
            #求每个梯度
代码语言:txt
复制
            grads = sess.run(gradients\_node, feed\_dict={x:xs, y\_: ys})
代码语言:txt
复制
            grads=np.array(grads)
代码语言:txt
复制
            grads=grads.reshape((10,10))
代码语言:txt
复制
            print(grads)
代码语言:txt
复制
            grad\_abs=np.abs(grads)
代码语言:txt
复制
            variance = np.var(grad\_abs, axis=1)
代码语言:txt
复制
            print('variance:',variance)
代码语言:txt
复制
            #取方差最大的几组值
代码语言:txt
复制
            topk\_var=tf.constant(variance)
代码语言:txt
复制
            k=1
代码语言:txt
复制
            output1 = tf.nn.top\_k(topk\_var, k)
代码语言:txt
复制
            with tf.Session() as sess1:
代码语言:txt
复制
                print(sess1.run(output1))
代码语言:txt
复制
                a=output1.indices[-1]
代码语言:txt
复制
                # print(sess1.run(a))  #a是所在TOPK个方差最大的索引值
代码语言:txt
复制
                x=a.eval()
代码语言:txt
复制
                a=int(x)
代码语言:txt
复制
            g\_a=grads[a,:]
代码语言:txt
复制
                # print('g\_a=',g\_a) 
代码语言:txt
复制
                # print('\n')
代码语言:txt
复制
            #取方差最大的一组值中的前几个大的梯度值,设置梯度阈值
代码语言:txt
复制
            g\_a\_abs=np.abs(g\_a)        
代码语言:txt
复制
            k=3
代码语言:txt
复制
            output2 = tf.nn.top\_k(g\_a\_abs, k)
代码语言:txt
复制
            with tf.Session() as sess2:
代码语言:txt
复制
                print(sess2.run(output2))
代码语言:txt
复制
                b=output2.indices[-1]
代码语言:txt
复制
                # print(sess2.run(b))  #a是所在TOPK个方差最大的索引值
代码语言:txt
复制
                x=b.eval()
代码语言:txt
复制
                b=int(x) 
代码语言:txt
复制
                threshold=g\_a\_abs[b]
代码语言:txt
复制
            grad\_end=np.where(grad\_abs<threshold,0,grads)
代码语言:txt
复制
            grad\_end=[grad\_end]
代码语言:txt
复制
            grad\_var={}
代码语言:txt
复制
            for i in range(len(grads\_holder)):
代码语言:txt
复制
                    k = grads\_holder[i][0]
代码语言:txt
复制
                    if k is not None:
代码语言:txt
复制
                          # grad\_var[k] =np.var([tf.reshape(g, [-1]) for g in grads])
代码语言:txt
复制
                          grad\_var[k] =[g[i][0] for g in grad\_end]

代码语言:txt
复制
            \_, loss\_value, global\_step\_value = sess.run([train\_op, loss, global\_step], feed\_dict=grad\_var)
代码语言:txt
复制
            if step > 0 and step % 100 == 0:
代码语言:txt
复制
                duration = time.time() - start\_time
代码语言:txt
复制
                sec\_per\_batch = duration / global\_step\_value
代码语言:txt
复制
                print("After %d training steps(%d global steps), loss on training batch is %g (%.3f sec/batch)" % (step, global\_step\_value, loss\_value, sec\_per\_batch))
代码语言:txt
复制
                print('Training elapsed time:%f s' % duration)
代码语言:txt
复制
            step += 1

if __name__ == "__main__":

代码语言:txt
复制
tf.app.run()

这是我的报错信息:

回答

和开发者交流更多问题细节吧,去 写回答
相关文章

相似问题

相关问答用户
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档