首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >batch-compute & GPU分布式机器学习

batch-compute & GPU分布式机器学习

原创
作者头像
用户7480322
修改2020-07-20 10:33:38
1.1K0
修改2020-07-20 10:33:38
举报
文章被收录于专栏:个人专栏个人专栏

当用户提交一些机器学习任务时,往往需要大规模的计算资源,但是对于响应时间并没有严格的要求。在这种场景下,首先使用腾讯云的batch-compute(批量计算)产品来自动化提交用户的任务,然后使用分布式+gpu的方式解决算力问题,在任务完成后通知用户,是一个可行的解决方案。

本文将分成2部分:首先通过一个demo介绍上述过程的实现,从仅使用gpu、不考虑并行的简单情况开始,扩展至并行+gpu的情况,并简要介绍batch-compute的使用方法;然后介绍一些技术的实现原理(部分资料来源于知乎和博客,仅供参考)。

一个简单的Demo

使用pytorch,利用torch.Tensor对cuda的支持进行数据和模型的迁移。先不考虑并行,仅考虑如何将传统的基于cpu的机器学习任务迁移到gpu上。

  1. 定义一个简单的模型ConvNet:
class ConvNet(nn.Module):
    def __init__(self, num_classes=10):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=5, stride=1,             padding=2),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.fc = nn.Linear(7*7*32, num_classes)

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

2. 进行基于gpu的训练

def train(gpu, args):
    torch.manual_seed(0)
    model = ConvNet()
    torch.cuda.set_device(gpu)  # set default gpu
    model.cuda(gpu) # move model to gpu
    batch_size = 100
    criterion = nn.CrossEntropyLoss().cuda(gpu)     # move loss function to gpu
    optimizer = torch.optim.SGD(model.parameters(), 1e-4)
    # Data loading code
    train_dataset = torchvision.datasets.MNIST(root='./data',
                                               train=True,
                                      transform=transforms.ToTensor(),
                                               download=True)
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=0,
                                               pin_memory=True)

    start = datetime.now()
    total_step = len(train_loader)
    for epoch in range(args.epochs):
        for i, (images, labels) in enumerate(train_loader):
            images = images.cuda(non_blocking=True)
            labels = labels.cuda(non_blocking=True)
            outputs = model(images)
            loss = criterion(outputs, labels)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

上面代码中的train函数接收一个gpu的编号gpu作为参数,并且在第4行用其指定torch默认使用的gpu。

在第5行,将模型迁移到gpu上。cuda()函数会返回将调用该函数的对象拷贝一份到cuda memory中并返回该拷贝。如果该对象已经存在cuda memory或是正确的gpu中,则直接返回原对象。

在第7行,将损失函数迁移到gpu上(如果不明白为什么函数也要迁移,可以查看github上这个issue)。

这样,机器学习任务就迁移到了gpu上。

然后来考虑并行。这里假设有多个节点,每个节点上有多个gpu,每个进程使用一块gpu。pytorch提供了分布式训练的包torch.distributed,并且支持跨节点训练。

  1. 在脚本中设置master节点的ip和port
import torch.multiprocessing as mp
def main():
    ...
    args.world_size = args.gpus * args.nodes
    os.environ['MASTER_ADDR'] = 'xxx.xxx.xxx.xxx'
    os.environ['MASTER_PORT'] = '8888'
    mp.spawn(train, nprocs=args.gpus, args=(args,))

第5,6行通过环境变量的方式设置了master的ip和端口,之后master将在该端口监听worker的连接请求并完成初始化、广播等操作。

第7行通过spawn函数在本地启动了数量等于gpu数的进程,并且每个进程中运行相同的函数train。如果一个进程异常退出,那么其他进程也会被终止。

2. 初始化本地进程,并等待其他进程初始化完毕

import torch.distributed as dist
def train(gpu, args):
    rank = args.nr * args.gpus + gpu
    print('starting making group.......')
    dist.init_process_group(
        backend='nccl', init_method='env://', world_size=args.world_size, rank=rank)
    print('all processes have been started!')
    torch.manual_seed(0)
    ...

第5行的init_process_group是一个阻塞函数,在所有进程启动完毕且socket连接建立成功后返回。这里使用了nccl作为后端(也就是通信架构),可以参考pytorch官方给出的最佳指南;init_method参数表示通过环境变量发现master;rank表示当前进程在进程组中的优先级,rank=0的进程是master进程;world_size表示进程组中总共有多少进程。

