“他山之石,可以攻玉”,站在巨人的肩膀才能看得更高,走得更远。在科研的道路上,更需借助东风才能更快前行。为此,我们特别搜集整理了一些实用的代码链接,数据集,软件,编程技巧等,开辟“他山之石”专栏,助你乘风破浪,一路奋勇向前,敬请关注。
作者:知乎—LittleWhite
地址:https://www.zhihu.com/people/liu-zhao-41-67
本文对使用pytorch进行分布式训练(单机多卡)的过程进行了详细的介绍,附加实际代码,希望可以给正在看的你提供帮助。本文分三个部分展开,分别是:
若想学习分布式的部署,看完本文就足够了,但为了读者能了解更多细节,我在第一部分的每个模块都加了对应的官方文档的链接。
同时,我正在进行PyTorch官方文档的翻译工作,除了对其进行便于理解的翻译,还添加了我的解释。项目地址:https://github.com/liuzhaoo/Pytorch-API-and-Tutorials-CN,欢迎各位下载使用!
01
先验知识
分布式训练涉及到pytorch的很多API,这里对它们进行简单的介绍,其中重点为第三节DataLoader。若想直接看到使用方法,请看第二部分。
此两种方法都可以实现多GPU并行训练,但是后者更快,同时需要写更多代码,而DataParallel只需一行代码就可以搞定。尽管如此,还是建议使用DistributedDataParallel,建议参考官方介绍。
如下,只需在将model加载到device(model.to(device))之后,加上以下代码即可
net = torch.nn.DataParallel(model, device_ids=[0, 1, 2])
DDP为基于torch.distributed的分布式数据并行结构,工作机制为:在batch维度上对数据进行分组,将输入的数据分配到指定的设备(GPU)上,从而将程序的模型并行化。对应的,每个GPU上会复制一个模型的副本,负责处理分配到的数据,在后向传播过程中再对每个设备上的梯度进行平均。
在这里贴上官方文档,供读者进行更详细的了解:DDP
以下是使用方法:
在每个有N个GPU 的主机上,都应该创建N个进程。同时确保每个进程分别在从0到N-1的单独的GPU上工作。因此,应该分别指定工作的GPU:
>>> torch.cuda.set_device(i) # i为0 - N-1
在每个进程中,参考以下内容来构建模块
>>> from torch.nn.parallel import DistributedDataParallel
>>> torch.distributed.init_process_group(backend='nccl', world_size=4, init_method='...')
>>> model = DistributedDataParallel(model, device_ids=[i], output_device=i)
为了在每个节点上产生多个进程,可以使用torch.distributed.launch或torch.multiprocessing.spawn
torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None, group_name='')
torch.distributed包为在一台或多台机器上运行的多个计算节点上的多进程并行结构提供PyTorch支持和通信原语。torch.nn.parallel.DistributedDataParallel()类就是基于此功能构建的,作为任何PyTorch模型的包装来提供同步分布式训练。这不同于 Multiprocessing package - torch.multiprocessing 和 torch.nn.DataParallel() 提供的并行结构,因为它支持多台联网的机器而且用户必须显式地为每个进程启动主要训练脚本的副本。
以上叙述来自pytorch官方文档,点击链接可以查看详细内容。此教程中只涉及到此包的初始化,因此不对其他内容再做介绍。
torch.distributed初始化
目前支持三种初始化方式:TCP初始化,共享文件初始化以及环境变量初始化。
一般使用TCP初始化,使用GPU时backend一般设置为'nccl':
import torch.distributed as dist
# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:23456', rank=args.rank, world_size=4)
torch.utils.data.DataLoader类是PyTorch数据加载功能的核心,此类中的很多参数都是数据并行时所需要的,本节将对它进行详细的介绍。
DataLoader(dataset, batch_size=1, shuffle=False, sampler=None,
batch_sampler=None, num_workers=0, collate_fn=None,
pin_memory=False, drop_last=False, timeout=0,
worker_init_fn=None)
我们使用的大部分数据集都是map-style类型的数据集
注意:在分布式模式下,在每个epoch开始之前应该调用 sampler.set_eopch(i)方法。
用户可以使用自定义的collate_fn来实现自定义批处理,例如沿第一个维度以外的维度排序、各种长度的填充序列或添加对自定义数据类型的支持。 当batch_size和batch_sampler都为None (batch_sampler的默认值已经为None)时,为非自动成批模式。此时使用作为collate_fn参数传递的函数来处理从数据集获得的每个示例。这时,这个函数只是将Numpy数组转换维PyTorch的Tensor,其他保持不变。
以上就是在部署分布式训练需要了解的知识,更多细节参见官方文档。下面的配置流程为本教程的核心部分。
02
使用过程框架
在DDP分布式训练中,关键是要在不同的进程中使用GPU进行数据处理,因此首先应该分配进程。假设只有一个机器,两块GPU。总数据量(视频或图片数量)为8000。batchsize设置为16。
准备工作:使用pytorch的spawn生成两个进程(对应GPU数量),分别使用1个GPU进行任务。在每个进程中都执行以下操作。
03
代码解析
这部分将对应第二部分,给出每一步的代码以及详细的解释或明,但是作为分布式教程,下文主要针对与分布式相关的代码,而其他部分,如优化策略,学习率改变方法等不进行详细介绍。
本实验(图像分类)是在双显卡环境下进行的,在四块显卡的服务器上指定了0,3号显卡:os.environ['CUDA_VISIBLE_DEVICES'] = '0,3'
首先分配进程
import torch.multiprocessing as mp
opt.world_size = opt.ngpus_per_node * opt.world_size
mp.spawn(main_worker, nprocs=opt.ngpus_per_node, args=(opt,))
代码说明:opt为整个程序用到的参数,batch_size,num_classes等参数都已指定,在下文中,每个参数出现时都会进行说明。这里的opt.world_size为总节点数(机器),由于本教程针对单机多卡,因此设置为1。opt.ngpus_per_node 是每个节点的GPU数,设置为2,因此经过运算opt.world_size为2。mp.spawn产生了两个进程,每个进程都运行 main_worker函数( main_worker是训练的主函数,包括模型、数据的加载,以及训练,以下所有内容都是在main_worker函数中的)
def main_worker(index, opt):
random.seed(opt.manual_seed)
np.random.seed(opt.manual_seed)
torch.manual_seed(opt.manual_seed)
if index >= 0 and opt.device.type == 'cuda':
opt.device = torch.device(f'cuda:{index}')
opt.dist_rank = opt.dist_rank * opt.ngpus_per_node + index
dist.init_process_group(backend='nccl',
init_method=opt.dist_url,
world_size=opt.world_size,
rank=opt.dist_rank)
opt.batch_size = int(opt.batch_size / opt.ngpus_per_node)
opt.n_threads = int((opt.n_threads + opt.ngpus_per_node - 1) / opt.ngpus_per_node)
opt.is_master_node = not opt.distributed or opt.dist_rank == 0
model = generate_model(opt)
if opt.batchnorm_sync:
assert opt.distributed, 'SyncBatchNorm only supports DistributedDataParallel.'
model = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model)
model = make_data_parallel(model, opt.distributed, opt.device)
parameters = model.parameters()
criterion = CrossEntropyLoss().to(opt.device)
(train_loader, train_sampler, train_logger, train_batch_logger,
optimizer, scheduler) = get_train_utils(opt, parameters)
for i in range(opt.begin_epoch, opt.n_epochs + 1):
if not opt.no_train:
if opt.distributed:
train_sampler.set_epoch(i)
current_lr = get_lr(optimizer)
train_epoch(i, train_loader, model, criterion, optimizer,
opt.device, current_lr, train_logger,
train_batch_logger, tb_writer, opt.distributed)
if i % opt.checkpoint == 0 and opt.is_master_node:
save_file_path = opt.result_path / 'save_{}.pth'.format(i)
save_checkpoint(save_file_path, i, opt.arch, model, optimizer,
scheduler)
scheduler.step()
def main_worker(index, opt):
random.seed(opt.manual_seed)
np.random.seed(opt.manual_seed)
torch.manual_seed(opt.manual_seed)
if index >= 0 and opt.device.type == 'cuda':
opt.device = torch.device(f'cuda:{index}')
opt.dist_rank = opt.dist_rank * opt.ngpus_per_node + index
dist.init_process_group(backend='nccl',
init_method=opt.dist_url,
world_size=opt.world_size,
rank=opt.dist_rank)
opt.batch_size = int(opt.batch_size / opt.ngpus_per_node)
opt.n_threads = int((opt.n_threads + opt.ngpus_per_node - 1) / opt.ngpus_per_node)
opt.is_master_node = opt.dist_rank == 0
代码说明: 在每个进程中,都会分配一个index,由于我们有两个进程,所以在两个进程中的index 分别为0,1。同样的,opt为传入的参数,前三行代码为指定用到的随机seed。然后根据index 分别指定每个进程的device:cuda:0 和cuda:1(对应实际的0号和3号GPU)。接着指定opt.dist_rank,它将作为初始化时的rank参数,opt.dist_rank原始值为0,因此经过运算,在两个进程中的值分别为0,1。
下面就是本步的核心,初始化torch.distributed在它的参数里,在每个进程中init_method和world_size都是一样的,rank用来标识各自的进程,同样的,分别为0,1。
因为分了两个进程,所以对原始指定的batch_size,n_threads(DataLoader中的num_workers)除以进程数2。
model = generate_model(opt)
此部分没什么好说的,从其他函数或类中获取模型。
但是注意到在它之后还有一段代码,是用来操作batch_norm的,这里不做过多解释,感兴趣可以查看原文档。
model = make_data_parallel(model, opt.device)
def make_data_parallel(model, device):
if device.type == 'cuda' and device.index is not None:
local_rank = torch.distributed.get_rank()
torch.cuda.set_device(local_rank)
model.to(device)
model = nn.parallel.DistributedDataParallel(model,device_ids=[device])
代码说明: 在两个进程中分别对模型进行并行化,local_rank是获得每个进程的rank,分别为0,1。device在第一步中已经定义过。
三行代码分别对应三个步骤。
train_data = get_training_data(**kwargs)
代码说明:根据参数获取原始数据
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_data)
train_loader = torch.utils.data.DataLoader(train_data,
batch_size=opt.batch_size,
shuffle=(train_sampler is None),
num_workers=opt.n_threads,
pin_memory=True,
sampler=train_sampler,
worker_init_fn=worker_init_fn)
for i in range(opt.begin_epoch, opt.n_epochs + 1):
train_sampler.set_epoch(i)
current_lr = get_lr(optimizer)
train_epoch(i, train_loader, model, criterion, optimizer,
opt.device, current_lr, train_logger,
train_batch_logger, tb_writer, opt.distributed)
本文目的在于学术交流,并不代表本公众号赞同其观点或对其内容真实性负责,版权归原作者所有,如有侵权请告知删除。