目录
1、简介2、算法细节3、代码3.1 主结构3.2 Actor Critic 网络3.3 Worker3.4 Worker并行工作4、参考
A3C是Google DeepMind 提出的一种解决Actor-Critic
不收敛问题的算法。我们知道DQN中很重要的一点是他具有经验池,可以降低数据之间的相关性,而A3C则提出降低数据之间的相关性的另一种方法:异步。
简单来说:A3C会创建多个并行的环境, 让多个拥有副结构的 agent 同时在这些并行环境上更新主结构中的参数. 并行中的 agent 们互不干扰, 而主结构的参数更新受到副结构提交更新的不连续性干扰, 所以更新的相关性被降低, 收敛性提高.
A3C的算法实际上就是将Actor-Critic放在了多个线程中进行同步训练. 可以想象成几个人同时在玩一样的游戏, 而他们玩游戏的经验都会同步上传到一个中央大脑. 然后他们又从中央大脑中获取最新的玩游戏方法。
这样, 对于这几个人, 他们的好处是: 中央大脑汇集了所有人的经验, 是最会玩游戏的一个, 他们能时不时获取到中央大脑的必杀招, 用在自己的场景中.
对于中央大脑的好处是: 中央大脑最怕一个人的连续性更新, 不只基于一个人推送更新这种方式能打消这种连续性. 使中央大脑不必像DQN,DDPG那样的记忆库也能很好的更新。
为了达到这个目的,我们要有两套体系, 可以看作中央大脑拥有global net
和他的参数, 每位玩家有一个global net
的副本local net
, 可以定时向global net
推送更新, 然后定时从global net
那获取综合版的更新.
如果在 tensorboard 中查看我们今天要建立的体系, 这就是你会看到的。
W_0
就是第0个worker
, 每个worker
都可以分享global_net
。
如果我们调用sync
中的pull
, 这个worker
就会从global_net
中获取到最新的参数.
如果我们调用sync
中的push
, 这个worker
就会将自己的个人更新推送去global_net
.
这次我们也是使用连续动作环境Pendulum
做例子。
我们使用了 Normal distribution 来选择动作, 所以在搭建神经网络的时候,actor
这边要输出动作的均值和方差. 然后放入 Normal distribution 去选择动作. 计算actor loss
的时候我们还需要使用到critic
提供的TD error
作为 gradient ascent 的导向.
而critic
只需要得到他对于state
的价值就好了. 用于计算TD error
.
这里因为代码有点多,有些部分会使用伪代码,完整代码最后会附上链接。
我们将Actor
和Critic
合并成一整套系统, 这样方便运行.
# 这个 class 可以被调用生成一个 global net.
# 也能被调用生成一个 worker 的 net, 因为他们的结构是一样的,
# 所以这个 class 可以被重复利用.
class ACNet(object):
def __init__(self, globalAC=None):
# 当创建 worker 网络的时候, 我们传入之前创建的 globalAC 给这个 worker
if 这是 global: # 判断当下建立的网络是 local 还是 global
with tf.variable_scope('Global_Net'):
self._build_net()
else:
with tf.variable_scope('worker'):
self._build_net()
# 接着计算 critic loss 和 actor loss
# 用这两个 loss 计算要推送的 gradients
with tf.name_scope('sync'): # 同步
with tf.name_scope('pull'):
# 更新去 global
with tf.name_scope('push'):
# 获取 global 参数
def _build_net(self):
# 在这里搭建 Actor 和 Critic 的网络
return 均值, 方差, state_value
def update_global(self, feed_dict):
# 进行 push 操作
def pull_global(self):
# 进行 pull 操作
def choose_action(self, s):
# 根据 s 选动作
这些只是在创建网络而已,worker
还有属于自己的class, 用来执行在每个线程里的工作.
每个worker
有自己的class, class 里面有他的工作内容work
class Worker(object):
def __init__(self, name, globalAC):
self.env = gym.make(GAME).unwrapped # 创建自己的环境
self.name = name # 自己的名字
self.AC = ACNet(name, globalAC) # 自己的 local net, 并绑定上 globalAC
def work(self):
# s, a, r 的缓存, 用于 n_steps 更新
buffer_s, buffer_a, buffer_r = [], [], []
while not COORD.should_stop() and GLOBAL_EP < MAX_GLOBAL_EP:
s = self.env.reset()
for ep_t in range(MAX_EP_STEP):
a = self.AC.choose_action(s)
s_, r, done, info = self.env.step(a)
buffer_s.append(s) # 添加各种缓存
buffer_a.append(a)
buffer_r.append(r)
# 每 UPDATE_GLOBAL_ITER 步 或者回合完了, 进行 sync 操作
if total_step % UPDATE_GLOBAL_ITER == 0 or done:
# 获得用于计算 TD error 的 下一 state 的 value
if done:
v_s_ = 0 # terminal
else:
v_s_ = SESS.run(self.AC.v, {self.AC.s: s_[np.newaxis, :]})[0, 0]
buffer_v_target = [] # 下 state value 的缓存, 用于算 TD
for r in buffer_r[::-1]: # 进行 n_steps forward view
v_s_ = r + GAMMA * v_s_
buffer_v_target.append(v_s_)
buffer_v_target.reverse()
buffer_s, buffer_a, buffer_v_target = np.vstack(buffer_s), np.vstack(buffer_a), np.vstack(buffer_v_target)
feed_dict = {
self.AC.s: buffer_s,
self.AC.a_his: buffer_a,
self.AC.v_target: buffer_v_target,
}
self.AC.update_global(feed_dict) # 推送更新去 globalAC
buffer_s, buffer_a, buffer_r = [], [], [] # 清空缓存
self.AC.pull_global() # 获取 globalAC 的最新参数
s = s_
if done:
GLOBAL_EP += 1 # 加一回合
break # 结束这回合
这里是重点,也就是Worker并行工作的计算
GLOBAL_AC = ACNet(GLOBAL_NET_SCOPE) # 建立 Global AC
workers = []
for i in range(N_WORKERS): # 创建 worker, 之后在并行
workers.append(Worker(GLOBAL_AC)) # 每个 worker 都有共享这个 global AC
COORD = tf.train.Coordinator() # Tensorflow 用于并行的工具
worker_threads = []
for worker in workers:
job = lambda: worker.work()
t = threading.Thread(target=job) # 添加一个工作线程
t.start()
worker_threads.append(t)
COORD.join(worker_threads) # tf 的线程调度
电脑里CPU有几个核就可以建立多少个worker
, 也就可以把它们放在CPU核数个线程中并行探索更新. 最后的学习结果可以用这个获取 moving average 的 reward 的图来概括.
完整代码链接:
https://github.com/cristianoc20/RL_learning/tree/master/A3C