和一个策略
,或者是给定一个马尔科夫奖励过程MRP
,求解基于该策略的价值函数
,求解最优价值函数
和最优策略
.
来计算当前状态
的价值,在同步迭代法中,我们使用上一个迭代周期
内的后续状态价值来计算更新当前迭代周期
内某状态
的价值:
的价值等于前一次迭代该状态的即时奖励与所有
的下一个可能状态
的价值与其概率乘积的和:
:考虑如上图所示的4 * 4的方格阵列,我们把它看成一个小世界.这个世界有16个状态,图中每一个小方格对应一个状态,依次使用0-15标记他们.图中状态0和15分别位于左上角和右下角,是终止状态,用灰色表示.
:假设在这个小型方格世界中有一个可以进行上,下,左,右移动的个体,它需要通过移动自己来到达两个灰色格子中的任意一个来完成任务.即{n, e, s, w} 对于任何非终止状态可以有东南西北移动四个行为;
:这个小型格子世界作为环境有着自己的动力学特征:当个体采取的移动行为不会导致个体离开格子世界时,个体将以100%的几率到达它所要移动方向的相邻的那个格子,之所以是相邻的格子而不能跳格是由于环境约束个体每次只能移动一格,同时规定个体也不能斜向移动;如果个体采取会跳出格子世界的行为,那么环境将让个体以100%的概率停留在原来的格子;如果个体到达终止状态,任务结束.
:每当个体采取了一个行为后,只要这个行为是个体在非终止状态时执行的,不管个体随后到达哪一个状态,环境都将给予个体值为-1的奖励值;而当个体处于终止位置时,任何行为将获得值为0的奖励并仍旧停留在终止位置.环境设置如此的奖励机制是利用了个体希望获得累计最大奖励的天性,而为了让个体在格子世界中用尽可能少的步数来到达终止状态,因此个体在世界中每多走一步,都将得到一个负值的奖励.
为了简化问题,我们设置衰减因子
,至此一个格子世界的强化学习问题就描述清楚了.
:对于身在格子世界中的个体来说,它并不知道各个状态之间的位置关系,它不知道当自己处在状态4时只需要选择"向上"移动的行为就可以直接到达终止状态.此时个体所能做的就是在任何一个状态时,它选择朝四个方向移动的概率想等.个体想到这个办法就是一个基于均一概率的随机策略(uniform random policy)
时,可以得到基于该策略的价值函数
,基于产生的价值函数可以得到一个贪婪策略
.
会得到一个新的价值函数,并产生新的贪婪策略,如此重复迭代将最终得到最优价值函数
和最优策略
.策略在循环迭代中得到更新改善的过程称为策略迭代(policy iteration).下图直观显示了策略迭代的过程.
和初始价值函数
开始,基于该策略进行完整的价值评估过程得到一个新的价值函数,随后依据新的价值函数得到新的贪婪策略,随后计算新的贪婪策略下的价值函数,整个过程反复进行,在这个循环过程中策略和价值函数均得到迭代更新,并最终收敛成最优价值函数和最优策略.除初始策略外,迭代中的策略均是依据价值函数的贪婪策略.
对任意状态
产生的行为
,贪婪策略在同样的状态
下会得到新行为
,其中:
对状态集中的所有状态都成立,那么针对状态
的所有后续状态均使用贪婪策略产生的行为,不等式:
将成立.这表明新策略下状态价值函数总不劣于原策略下的状态价值函数.该步推导如下:
,满足:
,这表明此时的策略
即为最优策略.证明完成.
的最优价值当且仅当该策略也同时获得状态
所有可能的后续状态
的最优价值.
,其中m,n分别为行为和状态空间的大小.
是实际与Agent相关或者说Agent经历的状态,可以省去那些仅存在理论上的状态的计算.
S = [i for i in range(16)] #状态空间
A = ['n', 'e', 's', 'w'] #行为空间
#P,R将由dynamics动态生成
ds_actions = {"n":-4, "e":1, "s":4, "w":-1} #行为对状态的改变(选择某个方向对应改变的状态)
def dynamics(s,a):
'''
模拟小型方格世界的环境动力学特征
:param s: 当前状态 int 0-15
:param a: 行为 str in ['n', 'e', 's', 'w']代表北,东,南,西
:return: tuple(s_prime, reward, is_end)
s_prime 后续状态 reward 奖励值 is_end 是否进入终止状态
'''
s_prime = s
#规定超出边界的行为会使agent停在原地
if (s % 4 == 0 and a == "w") or (s < 4 and a == "n")\
or ((s+1) % 4 == 0 and a == "e") or (s > 11 and a == "s")\
or s in [0,15]:
pass
else:
ds = ds_actions[a]
s_prime = s + ds
reward = 0 if s in [0,15] else -1
is_end = True if s in [0,15] else False
return s_prime, reward, is_end
def P(s,a,s1):#状态转移概率函数
s_prime, _, _ = dynamics(s,a)
return s1 == s_prime
def R(s,a):#奖励函数
_,r,_ = dynamics(s,a)
return r
gamma = 1.00
MDP = S,A,R,P,gamma
和
都变成了函数而不是字典了.同样变成函数的还有策略.下面的代码分贝建立了均一随机策略和贪婪策略,并给出了调用这两个策略的统一的接口.由于生成一个策略所需要的参数并不统一,例如像均一随机策略多数只需要知道行为空间就可以了,而贪婪策略则需要知道状态的价值.为了方便程序使用相同的代码调用不同的策略,我们对参数进行了统一.
def uniform_random_pi(MDP = None, V = None, s = None, a = None):
_, A, _, _, _ = MDP
n = len(A)
return 0 if n == 0 else 1.0/n
def greedy_pi(MDP, V, s, a):
S, A, P, R, gamma = MDP
max_v, a_max_v = -float('inf'), []
for a_opt in A:#统计后续状态的最大价值以及到达该状态的行为(可能不止一个)
s_prime,reward,_ = dynamics(s, a_opt)
v_s_prime = get_value(V, s_prime)
if v_s_prime > max_v:
max_v = v_s_prime
a_max_v = [a_opt]
elif v_s_prime == max_v:
a_max_v.append(a_opt)
n = len(a_max_v)
if n == 0: return 0.0
return 1.0/n if a in a_max_v else 0.0
def get_pi(Pi, s, a, MDP = None, V = None):
return Pi(MDP, V, s, a)
def get_prob(P, s, a, s1):#获取状态转移概率
return P(s, a, s1)
def get_reward(R, s, a):#获取奖励值
return R(s,a)
def set_value(V, s, v):#设置价值字典
V[s] = v
def get_value(V, s):#获取状态价值
return V[s]
def display_V(V):#显示状态价值
for i in range(16):
print('{0:>6.2f}'.format(V[i]), end = " ")
if(i+1) % 4 == 0:
print("")
print()
def compute_q(MDP, V, s, a):
'''
根据给定的MDP,价值函数V,计算状态行为对s,a的价值q_sa
:param MDP: 马尔科夫决策函数
:param V: 价值函数V
:param s: 状态
:param a: 行为
:return: 在s状态下a行为的价值q_sa
'''
S, A, R, P, gamma = MDP
q_sa = 0
for s_prime in S:
q_sa += get_prob(P, s, a, s_prime) * get_value(V, s_prime)
q_sa = get_reward(R, s, a) + gamma * q_sa
return q_sa
def compute_v(MDP, V, Pi, s):
'''
给定MDP下依据某一策略Pi和当前状态价值函数V计算某状态s的价值
:param MDP: 马尔科夫决策过程
:param V: 状态价值函数V
:param Pi: 策略
:param s: 某状态s
:return: 某状态的价值v_s
'''
S, A, R, P, gamma = MDP
v_s = 0
for a in A:
v_s += get_pi(Pi, s, a, MDP, V) * compute_q(MDP, V, s, a)#一个状态的价值可以由该状态下所有行为价值表达
return v_s
def update_V(MDP, V, Pi):
'''
给定一个MDP和一个策略,更新该策略下的价值函数
'''
S, _, _, _,_ = MDP
V_prime = V.copy()
for s in S:
set_value(V_prime, s, compute_v(MDP, V_prime, Pi, s))
return V_prime
def policy_evaluate(MDP, V, Pi, n):
'''
使用n次迭代计算来评估一个MDP在给定策略Pi下的状态价值,初始时为V
'''
for i in range(n):
V = update_V(MDP, V, Pi)
return V
def policy_iterate(MDP, V, Pi, n, m):
for i in range(m):
V = policy_evaluate(MDP, V, Pi, n)
Pi = greedy_pi # 第一次迭代产生新的价值函数后随机使用贪婪策略
return V
def compute_v_from_max_q(MDP, V, s):
'''
根据一个状态下的所有可能的行为价值中最大一个来确定当前状态价值
'''
S, A, R, P, gamma = MDP
v_s = -float('inf')
for a in A:
qsa = compute_q(MDP, V, s, a)
if qsa >= v_s:
v_s = qsa
return v_s
def update_V_without_pi(MDP, V):
'''
在不以来策略的情况下直接通过后续状态的价值来更新状态价值
'''
S, _, _, _, _ = MDP
V_prime = V.copy()
for s in S:
set_value(V_prime, s, compute_v_from_max_q(MDP, V_prime, s))
return V_prime
def value_iterate(MDP, V, n):
'''
价值迭代
'''
for i in range(n):
V = update_V_without_pi(MDP, V)
return V
V = [0 for _ in range(16)] #状态价值
V_pi = policy_evaluate(MDP, V, uniform_random_pi, 100)
display_V(V_pi)
V = [0 for _ in range(16)] #重置状态价值
V_pi = policy_evaluate(MDP, V, greedy_pi, 100)
display_V(V_pi)
# 0.00 -14.00 -20.00 -22.00
# -14.00 -18.00 -20.00 -20.00
# -20.00 -20.00 -18.00 -14.00
# -22.00 -20.00 -14.00 0.00
#
# 0.00 -1.00 -2.00 -3.00
# -1.00 -2.00 -3.00 -2.00
# -2.00 -3.00 -2.00 -1.00
# -3.00 -2.00 -1.00 0.00
V = [0 for _ in range(16)] #重置状态价值
V_pi = policy_iterate(MDP, V, greedy_pi,1,100)
display_V(V_pi)
# 0.00 - 1.00 - 2.00 - 3.00
# -1.00 - 2.00 - 3.00 - 2.00
# -2.00 - 3.00 - 2.00 - 1.00
# -3.00 - 2.00 - 1.00
# 0.00
V_star = value_iterate(MDP,V,4)
display_V(V_star)
# 0.00 - 1.00 - 2.00 - 3.00
# -1.00 - 2.00 - 3.00 - 2.00
# -2.00 - 3.00 - 2.00 - 1.00
# -3.00 - 2.00 - 1.00 0.00
def greedy_policy(MDP, V, s):
S, A, P, R, gamma = MDP
max_v, a_max_v = -float('inf'), []
for a_opt in A: #统计后续状态的最大价值以及到达该状态的行为(可能不止一个)
s_prime, reward, _ = dynamics(s, a_opt)
v_s_prime = get_value(V, s_prime)
if v_s_prime > max_v:
max_v = v_s_prime
a_max_v = a_opt
elif v_s_prime == max_v:
a_max_v += a_opt
return str(a_max_v)
def display_policy(policy, MDP, V):
S, A, P, R, gamma = MDP
for i in range(16):
print('{0:^6}'.format(policy(MDP, V, S[i])), end=" ")
if(i + 1) % 4 == 0:
print("")
print()
display_policy(greedy_policy, MDP, V_star)
# nesw w w sw
# n nw nesw s
# n nesw es s
# ne e e nesw