TensorFlow应用实战-18-Policy Gradient算法

Policy Gradient算法

mark

policy Gradient算法不止一种。

有兴趣的话:

深度增强学习之Policy Gradient方法1

https://zhuanlan.zhihu.com/p/21725498

# -*- coding: UTF-8 -*-

"""
Policy Gradient 算法(REINFORCE)。做决策的部分,相当于机器人的大脑
"""

import numpy as np
import tensorflow as tf

try:
    xrange = xrange  # Python 2
except:
    xrange = range   # Python 3


# 策略梯度 类
class PolicyGradient:
    def __init__(self,
                 lr,      # 学习速率
                 s_size,  # state/observation 的特征数目
                 a_size,  # action 的数目
                 h_size,  # hidden layer(隐藏层)神经元数目
                 discount_factor=0.99  # 折扣因子
    ):
        self.gamma = discount_factor  # Reward 递减率

        # 神经网络的前向传播部分。大脑根据 state 来选 action
        self.state_in = tf.placeholder(shape=[None, s_size], dtype=tf.float32)

        # 第一层全连接层
        hidden = tf.layers.dense(self.state_in, h_size, activation=tf.nn.relu)

        # 第二层全连接层,用 Softmax 来算概率
        self.output = tf.layers.dense(hidden, a_size, activation=tf.nn.softmax)

        # 直接选择概率最大的那个 action
        self.chosen_action = tf.argmax(self.output, 1)

        # 下面主要是负责训练的一些过程
        # 我们给神经网络传递 reward 和 action,为了计算 loss
        # 再用 loss 来调节神经网络的参数
        self.reward_holder = tf.placeholder(shape=[None], dtype=tf.float32)
        self.action_holder = tf.placeholder(shape=[None], dtype=tf.int32)

        self.indexes = tf.range(0, tf.shape(self.output)[0]) * tf.shape(self.output)[1] + self.action_holder
        self.outputs = tf.gather(tf.reshape(self.output, [-1]), self.indexes)

        # 计算 loss(和平时说的 loss 不一样)有一个负号
        # 因为 TensorFlow 自带的梯度下降只能 minimize(最小化)loss
        # 而 Policy Gradient 里面是要让这个所谓的 loss 最大化
        # 因此需要反一下。对负的去让它最小化,就是让它正向最大化
        self.loss = -tf.reduce_mean(tf.log(self.outputs) * self.reward_holder)

        # 得到可被训练的变量
        train_vars = tf.trainable_variables()
        
        self.gradient_holders = []
        
        for index, var in enumerate(train_vars):
            placeholder = tf.placeholder(tf.float32, name=str(index) + '_holder')
            self.gradient_holders.append(placeholder)

        # 对 loss 以 train_vars 来计算梯度
        self.gradients = tf.gradients(self.loss, train_vars)

        optimizer = tf.train.AdamOptimizer(learning_rate=lr)
        # apply_gradients 是 minimize 方法的第二部分,应用梯度
        self.update_batch = optimizer.apply_gradients(zip(self.gradient_holders, train_vars))

    # 计算折扣后的 reward
    # 公式: E = r1 + r2 * gamma + r3 * gamma * gamma + r4 * gamma * gamma * gamma ...
    def discount_rewards(self, rewards):
        discounted_r = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(xrange(0, rewards.size)):
            running_add = running_add * self.gamma + rewards[t]
            discounted_r[t] = running_add
        return discounted_r
# -*- coding: UTF-8 -*-

"""
游戏的主程序,调用机器人的 Policy Gradient 决策大脑
"""

import numpy as np
import gym
import tensorflow as tf

from policy_gradient import PolicyGradient


# 伪随机数。为了能够复现结果
np.random.seed(1)

env = gym.make('CartPole-v0')
env = env.unwrapped    # 取消限制
env.seed(1)   # 普通的 Policy Gradient 方法, 回合的方差比较大, 所以选一个好点的随机种子

print(env.action_space)            # 查看这个环境中可用的 action 有多少个
print(env.observation_space)       # 查看这个环境中 state/observation 有多少个特征值
print(env.observation_space.high)  # 查看 observation 最高取值
print(env.observation_space.low)   # 查看 observation 最低取值