2. 模型梯度同步

model = nn.parallel.DistributedDataParallel(model, device_ids=[gpu])

参与训练的数据集被分成多份,每个进程取一份input输入神经网络,独立计算梯度,然后将各个进程的梯度求平均值,用平均值更新模型参数。

3. 将数据划分到各个gpu上

train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset,
    num_replicas=args.world_size,
    rank=rank)

DistributedSampler将输入按照batch_size划分到不同的gpu上,使得每个进程能读到不同的batch,且不同进程间不会读到重复的batch。

这样,机器学习任务就可以在不同节点的多个gpu上并行地执行,不同的进程只需指定不同的rank即可。

最后将任务通过batch-compute实现自动化的任务提交和执行。

首先介绍batch-compute的概念。现代云计算有多种形式,其中常见的2种是流式计算(stream computing)和批量计算(batch computing)。流式计算处理对实时性要求高的请求,具有低延迟、持续性等特征,一般用于实时推荐、监控等服务;批量计算处理对实时性要求低但需要大量计算资源的请求,往往是耗时较长的一次性作业。机器学习任务就是一种很典型的批量计算。

利用腾讯云的batch-compute(批量计算)产品,开发者需要提供计算执行的环境、命令和输入输出存放的位置,由该产品自动去根据负载获取腾讯云的弹性资源,并自动调度作业执行流程,将企业和科研机构的双手从架设和配置数据中心中解放出来。

本文中使用batch-compute的python SDK,分为2步:先创建计算环境,然后提交计算作业。

  1. 创建计算环境
req = batch_models.CreateComputeEnvRequest()
params = '{\"ComputeEnv\":{\"EnvName\":\"batch-concurrent-test\",\"EnvData\":{\"InstanceType\":\"GN10X.2XLARGE40\",\"ImageId\":\"img-xxxxxx\",\"SystemDisk\":{\"DiskSize\":120},\"DesiredComputeNodeCount\":2},\"Placement\":{\"Zone\":\"ap-guangzhou-3\"}}'
print(params)
req.from_json_string(params)
resp = self.batch_client.CreateComputeEnv(req)
self.computeEnvId = json.loads(resp.to_json_string())["EnvId"]

第2行指定了创建2个节点,使用带gpu的机型GN10X.2XLARGE40;通过ImageId指定cvm的镜像,在这个镜像中部署了anaconda,pytorch,nvidia driver,cuda等。

如果需要获取创建节点的ip地址,可以通过第6行获取计算环境的id查看环境的详细信息。

2. 提交计算作业

commands = [
    'sudo service docker restart',
    'sudo service docker status',
    'set -x',
    'docker run -t --network host --gpus all <image-name> bash concurrent/task.sh',
]
params = '{\"Placement\":{\"Zone\":\"ap-guangzhou-3\"},\"Job\":{\"JobName\":\"test-job\",\"Tasks\":[{\"TaskName\":\"concurrent-task\",\"InputMappings\":[{\"SourcePath\":\"%s\",\"DestinationPath\":\"%s\"}],\"TaskInstanceNum\":2,\"Application\":{\"Command\":\"%s\"},\"EnvId\":\"%s\",\"RedirectInfo\":{\"StdoutRedirectPath\":\"%s\",\"StderrRedirectPath\":\"%s\"}}]}}' % (self.inPath, self.destPath, " && ".join(commands), self.computeEnvId, self.outPath, self.errPath)
req.from_json_string(params)
resp = self.batch_client.SubmitJob(req)

在第5行启动了一个docker容器并使用容器内装好的cuda。此处将网络设置为host模式使得可以在容器内通过host ip直接访问另一个节点上的容器;设置-t参数使得运行结果与在终端通过命令行手动执行的输出保持一致;但是不能设置-i参数,因为输入设备并不是一个真正的tty;设置cmd参数使得容器启动后执行task.sh脚本:

[[ $(hostname -I | cut -d ' ' -f 1) == "xxx.xxx.xxx.xxx" ]];
python3 concurrent/mnist-distributed.py -n 2 -nr $?

第1行判断当前节点的ip是否为master节点的ip;第二行运行执行机器学习任务的python脚本,并传入rank参数,如果是master节点则传入0,否则,传入1

3. 运行结果

为了直观地演示并行机器学习的输出结果,笔者在两台cvm上手动执行了脚本:

如图,首先通过ip地址判断脚本输入参数中的rank值,并且等待所有进程启动成功。

