前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >PyTorch提速四倍!提高DALI利用率,创建基于CPU的Pipeline

PyTorch提速四倍!提高DALI利用率,创建基于CPU的Pipeline

作者头像
大数据文摘
发布2020-02-21 12:22:29
1.2K0
发布2020-02-21 12:22:29
举报
文章被收录于专栏:大数据文摘大数据文摘

大数据文摘出品

来源:medium

编译:赵吉克

在过去的几年里,深度学习硬件方面取得了巨大的进步,Nvidia的最新产品Tesla V100和Geforce RTX系列包含专用的张量核,用于加速神经网络中常用的操作。

特别值得一提的是,V100有足够的能力以每秒数千张图的速度训练神经网络,这使得基于ImageNet数据集小模型在单GPU上训练只需几小时,与2012年在ImageNet上训练AlexNet模型所花费的5天时间相比简直是天壤之别!

然而,强大的GPU使数据预处理管道不堪重负。为了解决这个问题,Tensorflow发布了一个新的数据加载器:tf.data.Dataset,用C++编写,并使用基于图的方法将多个预处理操作链接在一起。

另一方面,PyTorch使用在PIL库上用Python编写的数据加载器,既方便优灵活,但在速度上有欠缺(尽管PIL-SIMD库确实稍微改善了这种情况)。

进入NVIDIA数据加载器(DALI):旨在消除数据预处理瓶颈,允许训练和推理全速运行。DALI主要用于在GPU上的预处理,但是大多数操作也在CPU上有快速实现。本文主要关注PyTorch,但是DALI也支持Tensorflow、MXNet和TensorRT,尤其是TensorRT有高度支持。它允许训练和推理步骤使用完全相同的预处理代码。需注意,不同的框架(如Tensorflow和PyTorch)通常在数据加载器之间有很小的差异,这可能会影响准确性。

本文是Medium上一位博主展示了一些技术来提高DALI的使用率并创建了一个完全基于CPU的管道。这些技术用于保持长期的内存稳定,并且与DALI包提供的CPU和GPU管道相比,可以增加50%的批处理大小。

DALI长期内存使用

第一个问题是,RAM的使用随着训练时间的增加而增加,这会导致OOM错误(即使是在拥有78GB RAM的VM上),并且尚未修正。

唯一解决方案是重新import DALI并每次重构训练和验证通道:

代码语言:javascript
复制
del self.train_loader, self.val_loader, self.train_pipe, self.val_pipe
torch.cuda.synchronize()
torch.cuda.empty_cache()
gc.collect()
importlib.reload(dali)
from dali import HybridTrainPipe, HybridValPipe, DaliIteratorCPU, DaliIteratorGPU
<rebuild DALI pipeline>

请注意,使用这种方法,DALI仍然需要大量RAM才能获得最好的结果。考虑到如今RAM的价格,这并不是什么大问题。从下表可以看出,DALI的最大批大小可能比TorchVision低50%:

接下来的部分涉及降低GPU占用率的方法。

构建一个完全基于CPU的Pipeline

让我们首先看看示例CPU管道。当不考虑峰值吞吐量时,基于CPU的管道非常有用。CPU训练管道只在CPU上执行解码和调整大小的操作,而CropMirrorNormalize操作则在GPU上运行。由于仅仅是传输输出到GPU与DALI就使用了大量的GPU内存,为了避免这种情况,我们修改了示例CPU管道,使其完全运行在CPU上:

