Pytorch 框架支持多卡分布式并行训练网络,可以利用更大的显存得到更大的 batchsize,同时也会倍增训练速度,本文记录 Pytorch 多卡训练实现过程。
DataParallel 可以帮助我们(使用单进程控)将模型和数据加载到多个 GPU 中,控制数据在 GPU 之间的流动,协同不同 GPU 上的模型进行并行训练(细粒度的方法有 scatter,gather 等等)。
DataParallel 使用起来非常方便,我们只需要用 DataParallel 包装模型,再设置一些参数即可。需要定义的参数包括:参与训练的 GPU 有哪些,device_ids=gpus;用于汇总梯度的 GPU 是哪个,output_device=gpus0 。DataParallel 会自动帮我们将数据切分 load 到相应 GPU,将模型复制到相应 GPU,进行正向传播计算梯度并汇总:
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])
值得注意的是,模型和数据都需要先 load 进 GPU 中,DataParallel 的 module 才能对其进行处理,否则会报错:
# 这里要 model.cuda()
model = nn.DataParallel(model.cuda(), device_ids=gpus, output_device=gpus[0])
for epoch in range(100):
   for batch_idx, (data, target) in enumerate(train_loader):
      # 这里要 images/target.cuda()
      images = images.cuda(non_blocking=True)
      target = target.cuda(non_blocking=True)
      ...
      output = model(images)
      loss = criterion(output, target)
      ...
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
汇总一下,DataParallel 并行训练部分主要与如下代码段有关:
# main.py
import torch
import torch.distributed as dist
gpus = [0, 1, 2, 3]
torch.cuda.set_device('cuda:{}'.format(gpus[0]))
train_dataset = ...
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=...)
model = ...
model = nn.DataParallel(model.to(device), device_ids=gpus, output_device=gpus[0])
optimizer = optim.SGD(model.parameters())
for epoch in range(100):
   for batch_idx, (data, target) in enumerate(train_loader):
      images = images.cuda(non_blocking=True)
      target = target.cuda(non_blocking=True)
      ...
      output = model(images)
      loss = criterion(output, target)
      ...
      optimizer.zero_grad()
      loss.backward()
      optimizer.step()
在使用时,使用 python 执行即可:
python main.py

在 pytorch 1.0 之后,官方终于对分布式的常用方法进行了封装,支持 all-reduce,broadcast,send 和 receive 等等。通过 MPI 实现 CPU 通信,通过 NCCL 实现 GPU 通信。官方也曾经提到用 DistributedDataParallel 解决 DataParallel 速度慢,GPU 负载不均衡的问题,目前已经很成熟。
模型加载,数据加载,模型训练,模型验证,模型保存等模块,此时我们要将该套代码改为分布式多卡并行训练
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "1, 2, 3"
random.seed(rand_seed)
torch.manual_seed(rand_seed)
不同显卡各自需要一个进行进行控制,每个进程由不同
local_rank区分,单个显卡对应着携带某个local_rank的Worker函数
_train)数据加载,模型加载,前向推理,梯度回传,损失汇总,模型保存 的工作Worker 后需要用多进程的方式启动nprocs 参数中填入进程数量,一般为需要使用的显卡数量import torch.multiprocessing as mp
mp.spawn(self._train, nprocs=device_num, args=(arg1, arg2, ...))
0 - (device_num-1)范围内的整数依次作为第一个参数分配给一个进程执行 self._train 函数__name__ == '__main__' 下:if __name__ == '__main__':
    main()
import torch.distributed as dist
dist.init_process_group(backend='nccl',
    init_method='tcp://127.0.0.1:' + self.process_port,
    world_size=self.device_num,
    rank=rank)
分布式训练需要将模型转换为分布式模型
modeltorch.cuda.set_device(rank)
model.cuda(rank)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank], find_unused_parameters=True)
分布式训练需要分布式数据
Dataset 对象 datasetsampler = torch.utils.data.distributed.DistributedSampler(dataset)
Dataloader 初始化参数中,此时 shuffle 参数需要设置为 Falsedataloader = torch.utils.data.DataLoader(dataset, batch_size=self.batch_size, shuffle=False, sampler=sampler, **kwargs)
def reduce_mean(tensor, nprocs):
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= nprocs
    return rt
torch.distributed.barrier()  # 在所有进程运行到这一步之前,先完成此前代码的进程会等待其他进程。这使得我们能够得到准确、有序的输出。
reduced_loss = reduce_mean(loss, self.device_num)
保存模型其实可以正常保存,不过如果不加处理会有额外开销
if local_rank == 0:
	save_checkpoint ...
torch.save(m.module.state_dict(), path)
关于 nn.DataParallel (以下简称DP) 和 DistributedDataParallel (以下简称DDP) 的区别
rank=0 的进程,将其 broadcast 到所有进程后,各进程用该梯度来独立的更新参数而 DP是梯度汇总到GPU0,反向传播更新参数,再广播参数给其他剩余的GPU。由于DDP各进程中的模型,初始参数一致 (初始时刻进行一次 broadcast),而每次用于更新参数的梯度也一致,因此,各进程的模型参数始终保持一致。而在DP中,全程维护一个 optimizer,对各个GPU上梯度进行求平均,而在主卡进行参数更新,之后再将模型参数 broadcast 到其他GPU.相较于DP, DDP传输的数据量更少,因此速度更快,效率更高。官方建议使用 DDP,无论是从效率还是结果来看都要稳定一些
RuntimeError: Expected to have finished reduction in the prior iteration before starting a new one. This error indicates that your module has parameters that were not used in producing loss. You can enable unused parameter detection by (1) passing the keyword argument `find_unused_parameters=True` to `torch.nn.parallel.DistributedDataParallel`; (2) making sure all `forward` function outputs participate in calculating loss. If you already have done the above two steps, then the distributed data parallel module wasn't able to locate the output tensors in the return value of your module's `forward` function. Please include the loss function and the structure of the return value of `forward` of your module when reporting this issue (e.g. list, dict, iterable). find_unused_parameters=True
例如:student_model = torch.nn.parallel.DistributedDataParallel(student_model, device_ids=[rank], find_unused_parameters=True)
dp_model.module 中,因此我们保存时可以仅保存这一部分- 仅保存 module 的部分torch.save(m.module.state_dict(), path)
- 加载时仍使用 DPm=nn.DataParallel(Resnet18())
m.load_state_dict(torch.load(path))
m=m.module
raise _env_error("MASTER_ADDR")
ValueError: Error initializing torch.distributed using env:// rendezvous: environment variable MASTER_ADDR expected, but not set
export MASTER_ADDR=localhost
export MASTER_PORT=5678
dist.init_process_group(backend='nccl',
    init_method='tcp://127.0.0.1:' + self.process_port,
    world_size=self.device_num,
    rank=rank)