然后开始训练,可以看见每个节点上进行了3个epoch,batch_size为300,耗时8秒左右。

为了对比使用并行前后的差距,在一个节点上启动任务。

如图,进行了3个epoch,batch_size为600,耗时为12秒左右。

至此,机器学习的任务就通过batch-compute产品提交并且在2台云服务器上并行地执行了,以下搬运一些pytorch文档/博客/知乎上关于分布式训练的原理实现。

原理

  1. DDP(DistributedDataParallel)的构造函数

每个进程都有一个模型(module)。在构造函数中,DDP首先获得该module的引用,然后将module.state_dict()从master进程广播到全体进程,使得所有进程具有相同的初始状态。state_dict的返回值是buffer等不在参数列表中但是代表了网络状态的数据,例如batch normalization中的running_mean。

不同进程间梯度的汇总、求和和同步是通过一个Reducer类实现的。在构造函数中初始化了一个Reducer对象,并通过该对象管理梯度计算。

在Reducer对象的构造函数中,首先将所有的参数装进若干个bucket(桶),之后一桶一桶地计算可以提高效率。参数进入桶的顺序和其在数组Model.parameters中的顺序相反,后向传播中最后一层的梯度是最先被计算完毕的,因此应该最先参加求和。

然后,Reducer为每个参数注册了一个autograd_hook,在该参数被计算完毕后触发。

2. 前向传播

前向传播没有涉及梯度计算,但是设计一个corner case——如果用户定义了某些参数但是没有将其加入模型之中(即神经网络中存在孤立节点),那么autograd_hook永远不会被触发。为此,DDP的构造函数中提供了find_unused_parameters,如果被设置为True,则在前向传播完毕后会找出这些节点并直接将其标记为已完成计算。当然这一操作会引入额外的开销,因此作为一个参数。

3. 后向传播

当所有节点上的同一编号的bucket中所有梯度均计算完成后,启动异步函数all_reduce求和。本地计算梯度和跨节点求平均值可以并行地进行,因为后向传播中用到的只是本地的计算结果(因为前向传播中的output就是只用local input算出来的)。

4. all_reduce实现细节

all_reduce实现了跨节点的求和计算。一种主流的实现方式是Parameter Server,即一个master节点接收其他节点发送的数值并求和,然后将结果发送给其他节点。

但是这样会引入单点故障,因此Pytorch 1.x使用了一种名为Ring AllReduce的算法(Uber的开源分布式框架Horovord也采用了这一算法)。正如其名字所表现的,所有节点排成一个环,每个节点从作邻居接收数据,在本地完成一部分求和工作,然后向右邻居发送数据。所有节点是平等的,没有master节点。

Ring AllReduce算法分成2个阶段:Share-Reduce阶段和Share-Only阶段。

在Share-Reduce阶段结束后,每个节点上会得到一部分位置的求和结果。

如图,经过3轮迭代后,WorkerA将得到a1+b1+c1+d1,WorkerB将得到a2+b2+c2+d2,WorkerC将得到a3+b3+c3+d3,WorkerD将得到a0+b0+c0+d0。

在Share-Only阶段,节点间共享这些和,使得所有节点最终拥有所有位置的求和结果。

如图,经过3轮迭代后,每个节点都会拥有全部4个位置的和。

5. Master进程有何意义?

既然使用了Ring AllReduce算法,那么在使用torch.distributed包时一定要指定的master ip&port有什么作用呢?

Master的主要作用时在初始化时为各个进程建立连接。具体而言,Master会创建一个守护线程,在这个线程中为所有worker各自创建一个socket,然后等待worker的连接,并在连上后发送其他进程所在的位置。

Worker则创建和master通信的socket,并主动连接master,在连上后获取其他进程的位置信息并报告自己的位置,然后和其他进程建立连接。

参考文献

1. https://pytorch.org/docs/stable/data.html

2. https://yangkky.github.io/2019/07/08/distributed-pytorch-tutorial.html

3. https://pytorch.org/docs/master/generated/torch.nn.parallel.DistributedDataParallel.html

4. https://pytorch.org/docs/stable/notes/ddp.html

5. https://towardsdatascience.com/visual-intuition-on-ring-allreduce-for-distributed-deep-learning-d1f34b4911da

6. https://zhuanlan.zhihu.com/p/76638962

7. https://www.zhihu.com/question/306242771/answer/825668022

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
GPU 云服务器
GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档