update_frequency = 5   # 更新频率,多少回合更新一次
total_episodes = 3000  # 总回合数

# 创建 PolicyGradient 对象
agent = PolicyGradient(lr=0.01,
                       a_size=env.action_space.n,   # 对 CartPole-v0 是 2, 两个 action,向左/向右
                       s_size=env.observation_space.shape[0],  # 对 CartPole-v0 是 4
                       h_size=8)

with tf.Session() as sess:
    # 初始化所有全局变量
    sess.run(tf.global_variables_initializer())
    
    # 总的奖励
    total_reward = []

    gradient_buffer = sess.run(tf.trainable_variables())
    for index, grad in enumerate(gradient_buffer):
        gradient_buffer[index] = grad * 0

    i = 0  # 第几回合
    while i < total_episodes:
        # 初始化 state(状态)
        s = env.reset()
        
        episode_reward = 0
        episode_history = []

        while True:
            # 更新可视化环境
            env.render()
            
            # 根据神经网络的输出,随机挑选 action
            a_dist = sess.run(agent.output, feed_dict={agent.state_in: [s]})
            a = np.random.choice(a_dist[0], p=a_dist[0])
            a = np.argmax(a_dist == a)

            # 实施这个 action, 并得到环境返回的下一个 state, reward 和 done(本回合是否结束)
            s_, r, done, _ = env.step(a)  # 这里的 r(奖励)不能准确引导学习

            x, x_dot, theta, theta_dot = s_  # 把 s_ 细分开, 为了修改原配的 reward

            # x 是车的水平位移。所以 r1 是车越偏离中心, 分越少
            # theta 是棒子离垂直的角度, 角度越大, 越不垂直。所以 r2 是棒越垂直, 分越高
            r1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
            r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
            r = r1 + r2  # 总 reward 是 r1 和 r2 的结合, 既考虑位置, 也考虑角度, 这样学习更有效率

            episode_history.append([s, a, r, s_])

            episode_reward += r
            s = s_

            # Policy Gradient 是回合更新
            if done:  # 如果此回合结束
                # 更新神经网络
                episode_history = np.array(episode_history)
                
                episode_history[:, 2] = agent.discount_rewards(episode_history[:, 2])
                
                feed_dict = {
                    agent.reward_holder: episode_history[:, 2],
                    agent.action_holder: episode_history[:, 1],
                    agent.state_in: np.vstack(episode_history[:, 0])
                }

                # 计算梯度
                grads = sess.run(agent.gradients, feed_dict=feed_dict)
                
                for idx, grad in enumerate(grads):
                    gradient_buffer[idx] += grad

                if i % update_frequency == 0 and i != 0:
                    feed_dict = dictionary = dict(zip(agent.gradient_holders, gradient_buffer))

                    # 应用梯度下降来更新参数
                    _ = sess.run(agent.update_batch, feed_dict=feed_dict)

                    for index, grad in enumerate(gradient_buffer):
                        gradient_buffer[index] = grad * 0

                total_reward.append(episode_reward)
                break

        # 每 50 回合打印平均奖励
        if i % 50 == 0:
            print("回合 {} - {} 的平均奖励: {}".format(i, i + 50, np.mean(total_reward[-50:])))

        i += 1

A3c实现3d赛车游戏: 成果展示

python play.py --num-workers 2 --log-dir neonrace

numworkders是 客户端。

重制: 任天堂版权问题

TensorKart

https://github.com/kevinhughes27/TensorKart

这个例子是截取屏幕上的截图,使用卷积神经网络训练。

实现步骤

  • 游戏环境,设置
  • 机器人大脑
  • 游戏主程序
# -*- coding: UTF-8 -*-

"""
配置 Neon Race(霓虹赛车)的游戏环境,以方便我们训练
"""

import cv2
import time
import numpy as np
import logging
import gym
from gym import spaces
from gym.spaces.box import Box
import universe
from universe import vectorized
from universe import spaces as vnc_spaces
from universe.spaces.vnc_event import keycode
from universe.wrappers import BlockingReset, GymCoreAction, EpisodeID, Unvectorize, Vectorize, Vision, Logger

# 配置日志系统
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
universe.configure_logging()

