前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >[一起学RL] 蒙特卡洛方法及其实例实现

[一起学RL] 蒙特卡洛方法及其实例实现

作者头像
秋枫学习笔记
发布2022-09-19 10:57:58
6130
发布2022-09-19 10:57:58
举报
文章被收录于专栏:秋枫学习笔记

关注我们,一起学习~

上一节我们介绍了策略迭代和价值迭代两种方式来解决MDP下的决策问题,但是这两个方法都是需要模型已知的,即需要知道S,A,P,R,γ。但是现实生活中还有一种常见情况,即我们无法知道转移概率P,我们可以知道智能体可以执行哪些动作,因为这是我们设置的,可以知道他会经历哪些状态,也可以从环境的反馈中得到回报值,但是由于环境的复杂性而导致我们无法对环境建模,从而无法得到P。这时可以采用免模型的方法,本节以简单易懂的方式介绍蒙特卡洛方法。

code: https://github.com/dqdallen/RLstudy

什么是蒙特卡洛?

蒙特卡洛用一个词概括就是采样。

蒙特卡洛用一句话概括就是通过不断的采样来逼近我们想要计算的值。

蒙特卡洛方法

在RL中,我们需要计算每个状态的累积回报的期望,找到在这个状态下执行什么动作是可以达到最大效果的。这里我再再来回顾一下这两个公式,可以看到这两个公式都需要用到

