
Hands on Reinforcement Learning 12 Proximal Policy Optimization


12 The PPO Algorithm

12.1 Introduction

The TRPO algorithm introduced in Chapter 11 has been applied successfully in many settings, but its computation is quite involved and every update step is expensive. In 2017 an improved version of TRPO, the PPO algorithm, was proposed. PPO builds on the ideas of TRPO but is much simpler to implement. Moreover, extensive experimental results show that PPO learns as well as TRPO (and often faster), which has made PPO a very popular reinforcement learning algorithm. If you want to try a reinforcement learning algorithm in a new environment, PPO is one of the first algorithms worth trying.

Recall the optimization objective of TRPO:

$$
\begin{aligned}
& \max_{\theta}\; \mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}} \mathbb{E}_{a\sim \pi_{\theta_k}(\cdot|s)} \left[ \frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)} A^{\pi_{\theta_k}}(s,a) \right]\\
& \text{s.t. } \mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}} \left[ D_{KL}\Big(\pi_{\theta_k}(\cdot|s) \,\|\, \pi_{\theta}(\cdot|s)\Big) \right] \le \delta
\end{aligned}
$$

TRPO solves this problem directly with techniques such as Taylor-expansion approximations, conjugate gradients, and line search. PPO optimizes the same objective as TRPO but uses comparatively simple methods to solve it. Specifically, PPO comes in two variants, PPO-Penalty and PPO-Clip, which we introduce next.

12.2 PPO-Penalty

PPO-Penalty uses the method of Lagrange multipliers to move the KL-divergence constraint directly into the objective function, turning the problem into an unconstrained optimization in which the coefficient in front of the KL term is updated iteratively:

$$
\arg\max_{\theta}\; \mathbb{E}_{s\sim \nu^{\pi_{\theta_k}}} \mathbb{E}_{a\sim \pi_{\theta_k}(\cdot|s)} \left[ \frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)} A^{\pi_{\theta_k}}(s,a) - \beta D_{KL}\Big[\pi_{\theta_k}(\cdot|s), \pi_{\theta}(\cdot|s)\Big] \right]
$$

Let $d_k = D_{KL}^{\nu^{\pi_{\theta_k}}}(\pi_{\theta_k}, \pi_\theta)$. The coefficient $\beta$ is updated as follows:

  1. If $d_k < \dfrac{\delta}{1.5}$, then $\beta_{k+1} = \dfrac{\beta_k}{2}$
  2. If $d_k > \delta \times 1.5$, then $\beta_{k+1} = \beta_k \times 2$
  3. Otherwise, $\beta_{k+1} = \beta_k$

Here $\delta$ is a hyperparameter set in advance that limits how far the learned policy may move away from the policy of the previous round; a sketch of this update rule is shown below.
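
Since the code practice in Section 12.4 only implements PPO-Clip, the following is a minimal sketch of the adaptive penalty-coefficient update, assuming the expected KL divergence `d_k` between the old and new policies has already been estimated (the helper name `update_beta` is ours, not part of `rl_utils`):

def update_beta(beta, d_k, delta):
    """Adaptive update of the KL-penalty coefficient (hypothetical helper)."""
    if d_k < delta / 1.5:
        return beta / 2   # policies are still close: relax the penalty
    elif d_k > delta * 1.5:
        return beta * 2   # policies drifted too far: strengthen the penalty
    return beta           # otherwise keep the coefficient unchanged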

12.3 PPO-Clip

The other variant, PPO-Clip, is more direct: it places the restriction inside the objective function itself, ensuring that the new parameters do not move too far from the old ones:

$$
\arg\max_{\theta}\; \mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}} \mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)} \left[ \min \left( \frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)} A^{\pi_{\theta_k}}(s,a),\; \operatorname{clip}\Big( \frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}, 1-\epsilon, 1+\epsilon \Big)A^{\pi_{\theta_k}}(s,a) \right) \right]
$$

where $\operatorname{clip}(x,l,r) := \max(\min(x,r),l)$ restricts $x$ to the interval $[l,r]$. The hyperparameter $\epsilon$ specifies the clipping range.

If $A^{\pi_{\theta_k}}(s,a) > 0$, the action is better than average, and maximizing the objective increases the ratio $\frac{\pi_{\theta}(a|s)}{\pi_{\theta_k}(a|s)}$, but never beyond $1+\epsilon$. Conversely, if $A^{\pi_{\theta_k}}(s,a) < 0$, maximizing the objective decreases the ratio, but never below $1-\epsilon$, as illustrated in Figure 12-1.
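
As a short numerical illustration (with made-up values, not part of the original text), the snippet below shows how taking the minimum of the unclipped and clipped terms caps the incentive to raise the ratio when the advantage is positive:

import torch

eps = 0.2
ratio = torch.tensor([0.5, 1.0, 1.5])      # hypothetical values of pi_theta / pi_theta_k
advantage = torch.tensor([1.0, 1.0, 1.0])  # positive advantage: increasing the ratio is rewarded

surr1 = ratio * advantage
surr2 = torch.clamp(ratio, 1 - eps, 1 + eps) * advantage
print(torch.min(surr1, surr2))  # tensor([0.5000, 1.0000, 1.2000]); the objective saturates at 1 + eps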

12.4 PPO Code in Practice

As with TRPO, we test PPO on the CartPole and inverted pendulum (Pendulum) environments. Extensive experiments show that PPO-Clip consistently performs better than PPO-Penalty, so below we focus on implementing PPO-Clip.

First we import the necessary libraries and define the policy network and value network.

import gym
import torch
import torch.nn.functional as F
import numpy as np
import matplotlib.pyplot as plt
import rl_utils


class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class PPO:
    ''' PPO algorithm with the clipped objective (PPO-Clip) '''
    def __init__(
        self,
        state_dim,
        hidden_dim,
        action_dim,
        actor_lr,
        critic_lr,
        lmbda,
        epochs,
        eps,
        gamma,
        device
    ):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs  # number of training epochs over each batch of trajectories
        self.eps = eps  # clipping range parameter in PPO
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()

        for _ in range(self.epochs):
            log_probs = torch.log(self.actor(states).gather(1, actions))
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # clipped surrogate
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO loss function
            critic_loss = torch.mean(
                F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()
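
The update above relies on `rl_utils.compute_advantage`, whose implementation is not shown in this chapter. A minimal sketch, assuming it performs Generalized Advantage Estimation (GAE) over a single trajectory of TD errors, might look like this:

import torch

def compute_advantage(gamma, lmbda, td_delta):
    """GAE over one trajectory of TD errors (sketch of what rl_utils may provide)."""
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:                      # accumulate discounted TD errors backwards in time
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)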

Next, we train the PPO algorithm on the CartPole environment.

actor_lr = 1e-3
critic_lr = 1e-2
num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
agent = PPO(
    state_dim, 
    hidden_dim, 
    action_dim, 
    actor_lr, 
    critic_lr, 
    lmbda, 
    epochs, 
    eps, 
    gamma, 
    device
)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
Iteration 0: 100%|██████████| 50/50 [00:02<00:00, 19.41it/s, episode=50, return=183.200]
Iteration 1: 100%|██████████| 50/50 [00:03<00:00, 13.49it/s, episode=100, return=184.900]
Iteration 2: 100%|██████████| 50/50 [00:03<00:00, 12.64it/s, episode=150, return=200.000]
Iteration 3: 100%|██████████| 50/50 [00:03<00:00, 12.64it/s, episode=200, return=200.000]
Iteration 4: 100%|██████████| 50/50 [00:03<00:00, 12.81it/s, episode=250, return=200.000]
Iteration 5: 100%|██████████| 50/50 [00:03<00:00, 12.63it/s, episode=300, return=200.000]
Iteration 6: 100%|██████████| 50/50 [00:03<00:00, 12.83it/s, episode=350, return=200.000]
Iteration 7: 100%|██████████| 50/50 [00:03<00:00, 12.58it/s, episode=400, return=200.000]
Iteration 8: 100%|██████████| 50/50 [00:03<00:00, 12.78it/s, episode=450, return=200.000]
Iteration 9: 100%|██████████| 50/50 [00:03<00:00, 12.59it/s, episode=500, return=187.200]
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

The inverted pendulum is an environment with continuous actions. As with TRPO, we modify the policy network so that it outputs the mean and standard deviation of a Gaussian distribution over continuous actions; the continuous actions are then sampled from this Gaussian.

class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std


class PPOContinuous:
    ''' PPO algorithm for continuous action spaces '''
    def __init__(
        self,
        state_dim,
        hidden_dim,
        action_dim,
        actor_lr,
        critic_lr,
        lmbda,
        epochs,
        eps,
        gamma,
        device
    ):
        self.actor = PolicyNetContinuous(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.epochs = epochs
        self.eps = eps
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, sigma = self.actor(state)
        action_dist = torch.distributions.Normal(mu, sigma)
        action = action_dist.sample()
        return [action.item()]

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'], dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # as in TRPO, rescale rewards to ease training
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        td_delta = td_target - self.critic(states)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        action_dists = torch.distributions.Normal(mu.detach(), std.detach())
        # actions follow a Normal (Gaussian) distribution
        old_log_probs = action_dists.log_prob(actions)

        for _ in range(self.epochs):
            mu, std = self.actor(states)
            action_dists = torch.distributions.Normal(mu, std)
            log_probs = action_dists.log_prob(actions)
            ratio = torch.exp(log_probs - old_log_probs)
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage
            actor_loss = torch.mean(-torch.min(surr1, surr2))
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()
            actor_loss.backward()
            critic_loss.backward()
            self.actor_optimizer.step()
            self.critic_optimizer.step()

We create the Pendulum-v0 environment and set the random seed for reproducibility. Then we train PPO on the inverted pendulum environment.

actor_lr = 1e-4
critic_lr = 5e-3
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
epochs = 10
eps = 0.2
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.shape[0]  # 连续动作空间
agent = PPOContinuous(
    state_dim,
    hidden_dim,
    action_dim,
    actor_lr,
    critic_lr,
    lmbda,
    epochs, 
    eps, 
    gamma, 
    device
)

return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)
Iteration 0: 100%|██████████| 200/200 [00:22<00:00,  9.02it/s, episode=200, return=-1000.354]
Iteration 1: 100%|██████████| 200/200 [00:22<00:00,  8.78it/s, episode=400, return=-922.780]
Iteration 2: 100%|██████████| 200/200 [00:20<00:00,  9.63it/s, episode=600, return=-483.957]
Iteration 3: 100%|██████████| 200/200 [00:20<00:00,  9.80it/s, episode=800, return=-472.933]
Iteration 4: 100%|██████████| 200/200 [00:20<00:00,  9.54it/s, episode=1000, return=-327.589]
Iteration 5: 100%|██████████| 200/200 [00:20<00:00,  9.63it/s, episode=1200, return=-426.262]
Iteration 6: 100%|██████████| 200/200 [00:20<00:00,  9.73it/s, episode=1400, return=-224.806]
Iteration 7: 100%|██████████| 200/200 [00:21<00:00,  9.49it/s, episode=1600, return=-279.722]
Iteration 8: 100%|██████████| 200/200 [00:20<00:00,  9.62it/s, episode=1800, return=-428.538]
Iteration 9: 100%|██████████| 200/200 [00:20<00:00,  9.81it/s, episode=2000, return=-235.771]
episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 21)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('PPO on {}'.format(env_name))
plt.show()

12.5 Summary

PPO is an improved version of TRPO: it simplifies TRPO's complicated computations, and in experiments it performs better than TRPO in most cases, so it is now widely used as a standard baseline algorithm. Note that both TRPO and PPO are on-policy algorithms: even though the optimization objective involves importance sampling, they only use data collected by the policy from the previous round, not data from all past policies.

PPO was developed at OpenAI by John Schulman, the first author of TRPO, after he completed his Ph.D. at the University of California, Berkeley. By improving on TRPO's computational procedure, PPO has become one of the most widely followed deep reinforcement learning algorithms, and its paper has accumulated even more citations than the TRPO paper.
