TF Boy 之初筵 - 会话封装

在“TF Boy 之初筵 - 分布十三式”里面我们给了一个分布式MNIST训练的例子, 然后在“TF Boy 之初筵 - 机器簇” 我们对第四式: ClusterSpec定义进行了解释。 这里我们对第十式:Session配置进行解释。

前言

在“TF Boy 之初筵 - 机器簇”里面,我们讲了TF的Client-Master结构。 分布式需要用到Worker的配置。

但是不管是不是分布式, TF都默认为Client-Master结构的管理。 所谓会话Session,就是这个Client-Master之间的对话。 从下图, 我们可以看到通过对一个Tensor图(网)的配置, 然后由Session提交到Master运行。

但是Session都是要跑到具体的设备上面的。 所以和Session密切相关的是图Graph (下图左)和 设备Device(下图右)。

但是,我们看到“TF Boy 之初筵 - 机器簇” 和 “TF Boy 之初筵 - 分布十三式” 里对Session的使用却截然不同。

1)在分布式数学表达式计算中:

with tf.Session("grpc://localhost:2223") as sess:
    result = sess.run(z)
    print(result)

2)在分布式MNIST例子里面:

if FLAGS.existing_servers:
    server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
    print("Using existing server at: %s" % server_grpc_url)

    sess = sv.prepare_or_wait_for_session(server_grpc_url,
                                          config=sess_config)
else:
    sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

print("Worker %d: Session initialization complete." % FLAGS.task_index)

if FLAGS.sync_replicas and is_chief:
    # Chief worker will start the chief queue runner and call the init op.
    sess.run(sync_init_op)
    sv.start_queue_runners(sess, [chief_queue_runner])

到底是怎样变化而来的呢?

Session的变化

从HelloWorld开始

Session是TF必不可少的部分, 就算HelloWorld里面就要定义一个tf.Session()。 然后提交运行sess.run( ... ) !

最原始的Session

最原始的会话有3部分, 初始化, 运行和关闭。 在HelloWorld里面, 由于程序结束, 一切都会Close, 所以把sess.close(),省略了。

# Using the `close()` method.
sess = tf.Session()
sess.run(...)
sess.close()

但是写close()的确不方便。 我们知道Python里面对省略close()常用的是一个with 上下文模块化。

with后的Session

通过with上下文设置, 直接隐藏了close(), 而且方便了阅读!

# Using the context manager.
with tf.Session() as sess:
  sess.run(...)

这样就很容易使用session了。

Session的两种使用方式

简单来说Session有两个运行接口:run( ) 和 eval( )

run(
    feed_dict=None,
    session=None
)
eval(
    feed_dict=None,
    session=None
)

它们分别属于不同的功能, 一个是跑一个Tensor图 tf.Operation.run , 一个是看看一个Tensor的值tf.Tensor.eval。

这两种方法都支持对默认Session的提交, 也就是说, 如果设置了默认Session, 那么这两个接口都不用输入session作为参数了,直接省略了。 这种设置有点像默认浏览器一样。

默认Default Session

默认Session可以通过Session的函数

as_default()

来设置, 一旦设置了, 也可以通过ft的接口直接获取。

tf.get_default_session()

有了这个默认Session的设置, 使用就更方便了。 所以,tensor.eval()本质上就是 tf.get_default_session().run(tensor).

c = tf.constant(...)
sess = tf.Session()
with sess.as_default():
  print(c.eval())

每次都要手动设置默认很麻烦。 能不能直接声明成默认的呢?

交互Interactive Sessino

尤其在IPython被广泛使用下, 直接声明为默认的Session可以在IPthon里面随意使用eval() 接口, 随时查看Tensor的值, 这个还是蛮酷的。 这种情况下直接声明一个交互会话。

sess = tf.InteractiveSession()
a = tf.constant(5.0)
b = tf.constant(6.0)
c = a * b
# We can just use 'c.eval()' without passing 'sess'
print(c.eval())

到现在为止, 我们理解了分布式算术表达式计算里面的Session使用。 但是分布式MNIST的例子还是不明白。 对的, 这得进入下一个境界, 封装。 TensorFlow受到Scikit-Learn超级好用的接口的抽象化的影响。 也开启了超级抽象化的流程。 但是抽象化是一个双刃剑, 好用是好用了, 但是看懂和修改就难喽, Caffe的作者贾帅哥就提出TF的抽象化太重了。

Session的封装

Session除了变化外的进一步封装, 莫过于管理了, 对的。 这个管理就是对应到Session Manager。 不过,有点不一样的是, 这个Session Manager不是独立的, 而是合并在一个叫Supervisor的监管器中。

监管器Supervisor

如果我们再看一下分布式MINIST的代码就能找到这个 tf.train.Supervisor。

if FLAGS.sync_replicas:
    sv = tf.train.Supervisor(
        is_chief=is_chief,
        logdir=train_dir,
        init_op=init_op,
        local_init_op=local_init_op,
        ready_for_local_init_op=ready_for_local_init_op,
        recovery_wait_secs=1,
        global_step=global_step)
