专栏首页ArrayZoneYour的专栏TensorFlow强化学习入门(3)——构建仿真环境来进行强化学习

TensorFlow强化学习入门(3)——构建仿真环境来进行强化学习

在上一篇文章中,我演示了如何设计一个基于策略的强化学习agent来解决CartPole任务。在本文中,我们将从另一个角度重新审视这个问题——如何构建仿真环境来提升agent在当前环境下的性能。

Model Network : 建模网络,本文中称为仿真环境

如果你还没有阅读本系列之前的文章并且还是强化学习的初学者,我推荐你按照顺序来阅读,文末有之前文章的链接。

仿真环境是什么,我们为什么要引入仿真环境?在上图中,仿真环境是一个用于模拟真实世界中的动态问题的神经网络。拿我们之前CartPole的问题来说,我们需要一个可以根据之前的位置和行动来预测木棒下次位置的模型。在学习得到一个精确的模型之后,我们每次就可以直接用模型来训练我们的agent而不是必须放在真实环境中训练。当然如果原始的环境就是仿真得到的(像CartPole就模拟了真实世界中的物理情景),我们就不必再这样做了。

与计算机模拟不同,真实环境的部署需要时间,而且真实世界中的物理法则也使得环境初始化等一系列操作的可行性大幅下降。相反地,通过模型来模拟环境可以节省时间和成本,agent也可以“假想”自己就在真实环境中运动,我们可以直接在这个虚拟的环境中训练决策网络。只要我们的模拟环境足够优秀,agent即使完全在虚拟环境中训练也可以在真实环境中达到很好的性能。

那么我们如何使用TensorFlow实现这个需求呢?按照我上面所说,我们需要一个能够根据之前的观测和行动转化输出得到新的观测值,收益和状态的神经网络。我们将使用真实环境来训练我们的仿真模型,然后使用仿真模型来训练我们的agent。通过这个方法,我们可以在让agent在不直接接触到真实环境的情况下习得行动策略!下面给出是实现代码(提供版本为评论区重写的版本(译者对代码做了一定修正),作者的原始代码点这里(这个版本有bug,底部有评论指正)查看)

# 译者运行环境为jupyterlab,每个分割线对应一个代码块
import numpy as np
import tensorflow as tf
%matplotlib inline
import matplotlib.pyplot as plt
# --------------------------------------------------
import gym
env = gym.make("CartPole-v0")
# --------------------------------------------------
# 超参数
learning_rate = 1e-2
# 收益的折算因子
gamma = 0.99
# RMSProp中的衰减因子
decay_rate = 0.99

model_batch_size = 3
policy_batch_size = 3

dimen = 4 # 环境中的维度数
# --------------------------------------------------
# 辅助函数
def discount(r, gamma=0.99, standardize=False):
    """
    输入一维的收益数组,输出折算后的收益值,例:f([1, 1, 1], 0.99) -> [1, 0.99, 0.9801],折算后根据要求选择进行归一化
    """
    discounted = np.array([val * (gamma ** i) for i, val in enumerate(r)])
    if standardize:
        discounted -= np.mean(discounted)
        discounted /= np.std(discounted)
    return discounted

def step_model(sess, xs, action):
    """ 使用神经网络模型根据之前的状态和行动来生成新的状态 """
    # 上一状态
    x = xs[-1].reshape(1, -1)
    # 存储行动
    x = np.hstack([x, [[action]]])
    # 预测输出
    output_y = sess.run(predicted_state_m, feed_dict={input_x_m: x})
    # predicted_state_m == [state_0, state_1, state_2, state_3, reward, done]
    output_next_state = output_y[:,:4]
    output_reward = output_y[:,4]
    output_done = output_y[:,5]
    # 限制输出范围
    output_next_state[:,0] = np.clip(output_next_state[:,0], -2.4, 2.4)
    output_next_state[:,2] = np.clip(output_next_state[:,2], -0.4, 0.4)
    # 完成的阀值设置
    output_done = True if output_done > 0.01 or len(xs) > 500 else False
    return output_next_state, output_reward, output_done
# --------------------------------------------------
# 用于仿真的神经网络
# 架构
# 网络中包含两个具有256个神经元的层,relu函数为激活函数。共有三个输出层,分别输出下一个观测值,收益值和游戏结束的标志
tf.reset_default_graph()

