前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >多GPU,具有Tensorflow的多进程

多GPU,具有Tensorflow的多进程

作者头像
代码医生工作室
发布2019-07-22 10:20:40
2.2K0
发布2019-07-22 10:20:40
举报
文章被收录于专栏:相约机器人

Tensorflow是实验深度学习算法的绝佳工具。但是要利用深度学习的力量,需要利用计算能力和良好的工程技术。最终需要使用多个GPU,甚至可能需要多个流程才能实现目标。建议先阅读TensorFlow关于GPU 的官方教程。

https://www.tensorflow.org/guide/using_gpu

一个过程,很多GPU

这是最常见的情况,因为大多数深度学习社区正在进行监督学习,具有大数据集(图像,文本,声音......)和许多参数。这也是最困难的一个:需要在多个计算单元上并行化反向传播。

Jonathan Hui 在2017年发表了一篇很好的文章,可以直接阅读。

https://jhui.github.io/2017/03/07/TensorFlow-GPU/

多个进程,许多GPU

这是本文的真正意义所在。如果正在从事强化学习或“奇特”类型的学习,例如遗传算法或储层计算,可能会注意到有多个过程是必不可少的。

经验

将尝试以解决蛇的游戏为例。蛇是一条正方形的链,目标是在网格上吃水果。吃水果时,蛇的长度增加一个,并且在网格上随机出现新的水果。当他(不小心)吃掉他的尾巴时,蛇会失去。

贪吃蛇,红点是水果

将同时播放多个代理以加速学习过程。

代理

将使用一个简单的卷积神经网络,但可以使用任何想要的模型。例如也可以使用密集神经网络或决策树。

这个游戏不是“动态的”:代理人需要采取的政策只取决于最后一帧。因此网络最后一帧,开发的python版本中的10x10图像。使用100个4x4过滤器,然后使用200个3x3过滤器。最终展平了卷积,添加了200个密集层,以及长度为4的最终输出层,用于4种可能的操作(向上,向右,向左,向下)。

学习

不会详细介绍,因为这不是重点。例如可以使用策略渐变,其中输出层包含每个操作的概率,算法的概念是“提升”与其导致的分数相关的操作。

还可以使用Q-learning,其中输出图层包含指定状态(输入框架)中每个动作的平均分数,并采用这些分数的argmax来选择动作。

最后,还可以使用遗传算法,其中的想法是为参数(这里是网络的权重)添加噪声,并且只保留最佳代理。

让开始多处理吧

好的,现在可以谈论多处理。一般来说,这不是一件容易的事。在这里,不谈多线程,这种方式更简单但功能也更少。

多处理意味着多核。需要与要启动的进程一样多的内核(有时内核可以处理多个“线程”,因此这是最后关注的数字)。

将使用AWS的实例p3.8xlarge,提供32个vCores和4个V100显卡。AWS租金约为12美元/小时,而此套装的投资额约为45,000美元,加上运行所需的能源成本。

因此,可以同时运行32个不同的代理,每个代理在一个单独的流程中。将在python中使用“多处理”包。这个包允许启动进程并创建管道以与它们通信。以下是架构的拓扑:

多处理图

有32个工作进程和1个主进程。工作进程只是在玩游戏来收集数据并将其发送到主进程,主进程将训练这些数据并将新网络保存在文件中。然后,工作人员收到加载新网络,加载并再次播放N个游戏的消息。因此,需要从主进程启动32个进程,并在主进程和每个进程(即32个管道)之间创建一个管道。还需要在主进程内创建线程以异步侦听管道。这是它的代码:

代码语言:javascript
复制
class MasterProcess():
    def __init__(self, verbose=False):
        self.processes = {}
 
    def train_agents(self):
        pipes = {}
        for i in range(0, variables.n_process+1):
            parent_conn, child_conn = Pipe()
            pipes[i] = parent_conn
            p = AgentProcess(conn=child_conn, id=i, n_games=variables.n_per_process)
            p.start()
            self.processes[i] = p
 
        scores = {}
        batchs = {}
        t0 = time.time()
        def listenToAgent(id, scores):
            while True:
                msg = pipes[id].recv()
                if msg == "saved":
                    print("Master process (0) saved his weights.")
                    for j in pipes:
                        if(j != 0):
                            pipes[j].send("load")
                else:
                    score = float(msg[0])
                    scores[id] = score
                    batchs[id] = msg[1]
                    print("Process "+str(id)+" returns score "+str(score))
 
        threads_listen = []
        print("Threads to start")
        for id in pipes:
            t = threading.Thread(target=listenToAgent, args=(id,scores))
            t.start()
            threads_listen.append(t)
        print("Threads started")
 
        window = 50000 // (variables.n_process*variables.n_per_process)
        iter = 1
        mean_scores = []
        file = open("log_scores", "w")
        while True:
            if(len(scores) == variables.n_process):
                id_best = min(scores, key=scores.get)
                mean_scores.append(np.mean(list(scores.values())))
                print("End of iteration "+str(iter)+". Mean score sor far : "+str(np.mean(mean_scores)))
                iter += 1
                file.write(str(np.mean(mean_scores))+"\n")
                print("Time : "+str(time.time()-t0))
                print("\n")
                pipes[0].send(("train_with_batchs", list(batchs.values())))
                t0 = time.time()
                scores.clear()
                batchs.clear()
 
            if(len(mean_scores) >= window):
                mean_scores = mean_scores[1:]