else:
    sv = tf.train.Supervisor(
        is_chief=is_chief,
        logdir=train_dir,
        init_op=init_op,
        recovery_wait_secs=1,
        global_step=global_step)

Supervisor是对三个东西的封装, 一个是我们这里要说的SessionManager, 另外两个是Coordinator 协调器 和Savor 保存器。 Coordinator是对多线程运行进行协调的。 Savor是对运行的Summary进行保存,以后方便TensorBoard查看。 所以你看到Supervisor初始化的时候, 对日志地址,初始化, 本地初始化和全局计数器都很看重。

这里专注于SessionManager的封装部分,Coordinator和Savor就提一下,不展开了。

Supervisor提供了两个重要的接口来协助SessionManager:

managed_session ( ... )

prepare_or_wait_for_session ( ... )

异步接口 managed_session

所谓异步,主要是针对下面的同步来说的。 其实本质上就是取代默认的 with 上下文的Session。 这里不用初始化了, 直接从Supervisor获取它管理的那个会话。 或者说经由Supervisor来初始化Session。

  ...create graph...
  my_train_op = ...

  sv = tf.train.Supervisor(logdir="/my/training/directory")
  with sv.managed_session() as sess:
      sess.run(my_train_op)

我们知道如果local模式,就是上面的写法, 如果要分布式模式的时候。 Session的master的协议接口就不能忽视了。

  sv = Supervisor(logdir='/tmp/mydir')
  # Get a TensorFlow session managed by the supervisor.
  with sv.managed_session(FLAGS.master) as sess:
    # Use the session to train the graph.
    while not sv.should_stop():
      sess.run(<my_train_op>)

我们发现, 上面这个managed_session还是和MNIST例子里面的不一样。 对的, 因为MNIST例子里用的是同步的优化器,所以需要建立同步的Session机制。

同步接口 prepare_or_wait_for_session

在同步的情况下, 多个Worker或者Device之间需要等待协同之后,才能建立起会话Session来。

这样, 我们就能把前面MNIST分布式的例子对上去了。

if FLAGS.existing_servers:
    server_grpc_url = "grpc://" + worker_spec[FLAGS.task_index]
    print("Using existing server at: %s" % server_grpc_url)
    sess = sv.prepare_or_wait_for_session(server_grpc_url,
                                          config=sess_config)
else:
    sess = sv.prepare_or_wait_for_session(server.target, config=sess_config)

print("Worker %d: Session initialization complete." % FLAGS.task_index)
if FLAGS.sync_replicas and is_chief:
    # Chief worker will start the chief queue runner and call the init op.
    sess.run(sync_init_op)
    sv.start_queue_runners(sess, [chief_queue_runner])

从上面我们可以看到根据gRPC协议链接初始化sess后, 就可以使用了。

使用 prepare_or_wait_for_session 注意

因为同步prepare的时候, 对于Chief Worker来说, 你可以写成with上下文形式:

with sv.prepare_or_wait_for_session(server.target) as sess:
    '''
    # is chief
    if FLAGS.task_index == 0:
        sv.start_queue_runners(sess, [chief_queue_runner])
        sess.run(init_token_op)
    '''

但是对于非Chief的Worker, 需要等待的, 我们深入看一下 prepare_or_wait_for_session()的实现,就能清楚的看到这一点。 因此不太适合写成with block形式。

