前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >详解PLANET代码(tensorflow)如何加入SAC功能

详解PLANET代码(tensorflow)如何加入SAC功能

作者头像
用户1908973
发布2019-07-17 16:58:54
1K0
发布2019-07-17 16:58:54
举报
文章被收录于专栏:CreateAMindCreateAMind

先说为什么要加?

SAC 算法本质是经过熵强化的回报值最大化算法。在我们单独跑的其他实验中,包括SAC + RNN表现出很好的性能,1.replay buffer使它的采样效率增高 2.尤其在高维连续动作空间,对动作的稳定性连续性有比较好的提升。

1.思路:

整理记录一下思路。如果你拿到的原代码盘根错节地耦合在一起,这种复杂的整体就像瑞士手表的内部结构,牵一发而动全身。而你现在需要加入新功能,且不是类比和可模仿的添加,而是加入如sac这种原代码中不存在的功能,那你可以参考如下:

需要对原代码足够清晰,不是大概逻辑,而是从数据收集,存取,使用到模型中,模型如何运转 的每一步细节非常清晰,最好画成精确的图。因为对于tensorflow,你需要始终有一个概念,tensorflow 是一个静态图,它像一个整体的精密的仪器,每个结点之间如何链接(data dependency and control dependency),所有这些关系你先要设计确定好,这一步叫define model. 等图画好了,建立session,feed数据,去run它一次,你要想象数据流在整个仪器中按你设计好的方式流动一次,run 50000 step就是50000个batch的数据一一进到装置中流动。

2. 如何搞定变态

planet代码写的有点变态,它不是模块化清晰地呈现出 数据模型设计session.run,和模型存取。下面就这四个模块来描述下planet代码分别是如何实现这四个模块的:

2.1  session.run部分

如图中横轴(图给自己看的,看不清没关系,下面文字描述):

2.1.1

session.run每run一个step,数据就在装置中流动一次。它设计了phase这个概念,它加入train和test两个phase. train phase 有50000 个step ,test phase 有100个 step. 

加入好phase后,它每一次run,就会判断这是什么阶段,前50000步都是train,所以每一个step都会这么流动: 1会从train_episode目录中读取数据,计算loss,2接着计算gradient, 3做planning给出action 从而得到一个episode 的数据存入train_episodes目录。

50000 step之后是100 step , 这100 step每run step 会这么流动:会从test_episode目录中读取数据,但并没用上。loss为0所以没有反向传播不会计算和更新gradient,只会做planning输入action从而得到一个episode数据存入test_episode目录。

以上这些控制是通过tf.cond阀门来控制数据流向的,阀门决定了哪一条路是通路,run的是最后的结点,与这个最后结点依次建立关系的且通顺的路才能有数据流过,好像电路一样。

代码语言:javascript
复制
train_loss = tf.cond(
    tf.equal(phase, 'train'),
    lambda: loss,
    lambda: 0 * tf.get_variable('dummy_loss', (), tf.float32))

2.1.2

如果一开始没有生成目录,从start函数进入,会进入random_episode,从而分别收集到5个episode存入train_episode和test_episode目录。

代码语言:javascript
复制
def random_episodes(env_ctor, num_episodes, output_dir=None):

综合上述两点,整个代码在Testing=False,即正常训练时是这样的:

train_episode和test_episode目录先各收集到5个episode,然后开始5万步,每一步从train_episode取数据,去求gradient,再planning出一个episode。5万步之后,是100步的test phase,会从test_episode得数据,不求gradient,或者更准备的描述法是:没有数据流动经过装置中的求梯度结点。它会planning出一个episode (这步是由下图代码决定的).

代码语言:javascript
复制
with tf.variable_scope('simulation'):
  sim_returns = []
  for name, params in config.sim_summaries.items():
    # These are expensive and equivalent for train and test phases, so only
    # do one of them.
    sim_summary, sim_return = tf.cond(
        tf.equal(graph.phase, 'test'),
        lambda: utility.simulate_episodes(config, params, graph, name),
        lambda: ('', 0.0),
        name='should_simulate_' + params.task.name)
    summaries.append(sim_summary)
    sim_returns.append(sim_return)

2.2 数据部分:

2.2.1

首先若进入start 函数是先收集随机数据存到目录中,若进入resume函数,是直接从目录中读取 .npz

2.2.2

数据的存:从env得到并存数据到目录:这个env其实是封装好的,具体:它发出命令与env进程间通信,得到反馈。(另外env那个进程本身还有子进程,分为carla server和carla client不展开说)。

代码语言:javascript
复制
def random_episodes(env_ctor, num_episodes, output_dir=None):
  env = env_ctor()  # env is an <ExternalProcess object>.
  env = wrappers.CollectGymDataset(env, output_dir)
  episodes = []
  for _ in range(num_episodes):
    policy = lambda env, obs: env.action_space.sample()
    done = False
    obs = env.reset()
    # cnt = 0
    while not done:
      action = policy(env, obs)
      obs, _, done, info = env.step(action)  # env.step
    #   cnt += 1
    # print(cnt)
    episodes.append(info['episode'])  # if done is True, info stores the 'episode' information and 'episode' is written in a file(e.g. "~/planet/log_debug/00001/test_episodes").
  return episodes

step层层进去得到数据,然后

代码语言:javascript
复制
def _process_step(self, action, observ, reward, done, info):

这个函数是将数据整理放到目录中。而我改的就是这个函数。

我需要的数据是:[o,a,r,o_next,d]. 考虑到restore还有它后面模型结构的情况,要改动最小,所以我决定只加上o_next(即代码中image_next),其他原数据保持不变。

数据的取:

代码语言:javascript
复制
def _read_episodes_scan(
    directory, batch_size, every, max_episodes=None, **kwargs):
  recent = {}
  cache = {}
  while True:
    index = 0
    for episode in np.random.permutation(list(recent.values())):
      yield episode
      index += 1
      if index > every / 2:
        break
    for episode in np.random.permutation(list(cache.values())):
      index += 1
      yield episode
      if index > every:
        break
    # At the end of the epoch, add new episodes to the cache.
    # print(index, len(recent), max_episodes, len(cache))
    cache.update(recent)
    recent = {}
    filenames = tf.gfile.Glob(os.path.join(directory, '*.npz'))
    filenames = [filename for filename in filenames if filename not in cache]
    if max_episodes:
      filenames = filenames[:max_episodes - len(cache)]
    for filename in filenames:
      recent[filename] = _read_episode(filename, **kwargs)

这个函数做了什么:

一开始目录里的数据全读取,之后再每次得到batch数据时,会分为recent和cache,cache是已经读过的文件列表,recent是目录里新出现的文件。最多各读取5个。这样兼顾了新旧数据,有replay buffer的功效。具体新旧如何配置,可以自行调整。

数据读取利用了这个dataset机制,是一个迭代器,每次从生成器取出batchsize个chunk.

代码语言:javascript
复制
train = tf.data.Dataset.from_generator(                                         # train.output_shapes: e.g. 'image' shape(?, 64, 64, 3)
    functools.partial(loader, train_dir, shape[0], **kwargs), dtypes, shapes)

通过这个函数得到batch数据,注意,这些都是整个图中结点,都用最后的run每一step,来真正的生成数据。

代码语言:javascript
复制
data = get_batch(datasets, trainer.phase, trainer.reset)

2.2.3

SAC算法中数据分两部分:随机部分和用policy生成的部分。

随机部分对比下改好的数据和原始数据:

policy生成部分想了两个方案:1把原始代码的planning生成a替换为policy生成a,但我觉得这里面很冗余,细节也不敢随意更改又是个大坑,本着改动最小,因此放弃 2.所以在define model 函数中加入类似如下函数,可以让他在每一个step去生成一个或若干个episode的数据。

代码语言:javascript
复制
def random_episodes(env_ctor, num_episodes, output_dir=None):
  env = env_ctor()  # env is an <ExternalProcess object>.
  env = wrappers.CollectGymDataset(env, output_dir)
  episodes = []
  for _ in range(num_episodes):
    policy = lambda env, obs: env.action_space.sample()
    done = False
    obs = env.reset()
    # cnt = 0
    while not done:
      action = policy(env, obs)
      obs, _, done, info = env.step(action)  # env.step
    #   cnt += 1
    # print(cnt)
    episodes.append(info['episode'])  # if done is True, info stores the 'episode' information and 'episode' is written in a file(e.g. "~/planet/log_debug/00001/test_episodes").
  return episodes

模型设计部分:

不改动它原来的结构,用tf.cond phase去控制数据不往它的loss流动,主要改动会在这个函数:模仿它的写法,1在单独的文件写好sac的模型,2在config 时配置好heads。其他基本就没什么大问题了。

代码语言:javascript
复制
def compute_losses(
    loss_scales, cell, heads, step, target, prior, posterior, mask,
    free_nats=None, debug=False):
  features = cell.features_from_state(posterior)      # [s,h]
  losses = {}
  for key, scale in loss_scales.items():
    # Skip losses with zero or None scale to save computation.
    if not scale:
      continue
    elif key == 'divergence':
      loss = cell.divergence_from_states(posterior, prior, mask)
      if free_nats is not None:
        loss = tf.maximum(tf.cast(free_nats, tf.float32), loss)
      loss = tf.reduce_sum(loss, 1) / tf.reduce_sum(tf.to_float(mask), 1)
    elif key == 'global_divergence':
      global_prior = {
          'mean': tf.zeros_like(prior['mean']),
          'stddev': tf.ones_like(prior['stddev'])}
      loss = cell.divergence_from_states(posterior, global_prior, mask)
      loss = tf.reduce_sum(loss, 1) / tf.reduce_sum(tf.to_float(mask), 1)
    elif key in heads:
      output = heads[key](features)   # decoder is used.
      loss = -tools.mask(output.log_prob(target[key]), mask)
    else:
      message = "Loss scale of head '{}' is not used."
      print(message.format(key))
      continue
    # Average over the batch and normalize by the maximum chunk length.
    loss = tf.reduce_mean(loss)
    losses[key] = tf.check_numerics(loss, key) if debug else loss
  return losses

restore 部分:

注意用到filter variable功能去restore 它原有的参数,致使与我新加的参数无关。

代码语言:javascript
复制
def add_saver(
    self, include=r'.*', exclude=r'.^', logdir=None, load=True, save=True,
    checkpoint=None):
  """Add a saver to save or load variables.

  Args:
    include: One or more regexes to match variable names to include.
    exclude: One or more regexes to match variable names to exclude.
    logdir: Directory for saver to store and search for checkpoints.
    load: Whether to use the saver to restore variables.
    save: Whether to use the saver to save variables.
    checkpoint: Checkpoint name to load; None for newest.
  """
  variables = tools.filter_variables(include, exclude)
  saver = tf.train.Saver(variables, keep_checkpoint_every_n_hours=2)
  if load:

思路和细节就是这样的,接下来就开始加代码吧。

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2019-07-15,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 CreateAMind 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
批量计算
批量计算(BatchCompute,Batch)是为有大数据计算业务的企业、科研单位等提供高性价比且易用的计算服务。批量计算 Batch 可以根据用户提供的批处理规模,智能地管理作业和调动其所需的最佳资源。有了 Batch 的帮助,您可以将精力集中在如何分析和处理数据结果上。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档