如您所见,在开始时创建流程和管道,然后初始化词典以存储最后的分数和批量数据(键是流程ID)。然后创建线程来监听代理并启动它们。通信协议非常简单,只有一个单词消息,如“已保存”或“train_with_batchs”。在进程之间进行通信并不容易,因为只需要传递可序列化的对象,因此基本上是易于解析的数据。例如,无法直接传递Tensorflow会话。最后,在将分数的移动平均值存储在文件中的同时玩游戏。

现在来看看AgentProcess类,它非常简单:

代码语言:javascript
复制
class AgentProcess(Process):
    def __init__(self, conn, id, n_games):
        super(AgentProcess,self).__init__()
        self.conn = conn
        self.n_games = n_games
        self.id = id
        self.msg_queue = []
        np.random.seed(self.id*100)
 
    def run(self):
        self.agent = A2C(self.id)
 
        def treatQueue():
            msg = self.conn.recv()
            if msg == "load":
                self.agent.load_model()
                print("Process "+str(self.id)+" loaded the master (0) model.")
 
            if msg[0] == "train_with_batchs":
                print("Master process is training ...")
                t0 = time.time()
                self.agent.train_with_batchs(msg[1])
                self.agent.save_model()
                print("Master process finished training. Time : "+str(time.time()-t0)+" \n")
                self.conn.send("saved")
 
        while True:
            if(self.id != 0):
                batch_values = []
                batch_states = []
                batch_actions = []
                print("Process "+str(self.id)+" starts playing "+str(self.n_games)+" games.")
                scores = []
                env = SnakeEnv()
                overall_data = 0
                for i in range(self.n_games):
                    state = env.init()
                    t = 0
                    lastScoring = -1
                    while True:
                        action = self.agent([state])
                        newState, reward, done = env.step(action)
                        if(reward == 1):
                            for j in range(t - lastScoring):
                                batch_values.append(1)
                            lastScoring = t
 
                        batch_states.append([state])
                        batch_actions.append(action)
                        t += 1
                        if(done or (t - lastScoring >= 100)):
                            for j in range(t - lastScoring - 1):
                                batch_values.append(0)
                            break
                        state = newState
                    scores.append(env.score)
                    overall_data += t
 
                    if(overall_data >= 10000):
                        break
                print("Process "+str(self.id)+" finished playing.")
                batch = (batch_states, batch_actions, batch_values)
                self.conn.send((np.mean(scores),batch))
            treatQueue()

代理正在创建其AI模型,并正在使用它来玩游戏。评分方法不是我的重点,但您=可以检查并自己调整以获得更好的性能。“数据”是(状态,行动,奖励)的三倍。相当简单吧?

GPU分配和内存

默认情况下,Tensorflow会为模型选择第一个可用GPU,并在设备上为进程分配完整内存。不想要两个!希望工作进程共享一个模型,但是为自己的用法分配自己的GPU集部分。

共享模型非常困难,因为Tensorflow不允许在多个进程之间轻松共享图形或会话。目前正在深入了解Tensorflow,看看它是否可行并提高性能。目前,所拥有的唯一解决方案是在每个进程中实现一个新的Tensorflow核心,即在AgentProcess类中调用“import tensorflow”。每个流程都有自己的图表和会话。

对于GPU分配,有32个进程,4个GPU,每个16GB内存。增加每个进程的内存可以提高运行模型的进程速度。但内存有限,所以必须手动进行非常严格的优化......训练由主进程完成,需要大量内存,因此为他分配了几乎一整个GPU。

最终的分配是[3,10,10,10](每个GPU的进程数,其中第一个也包含主数据)。要限制内存,可以使用per_process_gpu_memory_fractiongpu_options.allow_growth为每个进程手动限制比例,这将处理内存(在初始化时不分配所有内存,仅在需要时增加它)。这是代码:

代码语言:javascript
复制
       n_gpu = 4
        if(n_gpu == 1):
            self._build_model()
            self._build_train_op()
        else:
            # 1 GPU for training, n-1 for playing
            if(self.id <= 3):
                gpu_id = 0
            else:
                gpu_id = (1 + (self.id%(n_gpu-1)))
            os.environ["CUDA_VISIBLE_DEVICES"]=str(gpu_id)
            self._build_model()
            self._build_train_op()

为了强制进程使用特定的GPU,使用环境变量CUDA_VISIBLE_DEVICES,它独立于分配工作进程的主进程。

很明显,增加进程数会提高性能,因为已经处理了更多批次。

结论

可以使用Tensorflow进行多处理,并在“相当”强大的机器上进行真正的强化学习。请记住,机器学习不是关于如何设想算法,而是主要关于如何有效地构建算法。

这是整个github回购。

https://github.com/fazega/snake-rls

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2019-07-16,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 相约机器人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档