代码语言:javascript
复制
class HybridTrainPipe(Pipeline):
   def __init__(self, batch_size, num_threads, device_id, data_dir, crop,
                mean, std, local_rank=0, world_size=1, dali_cpu=False, shuffle=True, fp16=False,
                min_crop_size=0.08):

       # As we're recreating the Pipeline at every epoch, the seed must be -1 (random seed)
       super(HybridTrainPipe, self).__init__(batch_size, num_threads, device_id, seed=-1)

       # Enabling read_ahead slowed down processing ~40%
       self.input = ops.FileReader(file_root=data_dir, shard_id=local_rank, num_shards=world_size,
                                   random_shuffle=shuffle)

       # Let user decide which pipeline works best with the chosen model
       if dali_cpu:
           decode_device = "cpu"
           self.dali_device = "cpu"
           self.flip = ops.Flip(device=self.dali_device)
       else:
           decode_device = "mixed"
           self.dali_device = "gpu"

           output_dtype = types.FLOAT
           if self.dali_device == "gpu" and fp16:
               output_dtype = types.FLOAT16

           self.cmn = ops.CropMirrorNormalize(device="gpu",
                                              output_dtype=output_dtype,
                                              output_layout=types.NCHW,
                                              crop=(crop, crop),
                                              image_type=types.RGB,
                                              mean=mean,
                                              std=std,)

       # To be able to handle all images from full-sized ImageNet, this padding sets the size of the internal nvJPEG buffers without additional reallocations
       device_memory_padding = 211025920 if decode_device == 'mixed' else 0
       host_memory_padding = 140544512 if decode_device == 'mixed' else 0
       self.decode = ops.ImageDecoderRandomCrop(device=decode_device, output_type=types.RGB,
                                                device_memory_padding=device_memory_padding,
                                                host_memory_padding=host_memory_padding,
                                                random_aspect_ratio=[0.8, 1.25],
                                                random_area=[min_crop_size, 1.0],
                                                num_attempts=100)

       # Resize as desired.  To match torchvision data loader, use triangular interpolation.
       self.res = ops.Resize(device=self.dali_device, resize_x=crop, resize_y=crop,
                             interp_type=types.INTERP_TRIANGULAR)

       self.coin = ops.CoinFlip(probability=0.5)
       print('DALI "{0}" variant'.format(self.dali_device))

   def define_graph(self):
       rng = self.coin()
       self.jpegs, self.labels = self.input(name="Reader")

       # Combined decode & random crop
       images = self.decode(self.jpegs)

       # Resize as desired
       images = self.res(images)

       if self.dali_device == "gpu":
           output = self.cmn(images, mirror=rng)
       else:
           # CPU backend uses torch to apply mean & std
           output = self.flip(images, horizontal=rng)

       self.labels = self.labels.gpu()
       return [output, self.labels]

DALI管道现在在CPU上输出一个8位张量。我们需要使用PyTorch来完成CPU-> GPU的传输、浮点数的转换和归一化。这最后两个操作是在GPU上完成的,快速并且减少了CPU -> GPU内存带宽需求。在转移到GPU之前,尝试过固定这个张量,但是没有得到任何性能提升,把它和一个预存器放在一起:

代码语言:javascript
复制
def _preproc_worker(dali_iterator, cuda_stream, fp16, mean, std, output_queue, proc_next_input, done_event, pin_memory):
   """
   Worker function to parse DALI output & apply final preprocessing steps
   """

   while not done_event.is_set():
       # Wait until main thread signals to proc_next_input -- normally class="ql-long-20347411" style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;">        proc_next_input.wait()
       proc_next_input.clear()

       if done_event.is_set():
           print('Shutting down preproc thread')
           break

       try:
           data = next(dali_iterator)

           # Decode the data output
           input_orig = data[0]['data']
           target = data[0]['label'].squeeze().long()  # DALI should already output target style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;"> 
           # Copy to GPU and apply final processing in separate CUDA stream
           with torch.cuda.stream(cuda_stream):
               input = input_orig
               if pin_memory:
                   input = input.pin_memory()
                   del input_orig  # Save memory
               input = input.cuda(non_blocking=True)

               input = input.permute(0, 3, 1, 2)

               # Input tensor is kept as 8-bit integer for transfer to GPU, to save bandwidth
               if fp16:
                   input = input.half()
               else:
                   input = input.float()

               input = input.sub_(mean).div_(std)

           # Put the result class="ql-long-20347411" style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;">            output_queue.put((input, target))

       except StopIteration:
           print('Resetting DALI loader')
           dali_iterator.reset()
           output_queue.put(None)