# 游戏:Neon Race
GAME = "flashgames.NeonRace-v0"


# 创建并配置游戏环境
def create_env(client_id, remotes):
    env = gym.make(GAME)
    env = Vision(env)
    env = Logger(env)
    env = BlockingReset(env)

    reg = universe.runtime_spec('flashgames').server_registry
    height = reg[GAME]["height"]
    width = reg[GAME]["width"]
    env = CropScreen(env, height, width, 84, 18)
    env = Rescale(env)

    # 可用的按键:左,右,上,左上,右上,下,用 Turbo 来加速
    keys = ['left', 'right', 'up', 'left up', 'right up', 'down', 'up x']

    env = DiscreteToFixedKeysVNCActions(env, keys)
    env = EpisodeID(env)
    env = DiagnosticsInfo(env)
    env = Unvectorize(env)

    env.configure(fps=5.0, remotes=remotes, start_timeout=15 * 60, client_id=client_id, vnc_driver='go', vnc_kwargs={
            'encoding': 'tight', 'compress_level': 0,
            'fine_quality_level': 50, 'subsample_level': 3})

    return env


# 给环境加上记录诊断信息的功能
def DiagnosticsInfo(env, *args, **kwargs):
    return vectorized.VectorizeFilter(env, DiagnosticsInfoI, *args, **kwargs)


# 诊断信息的类
class DiagnosticsInfoI(vectorized.Filter):
    def __init__(self, log_interval=503):
        super(DiagnosticsInfoI, self).__init__()

        self._episode_time = time.time()
        self._last_time = time.time()
        self._local_t = 0
        self._log_interval = log_interval
        self._episode_reward = 0
        self._episode_length = 0
        self._all_rewards = []
        self._num_vnc_updates = 0
        self._last_episode_id = -1

    def _after_reset(self, observation):
        logger.info('重置环境中')
        self._episode_reward = 0
        self._episode_length = 0
        self._all_rewards = []
        return observation

    def _after_step(self, observation, reward, done, info):
        to_log = {}
        if self._episode_length == 0:
            self._episode_time = time.time()

        self._local_t += 1
        if info.get("stats.vnc.updates.n") is not None:
            self._num_vnc_updates += info.get("stats.vnc.updates.n")

        if self._local_t % self._log_interval == 0:
            cur_time = time.time()
            elapsed = cur_time - self._last_time
            fps = self._log_interval / elapsed
            self._last_time = cur_time
            cur_episode_id = info.get('vectorized.episode_id', 0)
            to_log["diagnostics/fps"] = fps
            if self._last_episode_id == cur_episode_id:
                to_log["diagnostics/fps_within_episode"] = fps
            self._last_episode_id = cur_episode_id
            if info.get("stats.gauges.diagnostics.lag.action") is not None:
                to_log["diagnostics/action_lag_lb"] = info["stats.gauges.diagnostics.lag.action"][0]
                to_log["diagnostics/action_lag_ub"] = info["stats.gauges.diagnostics.lag.action"][1]
            if info.get("reward.count") is not None:
                to_log["diagnostics/reward_count"] = info["reward.count"]
            if info.get("stats.gauges.diagnostics.clock_skew") is not None:
                to_log["diagnostics/clock_skew_lb"] = info["stats.gauges.diagnostics.clock_skew"][0]
                to_log["diagnostics/clock_skew_ub"] = info["stats.gauges.diagnostics.clock_skew"][1]
            if info.get("stats.gauges.diagnostics.lag.observation") is not None:
                to_log["diagnostics/observation_lag_lb"] = info["stats.gauges.diagnostics.lag.observation"][0]
                to_log["diagnostics/observation_lag_ub"] = info["stats.gauges.diagnostics.lag.observation"][1]

            if info.get("stats.vnc.updates.n") is not None:
                to_log["diagnostics/vnc_updates_n"] = info["stats.vnc.updates.n"]
                to_log["diagnostics/vnc_updates_n_ps"] = self._num_vnc_updates / elapsed
                self._num_vnc_updates = 0
            if info.get("stats.vnc.updates.bytes") is not None:
                to_log["diagnostics/vnc_updates_bytes"] = info["stats.vnc.updates.bytes"]
            if info.get("stats.vnc.updates.pixels") is not None:
                to_log["diagnostics/vnc_updates_pixels"] = info["stats.vnc.updates.pixels"]
            if info.get("stats.vnc.updates.rectangles") is not None:
                to_log["diagnostics/vnc_updates_rectangles"] = info["stats.vnc.updates.rectangles"]
            if info.get("env_status.state_id") is not None:
                to_log["diagnostics/env_state_id"] = info["env_status.state_id"]

        if reward is not None:
            self._episode_reward += reward
            if observation is not None:
                self._episode_length += 1
            self._all_rewards.append(reward)

        if done:
            logger.info('回合结束: 回合奖励=%s 回合长度=%s', self._episode_reward, self._episode_length)
            total_time = time.time() - self._episode_time
            to_log["global/episode_reward"] = self._episode_reward
            to_log["global/episode_length"] = self._episode_length
            to_log["global/episode_time"] = total_time
            to_log["global/reward_per_time"] = self._episode_reward / total_time
            self._episode_reward = 0
            self._episode_length = 0
            self._all_rewards = []

        return observation, reward, done, to_log