P_{ss'}^a

来计算最终的v,q的期望。

v_{\pi}(s)=\sum_{a \in A} \pi(a \mid s)\left(R_{s}^{a}+\gamma \sum_{s^{\prime} \in S} P_{s s^{\prime}}^{a} v_{\pi}\left(s^{\prime}\right)\right)
q_{\pi}(s, a)=R_{s}^{a}+\gamma \sum_{s^{\prime} \in S} P_{s s^{\prime}}^{a} \sum_{a^{\prime} \in A} \pi\left(a^{\prime} \mid s^{\prime}\right) q_{\pi}\left(s^{\prime}, a^{\prime}\right)

让我们回到最原始的公式,累积回报的期望公式如下,我们想要的就是G的期望,在这里我们采用蒙特卡洛方法,即采样法,通过很多次的采样得到不同的试验(episode),通过每次试验计算得到的G的经验平均值来替代期望,从而对v值进行估计或者说是逼近。这里所谓的试验,就是不断的让智能体去尝试,让智能体在该环境中尝试不同的初始状态以及不同的动作,从而得到不同的采样结果。

v_{\pi}(s)=E_{\pi}\left[G_{t} \mid S_{t}=s\right]=E_{\pi}\left[\sum_{k=0}^{\infty} \gamma^{k} R_{t+k+1} \mid S_{t}=s\right]

first visit和every visit

如上所述,我们需要采样很多次来得到数据,每一次采样我们都会得到一个序列,这个序列中某个状态s可能出现多次。那么在计算经验平均的时候,每个episode里第一次遇到状态s时考虑这个G,而后续再遇到不考虑,这就叫first visit。那么,every visit就很好理解了,就是每次遇到都考虑进去。

图片来自https://zhuanlan.zhihu.com/p/25743759

  • 对于first visit,平均值的计算方式为
(G_{11}+G_{21}) / N(s)

,N(s)为总数。

  • 对于every visit,平均值的计算方式为
G_{11}+G_{21}+G_{12} / N(s)

探索

在模型已知的情况下,我们会去尝试遍历所有的状态,但是,在模型未知的情况下,我们每次生成episode是依靠现存的策略π,为了尽可能的探索到所有状态,需要采用探索性的方法得到初始状态。

这里做简单解释,

  • 首先,一开始的时候策略π中每个状态下执行不同动作的概率是相同的,即各个状态都有可能跳转到。
  • 然后,在策略提升时,如果直接采用贪心法,那么下次遇到这个状态就只会执行当时认为的最优动作。
  • 因此,需要进行探索。通常采用ε-greedy方法,公式如下。对于非最优动作仍保留一部分的概率,从而使得可以进行探索。
\pi(a \mid s) \leftarrow\left\{\begin{array}{c} 1-\varepsilon+\frac{\varepsilon}{|A(s)|} \text { if } a=\arg \max _{a} Q(s, a) \\ \frac{\varepsilon}{|A(s)|} \text { if } a \neq \arg \max _{a} Q(s, a) \end{array}\right.

on policy和off policy

这两个有很多种翻译方式,这里就不翻译了。

  • on policy:该策略对应的方法主要是产生数据的策略和需要改进的策略同一个策略
  • off policy:该策略对应的方式是产生数据的策略和需要改进的策略不是同一个策略

这里可能会有点困惑:

首先,前面也提到了,需要智能体在环境种不断尝试来得到数据(episode),从而进行迭代,那么智能体在环境中怎么运动,需要靠策略π。

然后,有了产生的数据之后,需要用这些数据计算v,q,然后来提升智能体的策略。

因此,这里涉及到生成数据的策略和需要改进的策略时都是同一个的问题。具体伪代码和流程可见“参考”中的链接。

实例

代码详见github,可点击阅读原文前往。

FrozenLake-v0问题是在一个4 * 4矩阵中,从起始位置S,走到终点G,中间有道路F和陷阱H,希望通过训练使得智能体能从S快速到达G。

代码中包含利用蒙特卡洛解决强化学习的相关方法,分别包含on policy和off policy的方法,其中on policy中包含first visit和every visit的判断,可以自行选择。

迭代后,代码中的test部分对其进行测试,运行一次后可以看到已经可以从S到G顺利到达了。

代码语言:javascript
复制
import gym
import numpy as np

env = gym.make("FrozenLake-v0", is_slippery=False) # FrozenLake-v1 FrozenLake-v0都可以,看库的版本

def on_policy_mc_mg(episode_num, env, gamma, first_every, eps):
    policy = np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
    cnt = np.zeros_like(policy)  # 记录不同的g出现的次数
    q = np.zeros_like(policy)  # 存储过程中的q值,用于计算最终q值
    for epi in range(episode_num):  # 迭代episode_num次,生成episode_num个序列
        g = 0  # 即G,累积回报
        state = env.reset()  # 重置环境
        a_s = []  # 存储一个序列中的动作和状态
        while True:
            action = np.random.choice(env.action_space.n, p=policy[state])  # 根据策略policy随机选取动作(这里采用随机是因为要探索)
            new_state, reward, done, info = env.step(action)  # 放到环境中执行后,得到新状态和奖励,done表示是否结束
            # 这里对原环境产生的奖励做了修改,到达终点则奖励200,到达陷阱则-200,走出边界则-200并结束,向前走一步为-1
            # 原始环境中走到边界是可以继续走的,这里进行了简单修改,方便训练,并且走一步奖励-1使得可以走的步数尽量少。
            if reward == 1:
                reward = 200
            elif reward == 0 and done:
                reward = -200
            elif reward == 0 and state == new_state:
                reward = -200
                done = True
            else:
                reward = -1 
            if first_every == 'every':  # 是every visit 还是first vist
                a_s.append((action, state, reward))

            else:
                if (action, state, reward) not in a_s:
                    a_s.append((action, state, reward))
            state = new_state
            if done:
                break
        # 上述while True得到一条序列后,序列中状态s对应的q值
        g = 0
        for a, s, r in reversed(a_s): # reversed是倒序,使得在计算g的时候距离当前状态越远的系数越小,起到加权的作用
            g = r + gamma * g
            cnt[s][a] += 1
            q[s][a] += (g - q[s][a]) / cnt[s][a]
            ac = np.argmax(q[s]) # 找到q值最大的对应的动作
            # 保留探索部分的概率
            policy[s] = eps / env.action_space.n
            policy[s][ac] = 1 - eps + eps / env.action_space.n

        if (epi + 1) % 10000 == 0:
            print(f'epi: {epi}, g = {g}')
    return policy, q       

def off_policy_mc_mg(env, gamma, episode_num):
    # 初始化两个策略,分别是目标策略和行为策略
    policy = np.zeros((env.observation_space.n, env.action_space.n))
    behavior_policy = np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
    cnt = np.zeros_like(policy)
    q = np.zeros_like(policy)
    
    for epi in range(episode_num):
        g = 0
        state = env.reset()
        a_s = []
        w = 1
        while True:
            action = np.random.choice(env.action_space.n, p=behavior_policy[state])
            new_state, reward, done, _ = env.step(action)
            if reward == 1:
                reward = 200
            elif reward == 0 and done:
                reward = -200
            elif reward == 0 and state == new_state:
                reward = -200
                done = True
            else:
                reward = -1 
            
            a_s.append((action, state, reward))
            state = new_state
            if done:
                break
        # w这个变量用于计算重要性采样中的权重
        g = 0
        for a, s, r in reversed(a_s):
            g =  r + gamma * g
            cnt[s][a] += w
            q[s][a] += w / cnt[s][a] * (g - q[s][a])
            ac = np.argmax(q[s])
            # 这里和on_policy不一样,这里产生行为的是包含探索的,即上面的random.choice,这里做决策的时候,采用贪心策略
            policy[s] = 0
            policy[s][ac] = 1
            if ac != a:
                break
        w = w / behavior_policy[s][a]
        if (epi + 1) % 10000 == 0:
            print(f'epi = {epi}, g = {g}')
    return policy, q

if __name__ == '__main__':
    gamma = 0.9
    eps = 0.1
    episode_num = 50000
    method = 'off'
    if method == 'on':
        first_every = 'first'
        p, q = on_policy_mc_mg(episode_num, env, gamma, first_every, eps)
    else:
        p, q = off_policy_mc_mg(env, gamma, episode_num)
    # test
    state = env.reset()
    g = 0
    i = 0
    import time
    while True:
        i += 1
        action = p[state].argmax()
        state, reward, done, _ = env.step(action)
        env.render()
        time.sleep(1)
        g = reward
        if done:
            break

参考

https://blog.csdn.net/LagrangeSK/article/details/81182605

https://zhuanlan.zhihu.com/p/25743759

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-03-04,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 秋枫学习笔记 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 参考
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档