前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >强化学习-A3C

强化学习-A3C

作者头像
luxuantao
发布2021-02-24 11:29:13
3720
发布2021-02-24 11:29:13
举报
文章被收录于专栏:Fdu弟中弟Fdu弟中弟

这应该是最后一篇关于强化学习的内容了,使用A3C算法玩平衡杆游戏。

代码参考自龙良曲的tensorflow2开源书籍。

代码语言:javascript
复制
import  matplotlib
from    matplotlib import pyplot as plt
matplotlib.rcParams['font.size'] = 18
matplotlib.rcParams['figure.titlesize'] = 18
matplotlib.rcParams['figure.figsize'] = [9, 7]
matplotlib.rcParams['font.family'] = ['KaiTi']
matplotlib.rcParams['axes.unicode_minus']=False

plt.figure()

import os
os.environ["CUDA_VISIBLE_DEVICES"] = ""
import  threading
import  gym
import  multiprocessing
import  numpy as np
from    queue import Queue
import  matplotlib.pyplot as plt

import  tensorflow as tf
from    tensorflow import keras
from    tensorflow.keras import layers,optimizers,losses


tf.random.set_seed(1231)
np.random.seed(1231)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
assert tf.__version__.startswith('2.')


class ActorCritic(keras.Model):
    # Actor-Critic模型
    def __init__(self, state_size, action_size):
        super(ActorCritic, self).__init__()
        self.state_size = state_size # 状态向量长度
        self.action_size = action_size # 动作数量
        # 策略网络Actor
        self.dense1 = layers.Dense(128, activation='relu')
        self.policy_logits = layers.Dense(action_size)
        # V网络Critic
        self.dense2 = layers.Dense(128, activation='relu')
        self.values = layers.Dense(1)

    def call(self, inputs):
        # 获得策略分布Pi(a|s)
        x = self.dense1(inputs)
        logits = self.policy_logits(x)
        # 获得v(s)
        v = self.dense2(inputs)
        values = self.values(v)
        return logits, values


def record(episode, episode_reward, worker_idx, global_ep_reward, result_queue, total_loss, num_steps):
    # 统计工具函数
    if global_ep_reward == 0:
        global_ep_reward = episode_reward
    else:
        global_ep_reward = global_ep_reward * 0.99 + episode_reward * 0.01
    print(
        f"{episode} | "
        f"Average Reward: {int(global_ep_reward)} | "
        f"Episode Reward: {int(episode_reward)} | "
        f"Loss: {int(total_loss / float(num_steps) * 1000) / 1000} | "
        f"Steps: {num_steps} | "
        f"Worker: {worker_idx}"
    )
    result_queue.put(global_ep_reward) # 保存回报,传给主线程
    return global_ep_reward

class Memory:
    def __init__(self):
        self.states = []
        self.actions = []
        self.rewards = []

    def store(self, state, action, reward):
        self.states.append(state)
        self.actions.append(action)
        self.rewards.append(reward)

    def clear(self):
        self.states = []
        self.actions = []
        self.rewards = []

class Agent:
    # 智能体,包含了中央参数网络server
    def __init__(self):
        # server优化器,client不需要,直接从server拉取参数
        self.opt = optimizers.Adam(1e-3)
        # 中央模型,类似于参数服务器
        self.server = ActorCritic(4, 2) # 状态向量,动作数量
        self.server(tf.random.normal((2, 4)))

    def train(self):
        res_queue = Queue() # 共享队列
        # 创建各个交互环境
        workers = [Worker(self.server, self.opt, res_queue, i) for i in range(multiprocessing.cpu_count())]
        for i, worker in enumerate(workers):
            print("Starting worker {}".format(i))
            worker.start()
        # 统计并绘制总回报曲线
        returns = []
        while True:
            reward = res_queue.get()
            if reward is not None:
                returns.append(reward)
            else: # 结束标志
                break
        [w.join() for w in workers] # 等待线程退出 

        print(returns)
        plt.figure()
        plt.plot(np.arange(len(returns)), returns)
        # plt.plot(np.arange(len(moving_average_rewards)), np.array(moving_average_rewards), 's')
        plt.xlabel('回合数')
        plt.ylabel('总回报')
        plt.savefig('a3c-tf-cartpole.svg')