# 限定的按键状态
class FixedKeyState(object):
    def __init__(self, keys):
        self._keys = [keycode(key) for key in keys]
        self._down_keysyms = set()

    def apply_vnc_actions(self, vnc_actions):
        for event in vnc_actions:
            if isinstance(event, vnc_spaces.KeyEvent):
                if event.down:
                    self._down_keysyms.add(event.key)
                else:
                    self._down_keysyms.discard(event.key)

    def to_index(self):
        action_n = 0
        for key in self._down_keysyms:
            if key in self._keys:
                # 如果按下多个 key(按键),只用第一个 key
                action_n = self._keys.index(key) + 1
                break
        return action_n


# 定义一个确定的 action space(动作空间)
class DiscreteToFixedKeysVNCActions(vectorized.ActionWrapper):
    def __init__(self, env, keys):
        super(DiscreteToFixedKeysVNCActions, self).__init__(env)

        self._keys = keys
        self._generate_actions()
        self.action_space = spaces.Discrete(len(self._actions))

    # 生成 action
    def _generate_actions(self):
        self._actions = []
        uniq_keys = set()
        for key in self._keys:
            for cur_key in key.split(' '):
                uniq_keys.add(cur_key)

        for key in [''] + self._keys:
            split_keys = key.split(' ')
            cur_action = []
            for cur_key in uniq_keys:
                cur_action.append(vnc_spaces.KeyEvent.by_name(cur_key, down=(cur_key in split_keys)))
            self._actions.append(cur_action)
        self.key_state = FixedKeyState(uniq_keys)

    def _action(self, action_n):
        # 每个 action 可能是一个长度为 1 的 np.array
        # 转换成 int 类型,以避免 warning(警告)
        return [self._actions[int(action)] for action in action_n]


# 裁剪屏幕区域
class CropScreen(vectorized.ObservationWrapper):
    """
    从左上角开始裁剪 height(高)x width(宽)大小的区域
    """
    def __init__(self, env, height, width, top=0, left=0):
        super(CropScreen, self).__init__(env)
        self.height = height
        self.width = width
        self.top = top
        self.left = left
        self.observation_space = Box(0, 255, shape=(height, width, 3))

    def _observation(self, observation_n):
        return [ob[self.top:self.top+self.height, self.left:self.left+self.width, :] if ob is not None else None
                for ob in observation_n]


# 处理 Frame(帧)
def _process_frame(frame):
    frame = cv2.resize(frame, (200, 128))
    frame = frame.mean(2).astype(np.float32)
    frame *= (1.0 / 255.0)
    frame = np.reshape(frame, [128, 200, 1])
    return frame


# 调节观测空间的大小
class Rescale(vectorized.ObservationWrapper):
    def __init__(self, env=None):
        super(Rescale, self).__init__(env)
        self.observation_space = Box(0.0, 1.0, [128, 200, 1])

    def _observation(self, observation_n):
        return [_process_frame(observation) for observation in observation_n]

推荐强化学习的博客:

https://morvanzhou.github.io/tutorials/machine-learning/reinforcement-learning/