class DaliIteratorCPU(DaliIterator):
   """
   Wrapper class to decode the DALI iterator output & provide iterator that functions in the same way as TorchVision.
   Note that permutation to channels first, converting from 8-bit integer to float & normalization are all performed style="line-height: 1.7;margin-bottom: 0pt;margin-top: 0pt;font-size: 11pt;color: #494949;"> 
   pipelines (Pipeline): DALI pipelines
   size (int): Number of examples in set
   fp16 (bool): Use fp16 as output format, f32 otherwise
   mean (tuple): Image mean value for each channel
   std (tuple): Image standard deviation value for each channel
   pin_memory (bool): Transfer input tensor to pinned memory, before moving to GPU
   """
   def __init__(self, fp16=False, mean=(0., 0., 0.), std=(1., 1., 1.), pin_memory=True, **kwargs):
       super().__init__(**kwargs)
       print('Using DALI CPU iterator')
       self.stream = torch.cuda.Stream()

       self.fp16 = fp16
       self.mean = torch.tensor(mean).cuda().view(1, 3, 1, 1)
       self.std = torch.tensor(std).cuda().view(1, 3, 1, 1)
       self.pin_memory = pin_memory

       if self.fp16:
           self.mean = self.mean.half()
           self.std = self.std.half()

       self.proc_next_input = Event()
       self.done_event = Event()
       self.output_queue = queue.Queue(maxsize=5)
       self.preproc_thread = threading.Thread(
           target=_preproc_worker,
           kwargs={'dali_iterator': self._dali_iterator, 'cuda_stream': self.stream, 'fp16': self.fp16, 'mean': self.mean, 'std': self.std, 'proc_next_input': self.proc_next_input, 'done_event': self.done_event, 'output_queue': self.output_queue, 'pin_memory': self.pin_memory})
       self.preproc_thread.daemon = True
       self.preproc_thread.start()

       self.proc_next_input.set()

   def __next__(self):
       torch.cuda.current_stream().wait_stream(self.stream)
       data = self.output_queue.get()
       self.proc_next_input.set()
       if data is None:
           raise StopIteration
       return data

   def __del__(self):
       self.done_event.set()
       self.proc_next_input.set()
       torch.cuda.current_stream().wait_stream(self.stream)
       self.preproc_thread.join()

基于GPU的Pipeline

测试中,在类似最大批处理大小下,上述CPU管道的速度大约是TorchVision数据加载器的两倍。CPU管道可以很好地与像ResNet50这样的大型模型一起工作;然而,当使用像AlexNet或ResNet18这样的小模型时,CPU更好。GPU管道的问题是最大批处理大小减少了近50%,限制了吞吐量。

一种显著减少GPU内存使用的方法是将验证管道与GPU隔离直到最后再调用。这很容易做到,因为我们已经重新导入DALI,并在每个epoch中重新创建数据加载器。

更多小提示

在验证时,将数据集均分的批处理大小效果最好,这避免了在验证数据集结束时还需要进行不完整的批处理。

与Tensorflow和PyTorch数据加载器类似,TorchVision和DALI管道不会产生相同的输出—您将看到验证精度略有不同。我发现这是由于不同的JPEG图像解码器。另一方面,DALI支持TensorRT,允许使用完全相同的预处理来进行训练和推理。

对于峰值吞吐量,尝试将数据加载器的数量设置为number_of_virtual_CPU核心,2个虚拟核对应1个物理核。

如果你想要绝对最好的性能并且不需要有类似TorchVision的输出,尝试关闭DALI三角形插值。

不要忘记磁盘IO。确保您有足够的内存来缓存数据集和/或一个非常快的SSD。DALI读取高达400Mb/s !

合并

为了方便地集成这些修改,我创建了一个data loader类,其中包含这里描述的所有修改,包括DALI和TorchVision后端。使用很简单。实例化数据加载程序:

代码语言:javascript
复制
dataset = Dataset(data_dir,
                 batch_size,
                 val_batch_size
                 workers,
                 use_dali,
                 dali_cpu,
                 fp16)

然后得到训练和验证数据装载器:

代码语言:javascript
复制
train_loader = dataset.get_train_loader()
val_loader = dataset.get_val_loader()

在每个训练周期结束时重置数据加载器:

代码语言:javascript
复制
dataset.reset()

或者,验证管道可以在模型验证之前在GPU上重新创建:

代码语言:javascript
复制
dataset.prep_for_val()

基准

以下是使用ResNet18的最大批量大小:

因此,通过应用这些修改,DALI可以在CPU和GPU模式下使用的最大批处理大小增加了约50%!

这里是一些使用Shufflenet V2 0.5和批量大小512的吞吐量图:

这里是一些使用DALI GPU管道训练各种网络,包括在TorchVision:

所有测试都在谷歌Cloud V100实例上运行,该实例有12个vcpu(6个物理核心),78GB RAM,并使用Apex FP16培训。要重现这些结果,请使用以下参数:

代码语言:javascript
复制
— fp16 — batch-size 512 — workers 10 — arch “shufflenet_v2_x0_5 or resnet18” — prof — use-dali

所以,DALI使得单核特斯拉V100可以达到接近4000张/秒的图像处理速度!这达到了Nvidia DGX-1的一半多一点(它有8个V100 gpu),尽管我们使用了小模型。对我来说,能够在几个小时内在一个GPU上运行ImageNet是生产力进步。

本文中提供的代码如下:

https://github.com/yaysummeriscoming/DALI_pytorch_demo

相关报道:

https://towardsdatascience.com/nvidia-dali-speeding-up-pytorch-876c80182440

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-02-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据文摘 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档