num_hidden_m = 256
# 由于要输入决策网络输出的行动,维度+1
dimen_m = dimen + 1
# 输入占位符
input_x_m = tf.placeholder(tf.float32, [None, dimen_m])
# 第一层
W1_m = tf.get_variable("W1_m", shape=[dimen_m, num_hidden_m], initializer=tf.contrib.layers.xavier_initializer())
B1_m = tf.Variable(tf.zeros([num_hidden_m]), name="B1M")
layer1_m = tf.nn.relu(tf.matmul(input_x_m, W1_m) + B1_m)
# 第二层
W2_m = tf.get_variable("W2_m", shape=[num_hidden_m, num_hidden_m], initializer=tf.contrib.layers.xavier_initializer())
B2_m = tf.Variable(tf.zeros([num_hidden_m]), name="B2_m")
layer2_m = tf.nn.relu(tf.matmul(layer1_m, W2_m) + B2_m)
# 第三层(输出层)
# 注意这里有三个单独的输出层
W_obs_m = tf.get_variable("W_obs_m", shape=[num_hidden_m, 4], initializer=tf.contrib.layers.xavier_initializer())
B_obs_m = tf.Variable(tf.zeros([4]), name="B_obs_m")
W_reward_m = tf.get_variable("W_reward_m", shape=[num_hidden_m, 1], initializer=tf.contrib.layers.xavier_initializer())
B_reward_m = tf.Variable(tf.zeros([1]), name="B_reward_m")
W_done_m = tf.get_variable("W_done_m", shape=[num_hidden_m, 1], initializer=tf.contrib.layers.xavier_initializer())
B_done_m = tf.Variable(tf.zeros([1]), name="B_done_m")

output_obs_m = tf.matmul(layer2_m, W_obs_m) + B_obs_m
output_reward_m = tf.matmul(layer2_m, W_reward_m) + B_reward_m
output_done_m = tf.sigmoid(tf.matmul(layer2_m, W_done_m) + B_done_m)

# 训练所需的输入占位符
actual_obs_m = tf.placeholder(tf.float32, [None, dimen_m], name="actual_obs")
actual_reward_m = tf.placeholder(tf.float32, [None, 1], name="actual_reward")
actual_done_m = tf.placeholder(tf.float32, [None, 1], name="actual_done")

# 整合输出
predicted_state_m = tf.concat([output_obs_m, output_reward_m, output_done_m], axis=1)

# 损失函数
loss_obs_m = tf.square(actual_obs_m[-1, 0:4] - output_obs_m)
loss_reward_m = tf.square(actual_reward_m - output_reward_m)
loss_done_m = -tf.log(actual_done_m * output_done_m + (1 - actual_done_m) * (1 - output_done_m))

# 模型损失为三个输出损失的平均值
loss_m = tf.reduce_max(loss_obs_m + loss_reward_m + loss_done_m)

adam_m = tf.train.AdamOptimizer(learning_rate=learning_rate)
update_m = adam_m.minimize(loss_m)
# --------------------------------------------------
# 决策网络
num_hidden_p = 10 # 决策网络中隐藏层神经元个数

input_x_p = tf.placeholder(tf.float32, [None, dimen], name="input_x")

# 第一层
W1_p = tf.get_variable("W1", shape=[dimen,num_hidden_p], 
                     initializer=tf.contrib.layers.xavier_initializer())
layer1_p = tf.nn.relu(tf.matmul(input_x_p, W1_p))

# 第二层
W2_p = tf.get_variable("W2", shape=[num_hidden_p, 1], 
                     initializer=tf.contrib.layers.xavier_initializer())
output_p = tf.nn.sigmoid(tf.matmul(layer1_p, W2_p))

# 训练所需的输入占位符
input_y_p = tf.placeholder(tf.float32, shape=[None, 1], name="input_y")
advantages_p = tf.placeholder(tf.float32, shape=[None,1], name="reward_signal")

# 损失函数
# 下面表达式等价于 0 if input_y_p == output_p else 1
log_lik_p = tf.log(input_y_p * (input_y_p - output_p) + 
                 (1 - input_y_p) * (input_y_p + output_p))

# We'll be trying to maximize log liklihood
loss_p = -tf.reduce_mean(log_lik_p * advantages_p)

# 梯度
W1_grad_p = tf.placeholder(tf.float32,name="W1_grad")
W2_grad_p = tf.placeholder(tf.float32,name="W2_grad")
batch_grad_p = [W1_grad_p, W2_grad_p]
trainable_vars_p = [W1_p, W2_p]
grads_p = tf.gradients(loss_p, trainable_vars_p)