https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow

深度增强学习(DRL) 漫谈 从AC 到 A3C

https://blog.csdn.net/jinzhuojun/article/details/72851548

演员评论家

结合了 基于策略 和 基于价值

Actor-Critic 是演员-评论家 算法 结合了 Poilcy Gradient 和 Q 学习

Actor 负责表演 基于Policy(策略) 的算法。选取Action来执行

Critic负责给Actor打分,基于Value(值)的算法。Q-learning

mark

什么是A3C?

https://medium.com/emergent-future/simple-reinforcement-learning-with-tensorflow-part-8-asynchronous-actor-critic-agents-a3c-c88f72a5e9f2

Asynchronous Actor-Critic Agents (A3C)

异步 优势 演员-评论家

几个人同时玩游戏,玩游戏的经验上传到一个中央大脑。

几个人从中央大脑中获取最新最强的玩游戏方法

A3C 算法英文:

mark

mark

mark

分布式TensorFlow,有效利用分布式

分布式(Distributed) TensorFlow

可以高效利用Cpu多核,达到近似GPU的训练速度

PS: Parameter Server 缩写。类似于全局的网络服务器

worker:客户端。从ps拉取pull参数/推送(push)参数到ps

https://www.tensorflow.org/deploy/distributed

TensorFlow分布式全套(原理 部署 实例)

https://blog.csdn.net/luodongri/article/details/52596780

https://blog.csdn.net/CodeMaster_/article/details/76223835

基于Google的gRPC通信框架来做的。

TensorFlow学习笔记

https://blog.csdn.net/u012436149/article/details/53140869

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏CVer

TensorFlow.js人脸识别—玩转吃豆豆小游戏

谷歌TenosrFlow开发者峰会2018上,发布了面向JavaScript开发者的全新机器学习框架 TensorFlow.js。这里介绍一个TensorFlo...

49012
来自专栏IT派

干货 | 机器学习在web攻击检测中的应用实践

岳良, 携程信息安全部高级安全工程师。2015年加入携程,主要负责渗透测试,安全评审,安全产品设计。 一、背景 在web应用攻击检测的发展历史中,到目前为止...

52111
来自专栏深度学习

机器学习在web攻击检测中的应用实践

一、背景 通俗地讲,任何一个的机器学习问题都可以等价于一个寻找合适变换函数的问题。例如语音识别,就是在求取合适的变换函数,将输入的一维时序语音信号变换到语义空间...

3725
来自专栏新智元

解决关系推理,从图网络入手!DeepMind图网络库开源了!

DeepMind提出的简单而强大的关系推理网络“graph network”终于开源了!

3812
来自专栏新智元

【AI可能真的要代替插画师了】复旦同济用cGAN生成动画人物

【新智元导读】复旦大学、同济、CMU等的研究者使用cGAN生成各种属性的二次元人物头像,效果非常令人印象深刻。生成的图片质量非常之高,本文作者认为这项工作如果加...

3615
来自专栏企鹅号快讯

教你快速使用OpenCV/Python/dlib进行眨眼检测识别!

摘要: 图像识别的新思路:眼睛纵横比,看看大牛如果用这种思路玩转识别眨眼动作! ? 今天我们来使用面部标志和OpenCV 检测和计算视频流中的眨眼次数。为了构建...

61410
来自专栏华章科技

核心算法:谷歌如何从网络的大海里捞到针

作 者: David Austin,Grand Valley State University

1028
来自专栏机器学习之旅

python开发:特征工程代码模版(二)

正题开始: 这篇文章是入门级的特征处理的打包解决方案的python实现汇总,如果想get一些新鲜血液的朋友可以叉了,只是方便玩数据的人进行数据特征筛选的代码集...

1373

Scikit-Learn的简介:Python机器学习库

如果你是一名Python程序员,或者你正在寻找一个强大的库,可以将机器学习运用到实际系统中,那么你要认真考虑一下scikit-learn。

9017
来自专栏量子位

谷歌最强NLP模型BERT如约开源,12小时GitHub标星破1500,即将支持中文

BERT终于来了!今天,谷歌研究团队终于在GitHub上发布了万众期待的BERT。

1353

扫码关注云+社区