Tensorflow是实验深度学习算法的绝佳工具。但是要利用深度学习的力量,需要利用计算能力和良好的工程技术。最终需要使用多个GPU,甚至可能需要多个流程才能实现目标。建议先阅读TensorFlow关于GPU 的官方教程。
https://www.tensorflow.org/guide/using_gpu
一个过程,很多GPU
这是最常见的情况,因为大多数深度学习社区正在进行监督学习,具有大数据集(图像,文本,声音......)和许多参数。这也是最困难的一个:需要在多个计算单元上并行化反向传播。
Jonathan Hui 在2017年发表了一篇很好的文章,可以直接阅读。
https://jhui.github.io/2017/03/07/TensorFlow-GPU/
多个进程,许多GPU
这是本文的真正意义所在。如果正在从事强化学习或“奇特”类型的学习,例如遗传算法或储层计算,可能会注意到有多个过程是必不可少的。
经验
将尝试以解决蛇的游戏为例。蛇是一条正方形的链,目标是在网格上吃水果。吃水果时,蛇的长度增加一个,并且在网格上随机出现新的水果。当他(不小心)吃掉他的尾巴时,蛇会失去。
贪吃蛇,红点是水果
将同时播放多个代理以加速学习过程。
代理
将使用一个简单的卷积神经网络,但可以使用任何想要的模型。例如也可以使用密集神经网络或决策树。
这个游戏不是“动态的”:代理人需要采取的政策只取决于最后一帧。因此网络最后一帧,开发的python版本中的10x10图像。使用100个4x4过滤器,然后使用200个3x3过滤器。最终展平了卷积,添加了200个密集层,以及长度为4的最终输出层,用于4种可能的操作(向上,向右,向左,向下)。
学习
不会详细介绍,因为这不是重点。例如可以使用策略渐变,其中输出层包含每个操作的概率,算法的概念是“提升”与其导致的分数相关的操作。
还可以使用Q-learning,其中输出图层包含指定状态(输入框架)中每个动作的平均分数,并采用这些分数的argmax来选择动作。
最后,还可以使用遗传算法,其中的想法是为参数(这里是网络的权重)添加噪声,并且只保留最佳代理。
让开始多处理吧
好的,现在可以谈论多处理。一般来说,这不是一件容易的事。在这里,不谈多线程,这种方式更简单但功能也更少。
多处理意味着多核。需要与要启动的进程一样多的内核(有时内核可以处理多个“线程”,因此这是最后关注的数字)。
将使用AWS的实例p3.8xlarge,提供32个vCores和4个V100显卡。AWS租金约为12美元/小时,而此套装的投资额约为45,000美元,加上运行所需的能源成本。
因此,可以同时运行32个不同的代理,每个代理在一个单独的流程中。将在python中使用“多处理”包。这个包允许启动进程并创建管道以与它们通信。以下是架构的拓扑:
多处理图
有32个工作进程和1个主进程。工作进程只是在玩游戏来收集数据并将其发送到主进程,主进程将训练这些数据并将新网络保存在文件中。然后,工作人员收到加载新网络,加载并再次播放N个游戏的消息。因此,需要从主进程启动32个进程,并在主进程和每个进程(即32个管道)之间创建一个管道。还需要在主进程内创建线程以异步侦听管道。这是它的代码:
class MasterProcess():
def __init__(self, verbose=False):
self.processes = {}
def train_agents(self):
pipes = {}
for i in range(0, variables.n_process+1):
parent_conn, child_conn = Pipe()
pipes[i] = parent_conn
p = AgentProcess(conn=child_conn, id=i, n_games=variables.n_per_process)
p.start()
self.processes[i] = p
scores = {}
batchs = {}
t0 = time.time()
def listenToAgent(id, scores):
while True:
msg = pipes[id].recv()
if msg == "saved":
print("Master process (0) saved his weights.")
for j in pipes:
if(j != 0):
pipes[j].send("load")
else:
score = float(msg[0])
scores[id] = score
batchs[id] = msg[1]
print("Process "+str(id)+" returns score "+str(score))
threads_listen = []
print("Threads to start")
for id in pipes:
t = threading.Thread(target=listenToAgent, args=(id,scores))
t.start()
threads_listen.append(t)
print("Threads started")
window = 50000 // (variables.n_process*variables.n_per_process)
iter = 1
mean_scores = []
file = open("log_scores", "w")
while True:
if(len(scores) == variables.n_process):
id_best = min(scores, key=scores.get)
mean_scores.append(np.mean(list(scores.values())))
print("End of iteration "+str(iter)+". Mean score sor far : "+str(np.mean(mean_scores)))
iter += 1
file.write(str(np.mean(mean_scores))+"\n")
print("Time : "+str(time.time()-t0))
print("\n")
pipes[0].send(("train_with_batchs", list(batchs.values())))
t0 = time.time()
scores.clear()
batchs.clear()
if(len(mean_scores) >= window):
mean_scores = mean_scores[1:]
如您所见,在开始时创建流程和管道,然后初始化词典以存储最后的分数和批量数据(键是流程ID)。然后创建线程来监听代理并启动它们。通信协议非常简单,只有一个单词消息,如“已保存”或“train_with_batchs”。在进程之间进行通信并不容易,因为只需要传递可序列化的对象,因此基本上是易于解析的数据。例如,无法直接传递Tensorflow会话。最后,在将分数的移动平均值存储在文件中的同时玩游戏。
现在来看看AgentProcess类,它非常简单:
class AgentProcess(Process):
def __init__(self, conn, id, n_games):
super(AgentProcess,self).__init__()
self.conn = conn
self.n_games = n_games
self.id = id
self.msg_queue = []
np.random.seed(self.id*100)
def run(self):
self.agent = A2C(self.id)
def treatQueue():
msg = self.conn.recv()
if msg == "load":
self.agent.load_model()
print("Process "+str(self.id)+" loaded the master (0) model.")
if msg[0] == "train_with_batchs":
print("Master process is training ...")
t0 = time.time()
self.agent.train_with_batchs(msg[1])
self.agent.save_model()
print("Master process finished training. Time : "+str(time.time()-t0)+" \n")
self.conn.send("saved")
while True:
if(self.id != 0):
batch_values = []
batch_states = []
batch_actions = []
print("Process "+str(self.id)+" starts playing "+str(self.n_games)+" games.")
scores = []
env = SnakeEnv()
overall_data = 0
for i in range(self.n_games):
state = env.init()
t = 0
lastScoring = -1
while True:
action = self.agent([state])
newState, reward, done = env.step(action)
if(reward == 1):
for j in range(t - lastScoring):
batch_values.append(1)
lastScoring = t
batch_states.append([state])
batch_actions.append(action)
t += 1
if(done or (t - lastScoring >= 100)):
for j in range(t - lastScoring - 1):
batch_values.append(0)
break
state = newState
scores.append(env.score)
overall_data += t
if(overall_data >= 10000):
break
print("Process "+str(self.id)+" finished playing.")
batch = (batch_states, batch_actions, batch_values)
self.conn.send((np.mean(scores),batch))
treatQueue()
代理正在创建其AI模型,并正在使用它来玩游戏。评分方法不是我的重点,但您=可以检查并自己调整以获得更好的性能。“数据”是(状态,行动,奖励)的三倍。相当简单吧?
GPU分配和内存
默认情况下,Tensorflow会为模型选择第一个可用GPU,并在设备上为进程分配完整内存。不想要两个!希望工作进程共享一个模型,但是为自己的用法分配自己的GPU集部分。
共享模型非常困难,因为Tensorflow不允许在多个进程之间轻松共享图形或会话。目前正在深入了解Tensorflow,看看它是否可行并提高性能。目前,所拥有的唯一解决方案是在每个进程中实现一个新的Tensorflow核心,即在AgentProcess类中调用“import tensorflow”。每个流程都有自己的图表和会话。
对于GPU分配,有32个进程,4个GPU,每个16GB内存。增加每个进程的内存可以提高运行模型的进程速度。但内存有限,所以必须手动进行非常严格的优化......训练由主进程完成,需要大量内存,因此为他分配了几乎一整个GPU。
最终的分配是[3,10,10,10](每个GPU的进程数,其中第一个也包含主数据)。要限制内存,可以使用per_process_gpu_memory_fraction或gpu_options.allow_growth为每个进程手动限制比例,这将处理内存(在初始化时不分配所有内存,仅在需要时增加它)。这是代码:
n_gpu = 4
if(n_gpu == 1):
self._build_model()
self._build_train_op()
else:
# 1 GPU for training, n-1 for playing
if(self.id <= 3):
gpu_id = 0
else:
gpu_id = (1 + (self.id%(n_gpu-1)))
os.environ["CUDA_VISIBLE_DEVICES"]=str(gpu_id)
self._build_model()
self._build_train_op()
为了强制进程使用特定的GPU,使用环境变量CUDA_VISIBLE_DEVICES,它独立于分配工作进程的主进程。
很明显,增加进程数会提高性能,因为已经处理了更多批次。
结论
可以使用Tensorflow进行多处理,并在“相当”强大的机器上进行真正的强化学习。请记住,机器学习不是关于如何设想算法,而是主要关于如何有效地构建算法。
这是整个github回购。
https://github.com/fazega/snake-rls