# 优化器
adam_p = tf.train.AdamOptimizer(learning_rate=learning_rate)

# 更新函数
update_grads_p = adam_p.apply_gradients(zip(batch_grad_p, [W1_p, W2_p]))
# --------------------------------------------------
# 初始化并测试模型运行情况
init = tf.global_variables_initializer()
sess = tf.Session()
sess.run(init)
random_obs = np.random.random(size=[1, env.observation_space.shape[0]])
random_action = env.action_space.sample()

print("obs: {}\naction: {}\noutput obs: {}\nouput reward: {}\noutput done: {}\noutput policy: {}".format(
        random_obs,
        random_action,
        sess.run(output_obs_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
        sess.run(output_reward_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
        sess.run(output_done_m,feed_dict={input_x_m: np.hstack([random_obs, [[random_action]]])}),
        sess.run(output_p,feed_dict={input_x_p: random_obs})))
# --------------------------------------------------
# 在真实环境中训练
real_rewards = []
num_episodes = 5000

# Trigger used to decide whether we should train from model or from real environment
train_from_model = False
train_first_steps = 500

# 初始化变量跟踪观测值,收益和行动
observations = np.empty(0).reshape(0,dimen)
rewards = np.empty(0).reshape(0,1)
actions = np.empty(0).reshape(0,1)

# 梯度
grads = np.array([np.zeros(var.get_shape().as_list()) for var in trainable_vars_p])

num_episode = 0

observation = env.reset()

while num_episode < num_episodes:
    observation = observation.reshape(1,-1)
    
    # 输出决策
    policy = sess.run(output_p, feed_dict={input_x_p: observation})
    
    # 根据策略选定行为,引入一定概率的随机决策
    action = 0 if policy > np.random.uniform() else 1

    # 跟踪观测值和行动
    observations = np.vstack([observations, observation])
    actions = np.vstack([actions, action])
    
    # 从仿真环境或者真实环境中获取下一个观测值
    if train_from_model:
        observation, reward, done = step_model(sess, observations, action)
    else:
        observation, reward, done, _ = env.step(action)
        
    # 跟踪收益
    rewards = np.vstack([rewards, reward])
    dones = np.zeros(shape=(len(observations),1))
    
    # 游戏结束或者迭代次数够多
    if done or len(observations) > 300:
        print("\r{} / {} ".format(num_episode, num_episodes),end="")

        # 判断训练环境
        if not train_from_model:
             # 训练模型的上一个状态
            states = np.hstack([observations, actions])
            prev_states = states[:-1,:]
            next_states = states[1:, :]
            next_rewards = rewards[1:, :]
            next_dones = dones[1:, :]

            feed_dict = {input_x_m: prev_states.astype(np.float32), 
                         actual_obs_m: next_states.astype(np.float32),
                        actual_done_m: next_dones.astype(np.float32),
                        actual_reward_m: next_rewards.astype(np.float32)}

            loss, _ = sess.run([loss_m, update_m], feed_dict=feed_dict)
            
            real_rewards.append(sum(rewards))
            
        
        # 折算收益
        disc_rewards = discount(rewards, standardize=True)
        
        # 计算梯度
        grads += sess.run(grads_p, feed_dict={input_x_p: observations,
                                            input_y_p: actions,
                                            advantages_p: disc_rewards})
        
        num_episode += 1
        
        observation = env.reset()

        # 重置变量
        observations = np.empty(0).reshape(0,dimen)
        rewards = np.empty(0).reshape(0,1)
        actions = np.empty(0).reshape(0,1)
        
        # Toggle between training from model and from real environment allowing sufficient time 
        # to train the model before its used for learning policy
        if num_episode > train_first_steps:
            train_from_model = not train_from_model 

        # If batch full
        if num_episode % policy_batch_size == 0:
            
            # 更新梯度
            sess.run(update_grads_p, feed_dict={W1_grad_p: grads[0], W2_grad_p: grads[1]})
            
            # 重置梯度
            grads = np.array([np.zeros(var.get_shape().as_list()) for var in trainable_vars_p])
            
            # 周期性输出提示信息
            if (num_episode % (100 * policy_batch_size) == 0):
                print("Episode {} last batch rewards: {}".format(
                        num_episode, sum(real_rewards[-policy_batch_size:])/policy_batch_size))
            
            # 模型性能足够好时退出
            if (sum(real_rewards[-10:]) / 10. >= 190): # 可以调至199等更高的值(200为满分)
                print("Episode {} Training complete with total score of: {}".format(
                        num_episode, sum(real_rewards[-policy_batch_size:])/policy_batch_size))
                break
# --------------------------------------------------
# 测试模型效果

observation = env.reset()
reward_sum = 0

model_losses = []

while True:
    env.render()
    
    observation = np.reshape(observation, [1, -1])
    policy = sess.run(output_p, feed_dict={input_x_p: observation})
    action = 0 if policy > 0.5 else 1
    observation, reward, done, _ = env.step(action)
    reward_sum += reward
    
    if done:
        print("Total score: {}".format(reward_sum))
        break
299 / 5000 Episode 300 last batch rewards: [34.66666667]
599 / 5000 Episode 600 last batch rewards: [75.66666667]
899 / 5000 Episode 900 last batch rewards: [61.]
1199 / 5000 Episode 1200 last batch rewards: [200.]
1499 / 5000 Episode 1500 last batch rewards: [194.33333333]
1799 / 5000 Episode 1800 last batch rewards: [169.33333333]
1979 / 5000 Episode 1980 Training complete with total score of: [200.]

到这里,我们引入了两个神经网络,有很多超参数,鼓励读者尝试自己调整超参数来使模型训练更好更快。在下一节我们会探究如何使用卷积神经网络来在更复杂的环境(如雅达利游戏)中学习。

系列文章(翻译进度):

  1. (0) Q-Learning的查找表实现和神经网络实现
  2. (1) 双臂赌博机
  3. (1.5) — 上下文赌博机
  4. (2) —— 基于策略的Agents
  5. (3) —— 构建仿真环境来进行强化学习
  6. Part 4 — Deep Q-Networks and Beyond
  7. Part 5 — Visualizing an Agent’s Thoughts and Actions
  8. Part 6 — Partial Observability and Deep Recurrent Q-Networks
  9. Part 7 — Action-Selection Strategies for Exploration
  10. Part 8 — Asynchronous Actor-Critic Agents (A3C)

原文链接:https://medium.com/@awjuliani/simple-reinforcement-learning-with-tensorflow-part-3-model-based-rl-9a6fe0cce99

原文作者:Arthur Juliani

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 使用TensorFlow实现股票价格预测深度学习模型

    Sebastian Heinz. A simple deep learning model for stock price prediction using T...

    ArrayZoneYour
  • 微服务革命:应用,数据的容器化

    近几年来,微服务架构和基于容器的虚拟化技术已经越来越多地在软件开发社区中被提及。Adrian Cockcroft就是这方面公认的极有远见者之一,他在2014年欧...

    ArrayZoneYour
  • TensorFlow强化学习入门(2)——基于策略的Agents

    在本教程系列的(1)中,我演示了如何构建一个agent来在多个选择中选取最有价值的一个。在本文中,我将讲解如何得到一个从现实世界中获取 观测值 ,并作出 长期收...

    ArrayZoneYour
  • tf21: 身份证识别——识别身份证号

    上一篇: 身份证识别——生成身份证号和汉字 代码直接参考,验证码识别 #!/usr/bin/env python2 # -*- coding: utf-8 -*...

    MachineLP
  • tf API 研读3:Building Graphs

    tensorflow是通过计算图的方式建立网络。 比喻说明: 结构:计算图建立的只是一个网络框架。编程时框架中不会出现任何的实际值,所有权重(weight)和偏...

    MachineLP
  • Tensorflow基础入门十大操作总结

    TensorFlow 是一个开源的、基于 Python 的机器学习框架,它由 Google 开发,提供了 Python,C/C++、Java、Go、R 等多种编...

    Datawhale
  • Tensorflow基础入门十大操作总结

    TensorFlow 是一个开源的、基于 Python 的机器学习框架,它由 Google 开发,提供了 Python,C/C++、Java、Go、R 等多种编...

    石晓文
  • 中阶API示范

    TensorFlow有5个不同的层次结构:即硬件层,内核层,低阶API,中阶API,高阶API。本章我们将以线性回归为例,直观对比展示在低阶API,中阶API,...

    lyhue1991
  • 低阶API示范

    TensorFlow有5个不同的层次结构:即硬件层,内核层,低阶API,中阶API,高阶API。本章我们将以线性回归为例,直观对比展示在低阶API,中阶API,...

    lyhue1991
  • TensorFlow基础:常量

    例如 tf.zeros,tf.ones,tf.zeros_like,tf.diag ...

    lyhue1991

扫码关注云+社区

领取腾讯云代金券