class Worker(threading.Thread): 
    def __init__(self,  server, opt, result_queue, idx):
        super(Worker, self).__init__()
        self.result_queue = result_queue # 共享队列
        self.server = server # 中央模型
        self.opt = opt # 中央优化器
        self.client = ActorCritic(4, 2) # 线程私有网络
        self.worker_idx = idx # 线程id
        self.env = gym.make('CartPole-v1').unwrapped
        self.ep_loss = 0.0

    def run(self): 
        mem = Memory() # 每个worker自己维护一个memory
        for epi_counter in range(500): # 未达到最大回合数
            current_state = self.env.reset() # 复位client游戏状态
            mem.clear()
            ep_reward = 0.
            ep_steps = 0  
            done = False
            while not done:
                # 获得Pi(a|s),未经softmax
                logits, _ = self.client(tf.constant(current_state[None, :], dtype=tf.float32))
                probs = tf.nn.softmax(logits)
                # 随机采样动作
                action = np.random.choice(2, p=probs.numpy()[0])
                new_state, reward, done, _ = self.env.step(action) # 交互 
                ep_reward += reward # 累加奖励
                mem.store(current_state, action, reward) # 记录
                ep_steps += 1 # 计算回合步数
                current_state = new_state # 刷新状态 

                if ep_steps >= 500 or done: # 最长步数500
                    # 计算当前client上的误差
                    with tf.GradientTape() as tape:
                        total_loss = self.compute_loss(done, new_state, mem) 
                    # 计算误差
                    grads = tape.gradient(total_loss, self.client.trainable_weights)
                    # 梯度提交到server,在server上更新梯度
                    self.opt.apply_gradients(zip(grads, self.server.trainable_weights))
                    # 从server拉取最新的梯度
                    self.client.set_weights(self.server.get_weights())
                    mem.clear() # 清空Memory 
                    # 统计此回合回报
                    self.result_queue.put(ep_reward)
                    print(self.worker_idx, ep_reward)
                    break
        self.result_queue.put(None) # 结束线程

    def compute_loss(self, done, new_state, memory, gamma=0.99):
        if done:
            reward_sum = 0. # 终止状态的v(终止)=0
        else:
            reward_sum = self.client(tf.constant(new_state[None, :], dtype=tf.float32))[-1].numpy()[0]
        # 统计折扣回报
        discounted_rewards = []
        for reward in memory.rewards[::-1]:  # reverse buffer r
            reward_sum = reward + gamma * reward_sum
            discounted_rewards.append(reward_sum)
        discounted_rewards.reverse()
        # 获取状态的Pi(a|s)和v(s)
        logits, values = self.client(tf.constant(np.vstack(memory.states), dtype=tf.float32))
        # 计算advantage = R() - v(s)
        advantage = tf.constant(np.array(discounted_rewards)[:, None], dtype=tf.float32) - values
        # Critic网络损失
        value_loss = 0.5 * advantage ** 2
        # 策略损失
        policy_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=memory.actions, logits=logits)
        # 计算策略网络损失时,并不会计算V网络
        policy_loss = policy_loss * tf.stop_gradient(advantage)
        # Entropy Bonus
        entropy = tf.nn.softmax_cross_entropy_with_logits(labels=tf.nn.softmax(logits), logits=logits)
        policy_loss = policy_loss - 0.01 * entropy
        # 聚合各个误差
        total_loss = tf.reduce_mean((value_loss + policy_loss))
        return total_loss


if __name__ == '__main__':
    master = Agent()
    master.train()

loss由两部分组成,计算方式和第二篇其实大同小异,那篇文章里其实已经引入了Actor-Critic的思想。Actor就是策略网络(估计某个状态下每个动作的概率),Critic就是值函数网络(估计某个状态下的期望回报,也就是基准线),过程进行中得到的实际回报可以用MC或者TD计算得到。这些之前都有提到过了。

A3C算法比较特别的地方就是异步更新网络的方式,一个全局网络加若干个子网络(子网络个数一般取CPU核数),每个网络都包含自己Actor和Critic。一开始都从全局网络中拷贝参数值,各自计算好梯度信息后传回全局网络,让全局网络的参数得到更新,然后再从全局网络中拷贝参数值,往复循环。。。注意的是子网络自己只负责算梯度,自己的参数值是不会用梯度更新的。

我的博客即将同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan?invite_code=1iz0ou8sfr9ij

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2020-07-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
私有网络
私有网络(Virtual Private Cloud,VPC)是基于腾讯云构建的专属云上网络空间,为您在腾讯云上的资源提供网络服务,不同私有网络间完全逻辑隔离。作为您在云上的专属网络空间,您可以通过软件定义网络的方式管理您的私有网络 VPC,实现 IP 地址、子网、路由表、网络 ACL 、流日志等功能的配置管理。私有网络还支持多种方式连接 Internet,如弹性 IP 、NAT 网关等。同时,您也可以通过 VPN 连接或专线接入连通腾讯云与您本地的数据中心,灵活构建混合云。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档