How do I implement deep reinforcement learning (DQN)?


DQN (Deep Q-Learning) is arguably the pioneering work of deep reinforcement learning (DRL). It was a novel algorithm that combines deep learning with reinforcement learning to achieve end-to-end learning from perception to action. How can DQN be implemented?

风华一代 asked:
印度陆军医院补锅型码农 answered:

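DQN implementation demo

I looked at many DQN examples, including the original Atari implementation and a Flappy Bird one, but the simplest is the demo by Morvan Zhou (莫烦). The GitHub address is: https://github.com/MorvanZhou/Reinforcement-learning-with-tensorflow

In the demo, the red square is the treasure hunter, the black squares are traps, and the yellow square is the treasure. The goal is for the treasure hunter to reach the treasure.

The state can be represented by the x and y coordinates, and there are four actions: up, down, left, and right. The animation is drawn with tkinter. Reaching the treasure gives a reward of 1, falling into a trap gives a reward of -1, and every other step gives a reward of 0.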
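
The state, action, and reward scheme just described can be sketched as a tiny environment. This is only an illustration, not the demo's actual maze class: the 4x4 grid size, the cell positions, and the names `GridWorld`, `ACTIONS`, `TRAPS`, and `TREASURE` are all assumptions made for this example.

```python
# Hypothetical 4x4 grid world; the layout and all names are assumptions.
ACTIONS = {0: (0, -1), 1: (0, 1), 2: (-1, 0), 3: (1, 0)}  # up, down, left, right
TRAPS = {(2, 1), (1, 2)}   # assumed trap cells (x, y)
TREASURE = (2, 2)          # assumed treasure cell (x, y)


class GridWorld:
    def __init__(self, size=4):
        self.size = size
        self.pos = (0, 0)

    def reset(self):
        self.pos = (0, 0)
        return self.pos

    def step(self, action):
        """Apply an action and return (next_state, reward, done)."""
        dx, dy = ACTIONS[action]
        # Clamp to the grid so that walking into a wall leaves the agent in place.
        x = min(max(self.pos[0] + dx, 0), self.size - 1)
        y = min(max(self.pos[1] + dy, 0), self.size - 1)
        self.pos = (x, y)
        if self.pos == TREASURE:
            return self.pos, 1, True
        if self.pos in TRAPS:
            return self.pos, -1, True
        return self.pos, 0, False
```

An episode ends as soon as the agent reaches the treasure or a trap, which matches the reward scheme of 1, -1, and 0 described above.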

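Next, let's focus on the DQN-related code.

Defining the inputs

Here, s is the current state, a is the action taken in that state, r is the reward received, and s_ is the state after the transition.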

self.s = tf.placeholder(tf.float32,[None,self.n_features],name='s')
self.s_ = tf.placeholder(tf.float32,[None,self.n_features],name='s_')
self.r = tf.placeholder(tf.float32,[None,],name='r')
self.a = tf.placeholder(tf.int32,[None,],name='a')

Experience replay buffer

    def store_transition(self,s,a,r,s_):
        if not hasattr(self, 'memory_counter'):
            self.memory_counter = 0
        # hstack:Stack arrays in sequence horizontally
        transition = np.hstack((s,[a,r],s_))
        index = self.memory_counter % self.memory_size
        self.memory[index,:] = transition
        self.memory_counter += 1
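
To see how the modulo index makes the buffer overwrite its oldest entries, here is a plain-Python sketch of the same idea. The `ReplayBuffer` class and its capacity of 3 are assumptions for illustration; the demo itself stores rows in a NumPy array.

```python
class ReplayBuffer:
    """Fixed-size ring buffer; the oldest transition is overwritten first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = [None] * capacity
        self.counter = 0  # total number of transitions ever stored

    def store(self, s, a, r, s_):
        index = self.counter % self.capacity  # wraps around once the buffer is full
        self.memory[index] = (s, a, r, s_)
        self.counter += 1

    def __len__(self):
        return min(self.counter, self.capacity)


buf = ReplayBuffer(capacity=3)
for step in range(5):
    buf.store(step, 0, 0.0, step + 1)

# Transitions 3 and 4 have overwritten slots 0 and 1; slot 2 still holds step 2.
print([t[0] for t in buf.memory])
```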

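Two-network structure

target_net and eval_net must have identical architectures; here both are two-layer fully connected networks. Note that eval_net takes the current state s as input, whereas target_net takes the next state s_, because the output of target_net is used to compute the q-target value from the Bellman equation, namely

$$q_{\text{target}} = r + \gamma \max_{a'} Q_{\text{target}}(s', a')$$

where $s'$ is the next state s_ and $Q_{\text{target}}$ is the output of target_net. The code is as follows: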

w_initializer, b_initializer = tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1)

# ------------------ build evaluate_net ------------------
with tf.variable_scope('eval_net'):
    e1 = tf.layers.dense(self.s,20,tf.nn.relu,kernel_initializer=w_initializer,
                         bias_initializer=b_initializer,name='e1'
                         )

    self.q_eval = tf.layers.dense(e1,self.n_actions,kernel_initializer=w_initializer,
                                  bias_initializer=b_initializer,name='q')

# ------------------ build target_net ------------------

with tf.variable_scope('target_net'):
    t1 = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer,
                         bias_initializer=b_initializer, name='t1')
    self.q_next = tf.layers.dense(t1, self.n_actions, kernel_initializer=w_initializer,
                                  bias_initializer=b_initializer, name='t2')

Every fixed number of steps, we copy the parameters of eval_net into target_net:

t_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='target_net')
e_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES,scope='eval_net')

with tf.variable_scope('hard_replacement'):
    # tf.assign(t, e) overwrites each target_net variable t with the eval_net variable e
    self.target_replace_op = [tf.assign(t,e) for t,e in zip(t_params,e_params)]
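
A framework-free sketch of this periodic hard copy may help. It assumes the parameters live in plain dictionaries; the names `eval_params`, `target_params`, and `REPLACE_ITER`, and the fake "gradient step", are illustrative only.

```python
import copy

REPLACE_ITER = 3  # assumed sync interval, in learning steps

eval_params = {"w": [0.0, 0.0], "b": [0.0]}
target_params = copy.deepcopy(eval_params)

sync_steps = []
for learn_step in range(7):
    if learn_step % REPLACE_ITER == 0:
        # Hard replacement: the target network becomes an exact copy of eval_net.
        target_params = copy.deepcopy(eval_params)
        sync_steps.append(learn_step)
    # Stand-in for a gradient step, which changes only the evaluation network.
    eval_params["w"] = [w + 0.1 for w in eval_params["w"]]

print(sync_steps)
```

Between two syncs the target parameters stay frozen while the evaluation parameters keep moving.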

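Computing the loss and optimizing

For eval_net, all we need is the current network output. However, the network outputs q-eval values for all four actions, so we have to select the q-eval value that corresponds to the action a that was actually taken. The code for this part is: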

with tf.variable_scope('q_eval'):
    # tf.stack example:
    # a = tf.constant([1,2,3])
    # b = tf.constant([4,5,6])
    # c = tf.stack([a,b],axis=1)
    # [[1 4]
    #  [2 5]
    #  [3 6]]
    a_indices = tf.stack([tf.range(tf.shape(self.a)[0], dtype=tf.int32), self.a], axis=1)
    # tf.gather_nd builds a new tensor from params using indices:
    # indices = [[0, 0], [1, 1]]
    # params = [['a', 'b'], ['c', 'd']]
    # output = ['a', 'd']
    # Here self.q_eval has shape [batch, n_actions] and a_indices has shape [batch, 2]
    # (row index, action), so this picks the Q-value of the action taken in each sample
    self.q_eval_wrt_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
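
The combined effect of `tf.stack` and `tf.gather_nd` here is to pick, for each row of the batch, the Q-value of the action that was actually taken. A plain-Python equivalent with made-up numbers looks like this:

```python
# q_eval: one row per sample, one column per action (values are made up).
q_eval = [
    [0.1, 0.5, 0.2, 0.0],
    [0.3, 0.1, 0.9, 0.4],
    [0.7, 0.2, 0.1, 0.6],
]
actions = [1, 2, 0]  # action taken in each sample

# Equivalent of a_indices: one (row, action) pair per sample.
a_indices = [[row, a] for row, a in enumerate(actions)]

# Equivalent of tf.gather_nd(params=q_eval, indices=a_indices).
q_eval_wrt_a = [q_eval[row][col] for row, col in a_indices]
print(q_eval_wrt_a)  # [0.5, 0.9, 0.7]
```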

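Some of the functions above may be unfamiliar; they are explained in the comments. If anything is still unclear, look up the documentation or read the source code of the functions in question.

For target_net, we compute the q-target value according to the following formula:

$$q_{\text{target}} = r + \gamma \max_{a'} Q_{\text{target}}(s', a')$$

The first term, the reward r, is already known. For the rest, we follow the greedy policy and take the largest of the four outputs: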

with tf.variable_scope('q_target'):
    q_target = self.r + self.gamma * tf.reduce_max(self.q_next,axis=1,name='Qmax_s_')
    # Once a node is wrapped in stop_gradient, gradients no longer flow back through it during backpropagation
    self.q_target = tf.stop_gradient(q_target)
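
As a numeric check of the target computation, the following sketch evaluates r + gamma * max Q(s_, a') for a small batch. The numbers are invented, and the optional `done` mask is an assumption that is not part of the demo code: zeroing the bootstrap term for terminal transitions is a common refinement.

```python
GAMMA = 0.9


def q_targets(rewards, q_next, gamma=GAMMA, done=None):
    """Return r + gamma * max_a' Q(s_, a') for each sample in the batch."""
    if done is None:
        done = [False] * len(rewards)
    return [
        # For terminal transitions (assumed extension) only the reward remains.
        r + (0.0 if d else gamma * max(q_row))
        for r, q_row, d in zip(rewards, q_next, done)
    ]


rewards = [0.0, 1.0, -1.0]
q_next = [
    [0.2, 0.5, 0.1, 0.0],
    [0.0, 0.0, 0.0, 0.0],
    [0.3, 0.4, 1.0, 0.2],
]

print(q_targets(rewards, q_next))
print(q_targets(rewards, q_next, done=[False, True, True]))
```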

Next, we define the loss function and choose an optimizer:

with tf.variable_scope('loss'):
    self.loss = tf.reduce_mean(tf.squared_difference(self.q_target,self.q_eval_wrt_a,name='TD_error'))

with tf.variable_scope('train'):
    self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)

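Training the network

Every fixed number of steps, we copy the parameters of eval_net into target_net. We also sample a batch of transitions from the replay buffer and feed it to the network for training.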

def learn(self):
    if self.learn_step_counter % self.replace_target_iter == 0:
        self.sess.run(self.target_replace_op)
        print('\ntarget_params_replaced\n')

    if self.memory_counter > self.memory_size:
        sample_index = np.random.choice(self.memory_size,size=self.batch_size)
    else:
        sample_index = np.random.choice(self.memory_counter,size = self.batch_size)

    batch_memory = self.memory[sample_index,:]

    _,cost = self.sess.run(
        [self._train_op,self.loss],
        feed_dict={
            self.s:batch_memory[:,:self.n_features],
            self.a:batch_memory[:,self.n_features],
            self.r:batch_memory[:,self.n_features+1],
            self.s_:batch_memory[:,-self.n_features:]
        }
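    )
    # Count learning steps so that the periodic target_net sync above is triggered
    self.learn_step_counter += 1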
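
The slicing in `feed_dict` relies on each stored row having the layout [s, a, r, s_]. The sketch below shows which columns each slice picks up. The value `N_FEATURES = 2` (a state made of x and y coordinates), the helper name `split_transition`, and the sample rows are assumptions for illustration.

```python
import random

N_FEATURES = 2  # assumed: the state is an (x, y) pair


def split_transition(row, n_features=N_FEATURES):
    """Split one stored row [s, a, r, s_] back into its four parts."""
    s = row[:n_features]
    a = int(row[n_features])
    r = row[n_features + 1]
    s_ = row[-n_features:]
    return s, a, r, s_


memory = [
    [0.0, 0.0, 3, 0.0, 1.0, 0.0],
    [1.0, 0.0, 1, 0.0, 1.0, 1.0],
    [1.0, 1.0, 3, 1.0, 2.0, 1.0],
]

# Sample a batch with replacement, as np.random.choice does by default.
batch = [split_transition(random.choice(memory)) for _ in range(2)]
```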
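
也许没主见的年轻人 answered:

Defining the hyperparameters. We first define the hyperparameters of the networks, such as the size of the replay buffer and the learning rates of the two networks: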

MAX_EPISODES = 200
MAX_EP_STEPS = 200
LR_A = 0.001    # learning rate for actor
LR_C = 0.002    # learning rate for critic
GAMMA = 0.9     # reward discount
TAU = 0.01      # soft replacement
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32

RENDER = False
ENV_NAME = 'Pendulum-v0'

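Defining the network inputs. The placeholders we need are the current state S, the next state S', and the corresponding reward R. The action A is produced by the Actor, so no placeholder is needed for it: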

self.S = tf.placeholder(tf.float32, [None, s_dim], 's')
self.S_ = tf.placeholder(tf.float32, [None, s_dim], 's_')
self.R = tf.placeholder(tf.float32, [None, 1], 'r')

Building the two networks. Both are two-layer fully connected networks. The Actor outputs a concrete action, and the Critic outputs a single Q-value:

def _build_a(self, s, scope, trainable):
    with tf.variable_scope(scope):
        net = tf.layers.dense(s, 30, activation=tf.nn.relu, name='l1', trainable=trainable)
        a = tf.layers.dense(net, self.a_dim, activation=tf.nn.tanh, name='a', trainable=trainable)
        return tf.multiply(a, self.a_bound, name='scaled_a')

def _build_c(self, s, a, scope, trainable):
    with tf.variable_scope(scope):
        n_l1 = 30
        w1_s = tf.get_variable('w1_s', [self.s_dim, n_l1], trainable=trainable)
        w1_a = tf.get_variable('w1_a', [self.a_dim, n_l1], trainable=trainable)
        b1 = tf.get_variable('b1', [1, n_l1], trainable=trainable)
        net = tf.nn.relu(tf.matmul(s, w1_s) + tf.matmul(a, w1_a) + b1)
        return tf.layers.dense(net, 1, trainable=trainable)  # Q(s,a)

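Soft parameter updates. As you can see, the parameters are updated in soft mode: each update nudges the target-net parameters only slightly, mixing in a small fraction (TAU) of the eval-net parameters.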

# networks parameters
self.ae_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/eval')
self.at_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Actor/target')
self.ce_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/eval')
self.ct_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='Critic/target')

# target net replacement
self.soft_replace = [[tf.assign(ta, (1 - TAU) * ta + TAU * ea), tf.assign(tc, (1 - TAU) * tc + TAU * ec)]
                     for ta, ea, tc, ec in zip(self.at_params, self.ae_params, self.ct_params, self.ce_params)]
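
To illustrate how slowly the target parameters follow the evaluation parameters under this rule, here is a minimal sketch with plain Python lists. The helper name `soft_update` and the parameter values are made up for the example.

```python
TAU = 0.01


def soft_update(target, online, tau=TAU):
    """Move each target parameter a fraction tau towards the online one."""
    return [(1 - tau) * t + tau * e for t, e in zip(target, online)]


target = [0.0, 0.0]
online = [1.0, -2.0]

for _ in range(100):
    target = soft_update(target, online)

# After n updates the remaining gap is (1 - tau) ** n of the initial gap.
print(target)
```

With TAU = 0.01, roughly 37% of the initial gap still remains after 100 updates, since 0.99 ** 100 is about 0.366.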

Defining the losses of the two networks. The two losses have already been described in detail; the code below simply implements that idea:

q_target = self.R + GAMMA * q_
# in the feed_dic for the td_error, the self.a should change to actions in memory
td_error = tf.losses.mean_squared_error(labels=q_target, predictions=q)
self.ctrain = tf.train.AdamOptimizer(LR_C).minimize(td_error, var_list=self.ce_params)

a_loss = - tf.reduce_mean(q)    # maximize the q
self.atrain = tf.train.AdamOptimizer(LR_A).minimize(a_loss, var_list=self.ae_params)

Learning. We first draw a batch of data from the replay buffer and then train the Actor and the Critic:

def learn(self):
    # soft target replacement
    self.sess.run(self.soft_replace)

    indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
    bt = self.memory[indices, :]
    bs = bt[:, :self.s_dim]
    ba = bt[:, self.s_dim: self.s_dim + self.a_dim]
    br = bt[:, -self.s_dim - 1: -self.s_dim]
    bs_ = bt[:, -self.s_dim:]

    self.sess.run(self.atrain, {self.S: bs})
    self.sess.run(self.ctrain, {self.S: bs, self.a: ba, self.R: br, self.S_: bs_})

Storing experience

def store_transition(self, s, a, r, s_):
    transition = np.hstack((s, a, [r], s_))
    index = self.pointer % MEMORY_CAPACITY  # replace the old memory with new memory
    self.memory[index, :] = transition
    self.pointer += 1
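
咕噜sasa心之所向,金石为开 answered:

Training a "catch the brick" game with DQN

There are many open-source deep learning libraries; well-known ones include TensorFlow and Caffe. Here we use TensorFlow to train an agent on a "catch the brick" game.

[Game screenshot]

The paddle is moved left and right with the left and right mouse buttons to catch the ball. If the ball hits the bottom, the game ends.

The main Python code follows (the game code itself is omitted; the focus is on the algorithm):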

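import random
from collections import deque

import cv2
import numpy as np
import tensorflow as tf  # written against the TensorFlow 1.x API

# Game wrapper class. Its internals do not matter here: it only needs to expose a
# method that executes an action and returns the current game-area pixels.
class Game(object):
    def __init__(self):  # initialise the game (game code omitted)
        pass

    # action is one of MOVE_STAY, MOVE_LEFT, MOVE_RIGHT.
    # The AI moves the paddle left or right; step() returns the reward and the
    # game-screen pixels (pixels -> reward -> reinforce moves that earn reward).
    def step(self, action):
        raise NotImplementedError  # game code omitted

# Discount factor (gamma) applied to future rewards. Despite its name, this is
# not the optimizer's learning rate.
LEARNING_RATE = 0.99
# Epsilon-greedy exploration: start and end values of epsilon
INITIAL_EPSILON = 1.0
FINAL_EPSILON = 0.05
# EXPLORE: number of steps over which epsilon is annealed
# OBSERVE: number of steps to observe before training starts
EXPLORE = 500000
OBSERVE = 500
# Replay memory capacity
REPLAY_MEMORY = 500000
# Number of transitions sampled per training step
BATCH = 100
# Number of output neurons, one per action
output = 3  # MOVE_STAY:[1, 0, 0]  MOVE_LEFT:[0, 1, 0]  MOVE_RIGHT:[0, 0, 1]
input_image = tf.placeholder("float", [None, 80, 100, 4])  # game pixels (4 stacked frames)
action = tf.placeholder("float", [None, output])     # action
# Define the CNN (convolutional neural network)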
def convolutional_neural_network(input_image):
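    # Small random initial weights: with all-zero weights and ReLU activations,
    # the weight gradients are all zero and the network cannot learn.
    weights = {'w_conv1':tf.Variable(tf.truncated_normal([8, 8, 4, 32], stddev=0.01)),
               'w_conv2':tf.Variable(tf.truncated_normal([4, 4, 32, 64], stddev=0.01)),
               'w_conv3':tf.Variable(tf.truncated_normal([3, 3, 64, 64], stddev=0.01)),
               'w_fc4':tf.Variable(tf.truncated_normal([3456, 784], stddev=0.01)),
               'w_out':tf.Variable(tf.truncated_normal([784, output], stddev=0.01))}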

    biases = {'b_conv1':tf.Variable(tf.zeros([32])),
              'b_conv2':tf.Variable(tf.zeros([64])),
              'b_conv3':tf.Variable(tf.zeros([64])),
              'b_fc4':tf.Variable(tf.zeros([784])),
              'b_out':tf.Variable(tf.zeros([output]))}

    conv1 = tf.nn.relu(tf.nn.conv2d(input_image, weights['w_conv1'], strides = [1, 4, 4, 1], padding = "VALID") + biases['b_conv1'])
    conv2 = tf.nn.relu(tf.nn.conv2d(conv1, weights['w_conv2'], strides = [1, 2, 2, 1], padding = "VALID") + biases['b_conv2'])
    conv3 = tf.nn.relu(tf.nn.conv2d(conv2, weights['w_conv3'], strides = [1, 1, 1, 1], padding = "VALID") + biases['b_conv3'])
    conv3_flat = tf.reshape(conv3, [-1, 3456])
    fc4 = tf.nn.relu(tf.matmul(conv3_flat, weights['w_fc4']) + biases['b_fc4'])

    output_layer = tf.matmul(fc4, weights['w_out']) + biases['b_out']
    return output_layer
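
The flattened size 3456 used in `tf.reshape` follows from the output-size rule for VALID (unpadded) convolutions applied to the 80x100 input. The helper `valid_conv_out` below is a hypothetical name introduced only to check that arithmetic.

```python
def valid_conv_out(size, kernel, stride):
    """Output length of a VALID (no padding) convolution along one axis."""
    return (size - kernel) // stride + 1


h, w = 80, 100
# (kernel size, stride) of conv1, conv2, conv3 in the network above
for kernel, stride in [(8, 4), (4, 2), (3, 1)]:
    h = valid_conv_out(h, kernel, stride)
    w = valid_conv_out(w, kernel, stride)
    print(h, w)

flat = h * w * 64  # 64 channels after the last convolution
print(flat)  # 3456
```

The feature map shrinks from 80x100 to 19x24, then 8x11, then 6x9, and 6 * 9 * 64 = 3456.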

# Train the neural network
def train_neural_network(input_image):
    predict_action = convolutional_neural_network(input_image)

    argmax = tf.placeholder("float", [None, output])
    gt = tf.placeholder("float", [None])

    # Q-value of the action actually taken (argmax is a one-hot action vector)
    action = tf.reduce_sum(tf.multiply(predict_action, argmax), axis = 1)
    cost = tf.reduce_mean(tf.square(action - gt))
    optimizer = tf.train.AdamOptimizer(1e-6).minimize(cost)

    game = Game()
    D = deque()

    _, image = game.step(MOVE_STAY)
    image = cv2.cvtColor(cv2.resize(image, (100, 80)), cv2.COLOR_BGR2GRAY)
    ret, image = cv2.threshold(image, 1, 255, cv2.THRESH_BINARY)
    input_image_data = np.stack((image, image, image, image), axis = 2)
    #print ("IMG2:%s" %input_image_data)

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

        saver = tf.train.Saver()

        n = 0
        epsilon = INITIAL_EPSILON
        while True:
            #print("InputImageData:", input_image_data)
            action_t = predict_action.eval(feed_dict = {input_image : [input_image_data]})[0]

            argmax_t = np.zeros([output], dtype=int)
            # Explore with the current (annealed) epsilon, not its initial value
            if(random.random() <= epsilon):
                maxIndex = random.randrange(output)
            else:
                maxIndex = np.argmax(action_t)
            argmax_t[maxIndex] = 1
            if epsilon > FINAL_EPSILON:
                epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EXPLORE

            reward, image = game.step(list(argmax_t))

            image = cv2.cvtColor(cv2.resize(image, (100, 80)), cv2.COLOR_BGR2GRAY)
            ret, image = cv2.threshold(image, 1, 255, cv2.THRESH_BINARY)
            image = np.reshape(image, (80, 100, 1))
            input_image_data1 = np.append(image, input_image_data[:, :, 0:3], axis = 2)

            D.append((input_image_data, argmax_t, reward, input_image_data1))

            if len(D) > REPLAY_MEMORY:
                D.popleft()

            if n > OBSERVE:
                minibatch = random.sample(D, BATCH)
                input_image_data_batch = [d[0] for d in minibatch]
                argmax_batch = [d[1] for d in minibatch]
                reward_batch = [d[2] for d in minibatch]
                input_image_data1_batch = [d[3] for d in minibatch]

                gt_batch = []

                out_batch = predict_action.eval(feed_dict = {input_image : input_image_data1_batch})

                for i in range(0, len(minibatch)):
                    gt_batch.append(reward_batch[i] + LEARNING_RATE * np.max(out_batch[i]))

                print("gt_batch:", gt_batch, "argmax:", argmax_batch)
                optimizer.run(feed_dict = {gt : gt_batch, argmax : argmax_batch, input_image : input_image_data_batch})

            input_image_data = input_image_data1
            n = n+1
            print(n, "epsilon:", epsilon, " " ,"action:", maxIndex, " " ,"reward:", reward)

train_neural_network(input_image)
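
The exploration logic in the loop above (a random action with probability epsilon, with epsilon decreasing linearly over EXPLORE steps) can be isolated into two small functions. This is a sketch using the same constants; the function names `epsilon_at` and `choose_action` are hypothetical, and the closed-form schedule is an equivalent rewrite of the per-step decrement rather than the author's code.

```python
import random

INITIAL_EPSILON = 1.0
FINAL_EPSILON = 0.05
EXPLORE = 500000


def epsilon_at(step):
    """Linearly anneal epsilon from INITIAL_EPSILON to FINAL_EPSILON."""
    fraction = min(step / EXPLORE, 1.0)
    return INITIAL_EPSILON - fraction * (INITIAL_EPSILON - FINAL_EPSILON)


def choose_action(q_values, epsilon, rng=random):
    """Epsilon-greedy: a random action with probability epsilon, else the argmax."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))
    return max(range(len(q_values)), key=lambda i: q_values[i])


for step in (0, 250000, 500000, 1000000):
    print(step, epsilon_at(step))
```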
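
efsdfarr网络工程师 answered:

DQN algorithm flow

NIPS 2013 version

Nature 2015 version

Both versions of DQN use experience replay. The 2015 version adds a target network (target-net), which improves the stability of the algorithm.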
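
A toy calculation can show why a separate target network stabilises training. In the sketch below, the 2013-style target bootstraps from the network that is being updated, so it shifts after every gradient step, while the 2015-style target uses a frozen snapshot. All numbers and variable names are invented for illustration.

```python
GAMMA = 0.9


def td_target(reward, next_q_values, gamma=GAMMA):
    """Bootstrapped target: r + gamma * max_a' Q(s', a')."""
    return reward + gamma * max(next_q_values)


# Q-values for the next state s' under the online network, and a frozen copy.
online_q_next = [1.0, 2.0]
target_q_next = list(online_q_next)  # snapshot taken at the last sync

# A gradient step on the online network shifts its own estimate for s'.
online_q_next[1] += 0.5

target_2013 = td_target(0.0, online_q_next)  # bootstraps from the moving network
target_2015 = td_target(0.0, target_q_next)  # bootstraps from the frozen copy
print(target_2013, target_2015)
```

The 2015-style target stays put until the next sync, so the regression target does not chase the network's own updates.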

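青之软件宅男 answered:

Turning the original Q-learning into deep learning

The problem of updating a Q-table becomes a function-fitting problem: similar states yield similar outputs, and the parameters θ are updated so that the Q-function approaches the optimal Q-values. DQN therefore amounts to designing a neural network that fits the Q-values.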
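
The idea of replacing the table with a parameterised function can be sketched with the simplest possible approximator, a linear model, instead of a neural network. The feature vectors, the step size `ALPHA`, and the function names are assumptions made for this example.

```python
GAMMA = 0.9
ALPHA = 0.1  # assumed step size


def q_value(theta, features, action):
    """Linear approximation: Q(s, a; theta) = theta[a] . phi(s)."""
    return sum(w * x for w, x in zip(theta[action], features))


def q_learning_step(theta, s, a, r, s_next, n_actions):
    """One semi-gradient Q-learning update of theta, in place."""
    target = r + GAMMA * max(q_value(theta, s_next, b) for b in range(n_actions))
    td_error = target - q_value(theta, s, a)
    # For a linear model the gradient of Q(s, a) w.r.t. theta[a] is phi(s).
    theta[a] = [w + ALPHA * td_error * x for w, x in zip(theta[a], s)]
    return td_error


theta = [[0.0, 0.0], [0.0, 0.0]]  # 2 actions x 2 features
err = q_learning_step(theta, s=[1.0, 0.0], a=1, r=1.0, s_next=[0.0, 1.0], n_actions=2)

# A nearby state gets a similar value without ever having been visited.
print(q_value(theta, [1.0, 0.0], 1), q_value(theta, [0.9, 0.1], 1))
```

Unlike a table entry, the single update also changes the estimate for the nearby state [0.9, 0.1], which is the generalisation that function fitting provides.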

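bsrt123 answered:

Plain Q-Learning stores its values in a table indexed by state. The number of possible pixel states of a 1000×1000 image is practically infinite, which is why CNN + Q-Learning, i.e. the DQN algorithm, was introduced. The algorithm is described in the figure below:

[Algorithm description figure]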
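
To get a feel for how large that state space is, the sketch below counts the decimal digits of the number of distinct 1000×1000 images. The assumption of 256 grey levels per pixel is mine; the answer does not specify a pixel depth.

```python
import math

width, height = 1000, 1000
levels = 256  # assumed: 8-bit grayscale pixels

# The number of distinct images is levels ** (width * height). Count its decimal
# digits with logarithms instead of materialising the integer.
digits = math.floor(width * height * math.log10(levels)) + 1
print(digits)  # 2408240
```

A table with one row per state would need a number of rows that is itself about 2.4 million digits long, which is why a tabular representation is out of the question.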

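应用案例分享 answered:

The deep network architecture published by Google is shown below:

[Network architecture figure]

The input is on the left of the network and the output on the right. The game screen image first passes through two convolutional layers (the paper describes three), then through two fully connected layers, and is finally mapped to all possible actions of the game controller. ReLU activations are used between layers.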