"""
# For users who recreate the session with prepare_or_wait_for_session(), we
# need to clear the coordinator's stop_event so that threads managed by the
# coordinator can run.
self._coord.clear_stop()
if self._summary_writer:
  self._summary_writer.reopen()

if self._is_chief:
  sess = self._session_manager.prepare_session(
      master, init_op=self.init_op, saver=self.saver,
      checkpoint_dir=self._logdir, wait_for_checkpoint=wait_for_checkpoint,
      max_wait_secs=max_wait_secs, config=config,
      init_feed_dict=self._init_feed_dict, init_fn=self._init_fn)
  self._write_graph()
  if start_standard_services:
    self.start_standard_services(sess)
else:
  sess = self._session_manager.wait_for_session(master,
                                                config=config,
                                                max_wait_secs=max_wait_secs)
if start_standard_services:
  self.start_queue_runners(sess)
return sess

因为python的with block模块, 自带调用python的特殊函数__enter__ and __exit__ 函数的! 但是非Chief的Worker需要等待的, 如果被with block调用了__exit__ 就无法完成后续的工作了。 这点是特别要注意的!!!

对比 prepare_or_wait_for_session 和 managed_session

如果,你深入细看managed_session的Python实现, 你会惊奇的发现, 它只是对prepare_or_wait_for_session的一个调用。 如下所示:

try:
  sess = self.prepare_or_wait_for_session(
      master=master, config=config,
      start_standard_services=start_standard_services)
  yield sess
except Exception as e:
  self.request_stop(e)
finally:

你会大叫, 我是不是被忽悠了, 明明managed_session就是prepare_or_wait_for_session, 错! 你再仔细看看,它返回用的不是return sess, 而是yield sess。 这就是Python另外一个神奇的关键词。 经常被用来做很高明的处理。 尤其在异步的世界里面。 我一直认为:一个人的Python的水平高低,就看他用没有用过yield,用的溜不溜。

譬如这里有个很简单的异步生成器的例子, 大家有兴趣可以研究一下, 这里就不深入展开了。

import asyncio


async def async_generator():
    for i in range(3):
        await asyncio.sleep(1)
        yield i*i


async def main():
    async for i in async_generator():
        print(i)

所以, 异步是通过yield同步函数来实验的。 这就是managed_session和prepare_or_wait_for_session他们之间的差异!

我们看完了Supervisor对Session的封装, 再稍微提一下SessionManager本身的功能!

SessionManager 会话管理器

SessionManager主要提供准备,等待和恢复Session的功能! 这样在循环iteration的时候,就可以加入检查点checkpoint,方便后续记录跟踪优化过程了。

with tf.Graph().as_default():
   ...add operations to the graph...
  # Create a SessionManager that will checkpoint the model in '/tmp/mydir'.
  sm = SessionManager()
  sess = sm.prepare_session(master, init_op, saver, checkpoint_dir)
  # Use the session to train the graph.
  while True:
    sess.run(<my_train_op>)
with tf.Graph().as_default():
   ...add operations to the graph...
  # Create a SessionManager that will wait for the model to become ready.
  sm = SessionManager()
  sess = sm.wait_for_session(master)
  # Use the session to train the graph.
  while True:
    sess.run(<my_train_op>)

所以, Session管理的功能很重要的还是为了增加检查点, 方便保存日志, 跟踪。 这也是为什么Supervisor要把SessionManager 和 Savor 、 Coordinator封装在一起。 他们都是围绕检查点checkpoint为中心的。

小结:

这里我们详细的解释了Session的变化和封装。 尤其提醒注意了 Session的同步异步初始化, prepare_or_wait_for_session的使用。 跟这个有关, 顺便解释了: 有两个Python的关键词with 和 yield的神奇。

原文发布于微信公众号 - AI2ML人工智能to机器学习(mloptimization)

原文发表时间:2017-07-11

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏Linyb极客之路

一份超详细的Java问题排查工具单

平时的工作中经常碰到很多疑难问题的处理,在解决问题的同时,有一些工具起到了相当大的作用,在此书写下来,一是作为笔记,可以让自己后续忘记了可快速翻阅,二是分享,希...

902
来自专栏小詹同学

人脸识别(二)——训练分类器的补充说明

之前训练分类器时利用的是一个csv文件的读取,这里仅仅用几句话介绍一种简单易行的方法。 说到底,这类问题可以归类于读取指定文件夹...

4086
来自专栏owent

ptmalloc,tcmalloc和jemalloc内存分配策略研究

最近看了glibc的ptmaoolc,Goolge的tcmalloc和jemalloc,顺便做了一点记录。可能有些地方理解地不太对,如有发现还请大神指出。

8261
来自专栏ChaMd5安全团队

网鼎杯 第四场 部分WriteUp

http://2e8c0fad02d147a6b3f23624556a2fe49a4b7d2b64484f3c.game.ichunqiu.com//.git/

1302
来自专栏cs

linux 学习笔记七

来自实验楼的学习笔记,文字基本复制,粘贴。 ? 下载了一个录制gif图的软件,还不错 参考与:在Linux(Ubuntu)下超好用的录屏gif软件!!...

3405
来自专栏微信公众号:Java团长

一份超详细的Java问题排查工具单

平时的工作中经常碰到很多疑难问题的处理,在解决问题的同时,有一些工具起到了相当大的作用,在此书写下来,一是作为笔记,可以让自己后续忘记了可快速翻阅,二是分享,希...

1511
来自专栏陈福荣的专栏

十问 Linux 虚拟内存管理 ( 二 )

最近在做 MySQL 版本升级时( 5.1->5.5 ) , 发现了 mysqld 疑似“内存泄露”现象,但通过 valgrind 等工具检测后,并没发现类似的...

3.6K2
来自专栏owent

pbc的proto3接入

Protobuf 的 proto3发布也有挺长一段时间了。现在很多新项目慢慢转变用proto3来开发。这篇文章主要记录一下我在给pbc写对proto3支持时的一...

1531
来自专栏用户2442861的专栏

操作系统的几种地址详解

http://bbs.chinaunix.net/thread-2083672-1-1.html

2131
来自专栏牛客网

百度Android开发面经(共三面)

今天早上现场面的,一共三面,由于问的问题确实太多了,所以有些遗漏,把记得的问题记录了下。每面差不多一小时。 一面: 1、聊项目 2、MVP模式的优缺点 3、图片...

3935

扫码关注云+社区