关注我们,一起学习~
上一节我们介绍了策略迭代和价值迭代两种方式来解决MDP下的决策问题,但是这两个方法都是需要模型已知的,即需要知道S,A,P,R,γ。但是现实生活中还有一种常见情况,即我们无法知道转移概率P,我们可以知道智能体可以执行哪些动作,因为这是我们设置的,可以知道他会经历哪些状态,也可以从环境的反馈中得到回报值,但是由于环境的复杂性而导致我们无法对环境建模,从而无法得到P。这时可以采用免模型的方法,本节以简单易懂的方式介绍蒙特卡洛方法。
code: https://github.com/dqdallen/RLstudy
什么是蒙特卡洛?
蒙特卡洛用一个词概括就是采样。
蒙特卡洛用一句话概括就是通过不断的采样来逼近我们想要计算的值。
蒙特卡洛方法
在RL中,我们需要计算每个状态的累积回报的期望,找到在这个状态下执行什么动作是可以达到最大效果的。这里我再再来回顾一下这两个公式,可以看到这两个公式都需要用到
来计算最终的v,q的期望。
让我们回到最原始的公式,累积回报的期望公式如下,我们想要的就是G的期望,在这里我们采用蒙特卡洛方法,即采样法,通过很多次的采样得到不同的试验(episode),通过每次试验计算得到的G的经验平均值来替代期望,从而对v值进行估计或者说是逼近。这里所谓的试验,就是不断的让智能体去尝试,让智能体在该环境中尝试不同的初始状态以及不同的动作,从而得到不同的采样结果。
first visit和every visit
如上所述,我们需要采样很多次来得到数据,每一次采样我们都会得到一个序列,这个序列中某个状态s可能出现多次。那么在计算经验平均的时候,每个episode里第一次遇到状态s时考虑这个G,而后续再遇到不考虑,这就叫first visit。那么,every visit就很好理解了,就是每次遇到都考虑进去。
图片来自https://zhuanlan.zhihu.com/p/25743759
,N(s)为总数。
。
探索
在模型已知的情况下,我们会去尝试遍历所有的状态,但是,在模型未知的情况下,我们每次生成episode是依靠现存的策略π,为了尽可能的探索到所有状态,需要采用探索性的方法得到初始状态。
这里做简单解释,
on policy和off policy
这两个有很多种翻译方式,这里就不翻译了。
这里可能会有点困惑:
首先,前面也提到了,需要智能体在环境种不断尝试来得到数据(episode),从而进行迭代,那么智能体在环境中怎么运动,需要靠策略π。
然后,有了产生的数据之后,需要用这些数据计算v,q,然后来提升智能体的策略。
因此,这里涉及到生成数据的策略和需要改进的策略时都是同一个的问题。具体伪代码和流程可见“参考”中的链接。
实例
代码详见github,可点击阅读原文前往。
FrozenLake-v0问题是在一个4 * 4矩阵中,从起始位置S,走到终点G,中间有道路F和陷阱H,希望通过训练使得智能体能从S快速到达G。
代码中包含利用蒙特卡洛解决强化学习的相关方法,分别包含on policy和off policy的方法,其中on policy中包含first visit和every visit的判断,可以自行选择。
迭代后,代码中的test部分对其进行测试,运行一次后可以看到已经可以从S到G顺利到达了。
import gym
import numpy as np
env = gym.make("FrozenLake-v0", is_slippery=False) # FrozenLake-v1 FrozenLake-v0都可以,看库的版本
def on_policy_mc_mg(episode_num, env, gamma, first_every, eps):
policy = np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
cnt = np.zeros_like(policy) # 记录不同的g出现的次数
q = np.zeros_like(policy) # 存储过程中的q值,用于计算最终q值
for epi in range(episode_num): # 迭代episode_num次,生成episode_num个序列
g = 0 # 即G,累积回报
state = env.reset() # 重置环境
a_s = [] # 存储一个序列中的动作和状态
while True:
action = np.random.choice(env.action_space.n, p=policy[state]) # 根据策略policy随机选取动作(这里采用随机是因为要探索)
new_state, reward, done, info = env.step(action) # 放到环境中执行后,得到新状态和奖励,done表示是否结束
# 这里对原环境产生的奖励做了修改,到达终点则奖励200,到达陷阱则-200,走出边界则-200并结束,向前走一步为-1
# 原始环境中走到边界是可以继续走的,这里进行了简单修改,方便训练,并且走一步奖励-1使得可以走的步数尽量少。
if reward == 1:
reward = 200
elif reward == 0 and done:
reward = -200
elif reward == 0 and state == new_state:
reward = -200
done = True
else:
reward = -1
if first_every == 'every': # 是every visit 还是first vist
a_s.append((action, state, reward))
else:
if (action, state, reward) not in a_s:
a_s.append((action, state, reward))
state = new_state
if done:
break
# 上述while True得到一条序列后,序列中状态s对应的q值
g = 0
for a, s, r in reversed(a_s): # reversed是倒序,使得在计算g的时候距离当前状态越远的系数越小,起到加权的作用
g = r + gamma * g
cnt[s][a] += 1
q[s][a] += (g - q[s][a]) / cnt[s][a]
ac = np.argmax(q[s]) # 找到q值最大的对应的动作
# 保留探索部分的概率
policy[s] = eps / env.action_space.n
policy[s][ac] = 1 - eps + eps / env.action_space.n
if (epi + 1) % 10000 == 0:
print(f'epi: {epi}, g = {g}')
return policy, q
def off_policy_mc_mg(env, gamma, episode_num):
# 初始化两个策略,分别是目标策略和行为策略
policy = np.zeros((env.observation_space.n, env.action_space.n))
behavior_policy = np.ones((env.observation_space.n, env.action_space.n)) / env.action_space.n
cnt = np.zeros_like(policy)
q = np.zeros_like(policy)
for epi in range(episode_num):
g = 0
state = env.reset()
a_s = []
w = 1
while True:
action = np.random.choice(env.action_space.n, p=behavior_policy[state])
new_state, reward, done, _ = env.step(action)
if reward == 1:
reward = 200
elif reward == 0 and done:
reward = -200
elif reward == 0 and state == new_state:
reward = -200
done = True
else:
reward = -1
a_s.append((action, state, reward))
state = new_state
if done:
break
# w这个变量用于计算重要性采样中的权重
g = 0
for a, s, r in reversed(a_s):
g = r + gamma * g
cnt[s][a] += w
q[s][a] += w / cnt[s][a] * (g - q[s][a])
ac = np.argmax(q[s])
# 这里和on_policy不一样,这里产生行为的是包含探索的,即上面的random.choice,这里做决策的时候,采用贪心策略
policy[s] = 0
policy[s][ac] = 1
if ac != a:
break
w = w / behavior_policy[s][a]
if (epi + 1) % 10000 == 0:
print(f'epi = {epi}, g = {g}')
return policy, q
if __name__ == '__main__':
gamma = 0.9
eps = 0.1
episode_num = 50000
method = 'off'
if method == 'on':
first_every = 'first'
p, q = on_policy_mc_mg(episode_num, env, gamma, first_every, eps)
else:
p, q = off_policy_mc_mg(env, gamma, episode_num)
# test
state = env.reset()
g = 0
i = 0
import time
while True:
i += 1
action = p[state].argmax()
state, reward, done, _ = env.step(action)
env.render()
time.sleep(1)
g = reward
if done:
break
https://blog.csdn.net/LagrangeSK/article/details/81182605
https://zhuanlan.zhihu.com/p/25743759