
[Reinforcement Learning] Applying DQN to Operations Research

阿泽 Crz
Published 2020-11-09 10:38:14

A while back I showed how to apply Q-Learning to shift scheduling; this post gives the DQN implementation.

1. Background

(Background copied over from the previous post.)

Suppose we have a customer-service scheduling task: we need to arrange one week's shifts for 100 agents, subject to the following constraints:

  • A day is divided into 24 time slots, i.e. each slot is one hour;
  • Each agent works seven days a week, eight hours per shift;
  • Two shifts of the same agent must be at least 12 hours apart;
  • Within a week, shift slots are indexed from 0 at the earliest to 24*7 - 1 at the latest.

Evaluation criterion:

  • We are given the required number of agents for each time slot, and we want the scheduled headcount in each slot to match that requirement as closely as possible (a small sketch of this score follows below).
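A minimal sketch of this scoring idea (toy numbers of my own, not the actual demand data): the deviation in each slot is weighted by that slot's share of total demand, and the sign is flipped so that a larger value is better.

import numpy as np

# toy example: required vs. scheduled headcount for six one-hour slots
required  = np.array([5, 7, 9, 8, 6, 5])
scheduled = np.array([5, 6, 9, 9, 6, 4])

# relative deviation per slot, weighted by the slot's share of total demand
weights = required / required.sum()
score = -np.sum(np.abs(scheduled - required) / required * weights)
print(score)  # the closer to 0, the better the schedule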

Such optimization problems can also be tackled with heuristic algorithms; last time we used reinforcement learning, this time we use deep reinforcement learning.

2. Code

If you are not yet familiar with DQN, have a look at the earlier article (we use the 2013 version of DQN, without a separate target network).
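For reference, in that 2013-style setup a single online network Q(s, a; θ) provides both the prediction and the bootstrap target y = r + γ · max_a' Q(s', a'; θ), computed with the same weights θ rather than with a frozen target copy; this is exactly what the replay method below does.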

Compared with the Q-Learning version, not only is the Agent different, but the Env is also improved in a few places; the exact changes are visible in the code below.

import random
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from copy import deepcopy
from collections import defaultdict, deque

gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
# let GPU memory grow on demand
tf.config.experimental.set_memory_growth(gpus[0], True)
# cap the maximum GPU memory (12 GB here)
tf.config.experimental.set_virtual_device_configuration(
    gpus[0],
    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024*12)]
)
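Note that on a machine with no visible GPU, gpus is an empty list and gpus[0] raises an IndexError; a guarded variant (my own adjustment, not part of the original code) would look like this:

# only configure GPU memory when a GPU is actually visible (adjustment, not in the original)
gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
if gpus:
    # let GPU memory grow on demand and cap it at 12 GB
    tf.config.experimental.set_memory_growth(gpus[0], True)
    tf.config.experimental.set_virtual_device_configuration(
        gpus[0],
        [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024 * 12)]
    )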

The main change in the environment is list_2_str. The original version concatenated the lists into a string to serve as the state; now the 2D schedule list is flattened into a 1D 0/1 list that is fed to the neural network as input.

person_n = 10  # schedule 10 agents

class Env():
    def __init__(self):
        # 10 people, 7 days; each shift bar can move left ('L') or right ('R'), or stay put ('-1')
        self.actions_space = ['{}{}L'.format(i, j) for i in range(person_n) for j in range(7)] + \
                                    ['{}{}R'.format(i, j) for i in range(person_n) for j in range(7)] + ['-1']
        self.n_actions = len(self.actions_space)
        self.act_list = [random.randint(person_n // 2, person_n) for i in range(24*7)]  # required headcount per hour
        self.w_list = [i / sum(self.act_list) for i in self.act_list]
        self.state = [[i*24 for i in range(7)] for i in range(person_n)]
        self.n_state = person_n * 7 * 24
        self.punish = -1000
        print(self.act_list)
    
    def list_2_str(self, l):
        # build the full per-person 0/1 occupancy list (person x hour); use a list
        # comprehension so that the rows are independent rather than aliases of one list
        state_list = [[0] * (24 * 7) for _ in range(person_n)]
        for person in range(person_n):
            for i in l[person]:
                for j in range(8):
                    state_list[person][i+j] = 1
        return [i for state in state_list for i in state]
    
    def reset(self):
        self.state = [[i*24 for i in range(7)] for i in range(person_n)]
        return self.list_2_str(self.state)
    
    # score the current schedule against demand, using the per-slot weights
    def reward(self, tmp_state):
        # each person's consecutive shifts must be at least 8+12 hours apart, otherwise score = -1000
        for i in range(person_n):
            # wrap-around check: the Sunday shift and the next Monday shift must also be 8+12 hours apart
            if (tmp_state[i][0] + (24*7-1) - tmp_state[i][6]) < 20:
                return self.punish
            for j in range(6):
                if (tmp_state[i][j+1] - tmp_state[i][j]) < 20:
                    return self.punish
        # build the full per-person 0/1 occupancy list (person x hour); independent rows, as above
        state_list = [[0] * (24 * 7) for _ in range(person_n)]
        for person in range(person_n):
            for i in tmp_state[person]:
                for j in range(8):
                    state_list[person][i+j] = 1
        plan_list = np.sum(state_list, axis=0).tolist()
        s_list = [abs(plan_list[i] - self.act_list[i])/self.act_list[i] for i in range(len(plan_list))]
        # a larger reward is better, so negate the weighted deviation
        score = -np.sum([s_list[i]*self.w_list[i] for i in range(len(s_list))])
        return score
    
    def step(self, action):
        actions_str = self.actions_space[action]
        if actions_str == '-1':
            return self.list_2_str(self.state), self.reward(self.state)
        else:
            num = int(actions_str[0])
            day = int(actions_str[1])
            move = actions_str[2]
            # Improvement: bad states are not adopted as the current state, so a temporary copy is used to check whether the move is acceptable.
            tmp_state = deepcopy(self.state)
            if move == 'R':
                if tmp_state[num][day] == (24*7-8-1):
                    tmp_state[num][day] = tmp_state[num][day] + 1
                    return self.list_2_str(tmp_state), self.punish
                tmp_state[num][day] = tmp_state[num][day] + 1
            if move == 'L':
                if tmp_state[num][day] == 0:
                    tmp_state[num][day] = tmp_state[num][day] - 1
                    return self.list_2_str(tmp_state), self.punish
                tmp_state[num][day] = tmp_state[num][day] - 1
            reward = self.reward(tmp_state)
            if reward == self.punish:
                return self.list_2_str(tmp_state), self.punish
            self.state = tmp_state
            return self.list_2_str(self.state), self.reward(self.state)
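A quick sanity check of the environment (a hypothetical snippet, assuming the Env class is defined exactly as above):

# hypothetical sanity check: the flattened state has length person_n * 7 * 24 = 1680,
# and the '-1' (no-move) action returns the current state together with its score
env = Env()
state = env.reset()
print(len(state), env.n_state)   # 1680 1680
noop = env.actions_space.index('-1')
next_state, reward = env.step(noop)
print(reward)                    # negative weighted deviation; closer to 0 is better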

Create the DQNAgent:

class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.discount_factor = 0.9
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999
        self.learning_rate = 0.01
        self.model = self._build_model()
    
    def _build_model(self):
        model = tf.keras.Sequential()
        model.add(layers.Dense(512, input_dim=self.state_size, activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(512, activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(256, activation='relu'))
        model.add(layers.BatchNormalization())
        model.add(layers.Dropout(0.5))
        model.add(layers.Dense(self.action_size, activation='softmax'))
        model.compile(loss='mse', optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate))
        return model
    
    def memorize(self, state, action, reward, next_state):
        # store a transition in the replay memory
        self.memory.append((state, action, reward, next_state))
    
    def get_action(self, state):
        # epsilon-greedy action selection
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        act_values = act_values[0]
        max_action = np.random.choice(np.where(act_values == np.max(act_values))[0])
        return max_action  # returns action
    
    def replay(self, batch_size):
        # train on a random minibatch sampled from the replay memory
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state in minibatch:
            target = reward + self.discount_factor * np.amax(self.model.predict(next_state)[0])
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def load(self, name):
        self.model.load_weights(name)

    def save(self, name):
        self.model.save_weights(name)

Training:

env = Env()
agent = DQNAgent(env.n_state, env.n_actions)

episodes = 1
batch_size = 32
bst_reward = -500
bst_state = env.state

for e in range(episodes):
    state = env.reset()
    print('---------- ', e, ' ------------')
    # 10,000 steps per episode are enough; training is fairly slow
    for i in range(10000):
        state = np.reshape(state, [1, env.n_state])
        action = agent.get_action(state)
        next_state, reward = env.step(action)
        next_state = np.reshape(next_state, [1, env.n_state])
        # skip bad (punished) states instead of storing them
        if reward != -1000:
            # store the transition first, then advance to the next state
            agent.memorize(state, action, reward, next_state)
            state = deepcopy(next_state)
            if len(agent.memory) > batch_size:
                agent.replay(batch_size)
        if bst_reward < reward:
            bst_reward = reward
            bst_state = deepcopy(env.state)
            print('episode: {}/{}, i:{}, reward: {}, e: {:.2}'.format(e, episodes, i, bst_reward, agent.epsilon))

Training output:
episode: 0/1, i:0, reward: -0.7850318471337578, e: 1.0
episode: 0/1, i:1, reward: -0.7770700636942675, e: 1.0
episode: 0/1, i:2, reward: -0.7722929936305731, e: 1.0
episode: 0/1, i:3, reward: -0.7675159235668789, e: 1.0
episode: 0/1, i:4, reward: -0.7627388535031847, e: 1.0
... ...
episode: 0/1, i:2333, reward: -0.36305732484076436, e: 0.25
episode: 0/1, i:2424, reward: -0.3598726114649682, e: 0.24
episode: 0/1, i:2796, reward: -0.3535031847133758, e: 0.21
episode: 0/1, i:3179, reward: -0.3487261146496815, e: 0.19
episode: 0/1, i:7086, reward: -0.34076433121019106, e: 0.086

The final fit (weighted deviation) is 0.34.
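Once training is done, the best schedule found can be compared hour by hour against the demand curve. A hypothetical inspection snippet (assuming env, bst_state and person_n from the code above):

# rebuild the per-hour headcount from the best schedule and compare it with the demand env.act_list
best_plan = np.reshape(env.list_2_str(bst_state), (person_n, 24 * 7)).sum(axis=0)
for hour, (scheduled, required) in enumerate(zip(best_plan, env.act_list)):
    if scheduled != required:
        print('hour {:3d}: scheduled {} vs required {}'.format(hour, scheduled, required))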

I made the same modifications to the Q-Learning version so that everything except the Agent is identical (apart from the number of iterations: 10,000 for DQN versus 100,000 for Q-Learning). The result, which is much better than the earlier Q-Learning run, is:

episode: 0/1, i:0, reward: -0.7874015748031497, e: 0.1
episode: 0/1, i:1, reward: -0.7826771653543307, e: 0.1
episode: 0/1, i:3, reward: -0.7795275590551182, e: 0.1
episode: 0/1, i:4, reward: -0.7748031496062993, e: 0.1
episode: 0/1, i:6, reward: -0.7716535433070866, e: 0.1
episode: 0/1, i:7, reward: -0.7685039370078741, e: 0.1
... ...
episode: 0/1, i:7898, reward: -0.352755905511811, e: 0.1
episode: 0/1, i:7906, reward: -0.35118110236220473, e: 0.1
episode: 0/1, i:7916, reward: -0.34488188976377954, e: 0.1
episode: 0/1, i:8077, reward: -0.3401574803149606, e: 0.1
episode: 0/1, i:8130, reward: -0.33700787401574805, e: 0.1
episode: 0/1, i:16242, reward: -0.3291338582677166, e: 0.1

As you can see, Q-Learning is faster than DQN (about 100,000 iterations per minute) and ends up performing slightly better.

Of course, this only holds for this particular scenario and this rather simple model; feel free to experiment further.

I also tried replacing the fully connected network with a CNN, but the result was not great (-0.44).

I may try some of the many improved DQN variants later on.

This article was originally published on 2020-11-04 on the WeChat public account 阿泽的学习笔记 and shared through the Tencent Cloud self-media sharing program.
