目录
NVIDIA Megatron 是一个基于 PyTorch 的分布式训练框架,用来训练超大Transformer语言模型,其通过综合应用了数据并行,Tensor并行和Pipeline并行来复现 GPT3,值得我们深入分析其背后机理。
本系列大概有 5 篇文章,通过论文和源码和大家一起学习研究。本文将看看 Megatron 如何处理设置并行。
本系列其他文章为:
[源码解析] 模型并行分布式训练Megatron (1) --- 论文 & 基础
[源码解析] 模型并行分布式训练Megatron (2) --- 整体架构
[源码解析] 模型并行分布式训练 Megatron (3) ---模型并行实现
前文我们对模型并行的原理和代码进行了分析,对于给定的模型,现在还需要解决几个问题:
我们接下来就仔细分析一下。
initialize_model_parallel 方法用来设置模型并行,所以我们接下来就具体分析。
因为前文_initialize_distributed之中调用了torch.distributed.init_process_group 初始化分布式环境,所以我们知道,每个进程都有自己的 gloabl rank 和 local rank,都有自己的全局变量。
主要变量如下(具体例子可以结合 initialize_model_parallel 之中的注释来看):
具体如下:
# Intra-layer model parallel group that the current rank belongs to.
_TENSOR_MODEL_PARALLEL_GROUP = None
# Inter-layer model parallel group that the current rank belongs to.
_PIPELINE_MODEL_PARALLEL_GROUP = None
# Model parallel group (both intra- and pipeline) that the current rank belongs to.
_MODEL_PARALLEL_GROUP = None
# Embedding group.
_EMBEDDING_GROUP = None
# Data parallel group that the current rank belongs to.
_DATA_PARALLEL_GROUP = None
_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = None
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_PIPELINE_MODEL_PARALLEL_SPLIT_RANK = None
# These values enable us to change the mpu sizes on the fly.
_MPU_TENSOR_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = None
_MPU_TENSOR_MODEL_PARALLEL_RANK = None
_MPU_PIPELINE_MODEL_PARALLEL_RANK = None
# A list of ranks that have a copy of the embedding.
_EMBEDDING_GLOBAL_RANKS = None
# A list of global ranks for each pipeline group to ease calculation of the source
# rank when broadcasting from the first or last pipeline stage.
_PIPELINE_GLOBAL_RANKS = None
我们首先把 initialize_model_parallel 代码摘录出来。initialize_model_parallel 作用就是对模型进行分组,然后初始化进程组相关的各种全局变量。
def initialize_model_parallel(tensor_model_parallel_size_=1,
pipeline_model_parallel_size_=1,
virtual_pipeline_model_parallel_size_=None,
pipeline_model_parallel_split_rank_=None):
"""
Initialize model data parallel groups.
Arguments:
tensor_model_parallel_size: number of GPUs used for tensor model parallelism.
pipeline_model_parallel_size: number of GPUs used for pipeline model parallelism.
virtual_pipeline_model_parallel_size: number of virtual stages (interleaved
pipeline).
pipeline_model_parallel_split_rank: for models with both encoder and decoder,
rank in pipeline with split point.
Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
the model pipeline. The present function will
create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
and 8 data-parallel groups as:
8 data_parallel groups:
[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
8 tensor model-parallel groups:
[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
4 pipeline model-parallel groups:
[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
Note that for efficiency, the caller should make sure adjacent ranks
are on the same DGX box. For example if we are using 2 DGX-1 boxes
with a total of 16 GPUs, rank 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.
"""
if torch.distributed.get_rank() == 0:
print('> initializing tensor model parallel with size {}'.format(
tensor_model_parallel_size_))
print('> initializing pipeline model parallel with size {}'.format(
pipeline_model_parallel_size_))
# Get world size and rank. Ensure some consistencies.
world_size = torch.distributed.get_world_size()
tensor_model_parallel_size = min(tensor_model_parallel_size_, world_size)
pipeline_model_parallel_size = min(pipeline_model_parallel_size_, world_size)
ensure_divisibility(world_size,
tensor_model_parallel_size * pipeline_model_parallel_size)
data_parallel_size = world_size // (tensor_model_parallel_size *
pipeline_model_parallel_size)
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size
num_data_parallel_groups = world_size // data_parallel_size
if virtual_pipeline_model_parallel_size_ is not None:
global _VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK
global _VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
_VIRTUAL_PIPELINE_MODEL_PARALLEL_RANK = 0
_VIRTUAL_PIPELINE_MODEL_PARALLEL_WORLD_SIZE = virtual_pipeline_model_parallel_size_
if pipeline_model_parallel_split_rank_ is not None:
global _PIPELINE_MODEL_PARALLEL_SPLIT_RANK
_PIPELINE_MODEL_PARALLEL_SPLIT_RANK = pipeline_model_parallel_split_rank_
rank = torch.distributed.get_rank()
# Build the data-parallel groups.
global _DATA_PARALLEL_GROUP
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size):
start_rank = i * num_pipeline_model_parallel_groups
end_rank = (i + 1) * num_pipeline_model_parallel_groups
for j in range(tensor_model_parallel_size):
ranks = range(start_rank + j, end_rank,
tensor_model_parallel_size)
all_data_parallel_group_ranks.append(list(ranks))
group = torch.distributed.new_group(ranks)
if rank in ranks:
_DATA_PARALLEL_GROUP = group
# Build the model-parallel groups.
global _MODEL_PARALLEL_GROUP
for i in range(data_parallel_size):
ranks = [data_parallel_group_ranks[i]
for data_parallel_group_ranks in all_data_parallel_group_ranks]
group = torch.distributed.new_group(ranks)
if rank in ranks:
_MODEL_PARALLEL_GROUP = group
# Build the tensor model-parallel groups.
global _TENSOR_MODEL_PARALLEL_GROUP
for i in range(num_tensor_model_parallel_groups):
ranks = range(i * tensor_model_parallel_size,
(i + 1) * tensor_model_parallel_size)
group = torch.distributed.new_group(ranks)
if rank in ranks:
_TENSOR_MODEL_PARALLEL_GROUP = group
# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
global _PIPELINE_MODEL_PARALLEL_GROUP
global _PIPELINE_GLOBAL_RANKS
global _EMBEDDING_GROUP
global _EMBEDDING_GLOBAL_RANKS
for i in range(num_pipeline_model_parallel_groups):
ranks = range(i, world_size,
num_pipeline_model_parallel_groups)
group = torch.distributed.new_group(ranks)
if rank in ranks:
_PIPELINE_MODEL_PARALLEL_GROUP = group
_PIPELINE_GLOBAL_RANKS = ranks
# Setup embedding group (to exchange gradients between
# first and last stages).
if len(ranks) > 1:
embedding_ranks = [ranks[0], ranks[-1]]
if pipeline_model_parallel_split_rank_ is not None and \
pipeline_model_parallel_split_rank_ not in embedding_ranks:
embedding_ranks = [ranks[0],
ranks[pipeline_model_parallel_split_rank_],
ranks[-1]]
else:
embedding_ranks = ranks
group = torch.distributed.new_group(embedding_ranks)
if rank in embedding_ranks:
_EMBEDDING_GROUP = group
if rank in ranks:
_EMBEDDING_GLOBAL_RANKS = embedding_ranks
我们使用注释内容来进行学习如何切分模型,如何把多种并行模式组合在一起。
initialize_model_parallel 的注释值得我们深入学习,具体如下:
Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
the model pipeline. The present function will
create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
and 8 data-parallel groups as:
8 data_parallel groups:
[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
8 tensor model-parallel groups:
[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
4 pipeline model-parallel groups:
[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
Note that for efficiency, the caller should make sure adjacent ranks
are on the same DGX box. For example if we are using 2 DGX-1 boxes
with a total of 16 GPUs, rank 0 to 7 belong to the first box and
ranks 8 to 15 belong to the second box.
从注释可以知道如下信息:
- 沿着行横向切了一刀:tensor\_model\_parallel\_size = 16 / 8 = 2,就是2个 GPUs 来进行模型张量并行。
- 沿着列纵向切了三刀:pipeline\_model\_parallel\_size = 16 /4 = 4,就是4个GPUs 进行流水线并行。
- 因此,一个模型分为8块,每一块放在一个GPU之上,就是8个GPU。而通过如下计算可以知 16 GPUs / 8 GPUs = 2 models。即,16张卡可以放置两个完整模型。因为张量模型并行组大小是2,即16个GPU被分成8组,则这8组内容是 g0, g1, g2, g3, g4, g5, g6, g7, g8, g9, g10, g11, g12, g13, g14, g15。
模型原始图如下
模型切分之后如下,一共被分成8块。其中,第一层被切分为 A,B,所以 A,B 之间就是 Tensor Model parallel。后面 C,D 之间也是 Tensor Model parallel,把两层都做了切分,依次类推。
我们的目标就是用代码来看看如何生成注释里面的各种模型组。
我们接下来看看具体切分的策略,也就是GPU分配策略。切分需要综合考虑多种情况,首先看看模型并行的通信状况。
我们接下来看看各种并行机制的对比。
最后看看结论
我们接下来做一个实验看看。
import torch
world_size = 16
tensor_model_parallel_size = 2 # 2 GPUs to parallelize the model tensor
pipeline_model_parallel_size = 4 # 4 GPUs to parallelize the model pipeline
data_parallel_size = world_size // (tensor_model_parallel_size *
pipeline_model_parallel_size) # 2
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size # 8
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size # 4
num_data_parallel_groups = world_size // data_parallel_size # 8
# Build the data-parallel groups.
print("------ Build the data-parallel groups -----")
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size):
start_rank = i * num_pipeline_model_parallel_groups
end_rank = (i + 1) * num_pipeline_model_parallel_groups
for j in range(tensor_model_parallel_size):
ranks = range(start_rank + j, end_rank,
tensor_model_parallel_size)
all_data_parallel_group_ranks.append(list(ranks))
print(all_data_parallel_group_ranks)
# Build the model-parallel groups.
print("------ Build the model-parallel groups -----")
for i in range(data_parallel_size):
ranks = [data_parallel_group_ranks[i]
for data_parallel_group_ranks in all_data_parallel_group_ranks]
print(list(ranks))
# Build the tensor model-parallel groups.
print("------ Build the tensor model-parallel groups -----")
for i in range(num_tensor_model_parallel_groups):
ranks = range(i * tensor_model_parallel_size,
(i + 1) * tensor_model_parallel_size)
print(list(ranks))
# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
print("------ Build the pipeline model-parallel groups -----")
for i in range(num_pipeline_model_parallel_groups):
ranks = range(i, world_size,
num_pipeline_model_parallel_groups)
print(list(ranks))
输出如下。需要注意,这里都是 GPU 的序列号,0,2 就是 g0, g2:
------ Build the data-parallel groups -----
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
------ Build the model-parallel groups -----
[0, 1, 4, 5, 8, 9, 12, 13]
[2, 3, 6, 7, 10, 11, 14, 15]
------ Build the tensor model-parallel groups -----
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
------ Build the pipeline model-parallel groups -----
[0, 4, 8, 12]
[1, 5, 9, 13]
[2, 6, 10, 14]
[3, 7, 11, 15]
我们对比一下注释,发现代码打印结果可以和注释对应上:
Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
the model pipeline. The present function will
create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
and 8 data-parallel groups as:
8 data_parallel groups:
[g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
8 tensor model-parallel groups:
[g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
4 pipeline model-parallel groups:
[g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
我们接下来会进行具体分析。
从注释中可以看到:
Note that for efficiency, the caller should make sure adjacent ranks are on the same DGX box. For example if we are using 2 DGX-1 boxes with a total of 16 GPUs, rank 0 to 7 belong to the first box and ranks 8 to 15 belong to the second box.
意思就是:调用者需要确保相邻的rank在同一个节点上,我们例子有两个Node,其中第一个Node拥有 GPU 0 ~ 7,就是 rank 0 ~ 7,第二个Node是 GPU 8~15,就是 rank 8 ~ 15。
具体如下,这里每行4个GPU,是因为 4 GPUs to parallelize the model pipeline,所以流水线每个stage是4个GPU。
下面是论文之中提到的一些符号,这里有必要再取出来温习一下:
依据注释,我们得出目前分组情况和一些全局信息。
接下来结合代码看看需要分成多少个process groups,他们在代码之中的变量是什么。
具体如下:
world_size = 16
tensor_model_parallel_size = 2 # 2 GPUs to parallelize the model tensor
pipeline_model_parallel_size = 4 # 4 GPUs to parallelize the model pipeline
data_parallel_size = world_size // (tensor_model_parallel_size *
pipeline_model_parallel_size) # 2
num_tensor_model_parallel_groups = world_size // tensor_model_parallel_size # 8
num_pipeline_model_parallel_groups = world_size // pipeline_model_parallel_size # 4
num_data_parallel_groups = world_size // data_parallel_size # 8
本节我们分析的是,如何将 Node 上的 GPU 分给 tensor model 并行组。
对于注释例子,16 / 2 = 8,分成 8 个进程组,每个组 两个 rank。这些分组分别是:g0, g1, g2, g3, g4, g5, g6, g7, g8, g9, g10, g11, g12, g13, g14, g15,我们得到了如下信息:
我们再看看代码:
# Build the tensor model-parallel groups.
global _TENSOR_MODEL_PARALLEL_GROUP
for i in range(num_tensor_model_parallel_groups): # 8
ranks = range(i * tensor_model_parallel_size,
(i + 1) * tensor_model_parallel_size)
group = torch.distributed.new_group(ranks) # 就有生成 8 组
if rank in ranks:
# 如果本rank在某一list之中,即1 在 [0,1] 之中,则本 rank 就属于 new_group([0,1])
_TENSOR_MODEL_PARALLEL_GROUP = group
我们实验之中在这里得到:
------ Build the tensor model-parallel groups -----
[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
对应我们图上如下,每个 tensor model group 用一个虚线小矩形框标示,一共8个:
_TENSOR_MODEL_PARALLEL_GROUP = group 就记录了本rank的进程组信息,比如 rank 2,它的 _TENSOR_MODEL_PARALLEL_GROUP 内容就是:group(g2, g3)。
我们接下来看看如何使用。
get_tensor_model_parallel_group 返回了自己 rank 对应的 tensor model group。
def get_tensor_model_parallel_group():
"""Get the tensor model parallel group the caller rank belongs to."""
return _TENSOR_MODEL_PARALLEL_GROUP
在 megatron/mpu/mappings.py 之中有对 tensor model group 的使用:
def _reduce(input_):
"""All-reduce the input tensor across model parallel group."""
# Bypass the function if we are using only 1 GPU.
if get_tensor_model_parallel_world_size()==1:
return input_
# All-reduce.
torch.distributed.all_reduce(input_, group=get_tensor_model_parallel_group())
return input_
就是当流水线反向传播时候,利用 _TENSOR_MODEL_PARALLEL_GROUP 进行在组内进行集合通信。
本节我们分析的是,如何将 Node 上的 GPU 分给 pipeline model 并行组。
从注释可以看到,流水线分组就是把这个16个GPU 分成 4 组,每组 4 个 GPU,得到 g0, g4, g8, g12, g1, g5, g9, g13, g2, g6, g10, g14, g3, g7, g11, g15,我们得到了如下信息:
具体代码如下:
# Build the pipeline model-parallel groups and embedding groups
# (first and last rank in each pipeline model-parallel group).
global _PIPELINE_MODEL_PARALLEL_GROUP
global _PIPELINE_GLOBAL_RANKS
global _EMBEDDING_GROUP
for i in range(num_pipeline_model_parallel_groups): # 4
ranks = range(i, world_size, # 每隔 n // p个取一个
num_pipeline_model_parallel_groups)
group = torch.distributed.new_group(ranks)
if rank in ranks:
_PIPELINE_MODEL_PARALLEL_GROUP = group
_PIPELINE_GLOBAL_RANKS = ranks
# Setup embedding group (to exchange gradients between
# first and last stages).
if len(ranks) > 1:
embedding_ranks = [ranks[0], ranks[-1]]
else:
embedding_ranks = ranks
group = torch.distributed.new_group(embedding_ranks)
if rank in embedding_ranks:
_EMBEDDING_GROUP = group
我们拓展之前图如下,现在看到增加了 4 条从上到下的虚线箭头,分别对应了 4 组流水线串行。横向层是从 Stage 0 ~ Stage 3。
接下来看看如何使用。
get_pipeline_model_parallel_group 返回了自己 rank 对应的 pipeline model group。
def get_pipeline_model_parallel_group():
"""Get the pipeline model parallel group the caller rank belongs to."""
return _PIPELINE_MODEL_PARALLEL_GROUP
具体使用是在 megatron/p2p_communication.py,_communicate 之中会用流水线组信息来进行通信。这里省略了大部分代码。
def _communicate(tensor_send_next, tensor_send_prev, recv_prev, recv_next,
use_ring_exchange=False, tensor_shape=None,
override_scatter_gather_tensors_in_pipeline=False,
dtype_=None):
"""Communicate tensors between stages. Used as helper method in other
communication methods that are used in megatron/schedules.py.
"""
# Send tensors in both the forward and backward directions as appropriate.
if use_ring_exchange: # 这里使用get_pipeline_model_parallel_group 进行通信
torch.distributed.ring_exchange(tensor_send_prev=tensor_send_prev,
tensor_recv_prev=tensor_recv_prev,
tensor_send_next=tensor_send_next,
tensor_recv_next=tensor_recv_next,
group=mpu.get_pipeline_model_parallel_group())
else:
ops = []
if tensor_send_prev is not None:
send_prev_op = torch.distributed.P2POp(
torch.distributed.isend, tensor_send_prev,
mpu.get_pipeline_model_parallel_prev_rank()) # 得到流水线前一个rank
ops.append(send_prev_op)
if tensor_recv_prev is not None:
recv_prev_op = torch.distributed.P2POp(
torch.distributed.irecv, tensor_recv_prev,
mpu.get_pipeline_model_parallel_prev_rank())
ops.append(recv_prev_op)
if tensor_send_next is not None:
send_next_op = torch.distributed.P2POp(
torch.distributed.isend, tensor_send_next,
mpu.get_pipeline_model_parallel_next_rank()) # 得到流水线下一个rank
ops.append(send_next_op)
if tensor_recv_next is not None:
recv_next_op = torch.distributed.P2POp(
torch.distributed.irecv, tensor_recv_next,
mpu.get_pipeline_model_parallel_next_rank())
ops.append(recv_next_op)
具体如何得到流水线上下游的rank?是通过 get_pipeline_model_parallel_next_rank 和 get_pipeline_model_parallel_prev_rank 来完成。其中_PIPELINE_GLOBAL_RANKS 得到了进程组的ranks,假如本进程是 rank 2,则流水线进程组 ranks 是 g2, g6, g10, g14。
def get_pipeline_model_parallel_next_rank():
rank_in_pipeline = get_pipeline_model_parallel_rank()
world_size = get_pipeline_model_parallel_world_size()
return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline + 1) % world_size]
def get_pipeline_model_parallel_prev_rank():
rank_in_pipeline = get_pipeline_model_parallel_rank()
world_size = get_pipeline_model_parallel_world_size()
return _PIPELINE_GLOBAL_RANKS[(rank_in_pipeline - 1) % world_size]
get_pipeline_model_parallel_world_size 得到了进程组的 world size。
def get_pipeline_model_parallel_world_size():
"""Return world size for the pipeline model parallel group."""
global _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
if _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE is not None:
return _MPU_PIPELINE_MODEL_PARALLEL_WORLD_SIZE
return torch.distributed.get_world_size(group=get_pipeline_model_parallel_group())
我们接下来看看数据并行。
对于注释例子,16 / 2 = 8,分成 8 个进程组,每个组 两个 rank。这些分组分别是:g0, g2, g1, g3, g4, g6, g5, g7, g8, g10, g9, g11, g12, g14, g13, g15,我们得到了如下信息:
我们再看看用代码怎么确定有哪些group,每个group里面包含什么。
ranks = range(start_rank + j, end_rank, tensor_model_parallel_size)
,意思是这stage的n//p个GPUs中,每隔 t 个取一个作为数据并行 group 之中的一份子,因此每个data-parallel group大小为 n // p // t = d。具体代码如下:
# Build the data-parallel groups.
global _DATA_PARALLEL_GROUP
assert _DATA_PARALLEL_GROUP is None, \
'data parallel group is already initialized'
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size): # 遍历流水线深度
start_rank = i * num_pipeline_model_parallel_groups # 找到每个stage的起始rank
end_rank = (i + 1) * num_pipeline_model_parallel_groups # 找到每个stage的终止rank
for j in range(tensor_model_parallel_size): # 遍历tensor model分组size
ranks = range(start_rank + j, end_rank, # 每隔 t 个取一个作为数据并行group中的一份子
tensor_model_parallel_size)
all_data_parallel_group_ranks.append(list(ranks))
group = torch.distributed.new_group(ranks)
if rank in ranks:
_DATA_PARALLEL_GROUP = group
打印输出如下,和注释一致。
------ Build the data-parallel groups -----
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
对应图片拓展如下:其中,每个新增的双箭头对应一个DDP(两个rank),比如2, 3对应一个DDP。
我们接下来看看如何使用。
get_data_parallel_group 会得到本rank对应的 _DATA_PARALLEL_GROUP。
def get_data_parallel_group():
"""Get the data parallel group the caller rank belongs to."""
return _DATA_PARALLEL_GROUP
在 allreduce_gradients之中,会对本数据并行组进行all-reduce。
def allreduce_gradients(self):
"""Reduce gradients across data parallel ranks."""
# If we have buffers, simply reduce the data in the buffer.
if self._grad_buffers is not None:
for _, buffer_ in self._grad_buffers.items():
buffer_.data /= mpu.get_data_parallel_world_size() # 数据并行 world size
torch.distributed.all_reduce(
buffer_.data, group=mpu.get_data_parallel_group()) # 数据并行组
else:
# Otherwise, bucketize and all-reduce
buckets = {}
# Pack the buckets.
for param in self.module.parameters():
if param.requires_grad and param.grad is not None:
tp = param.data.type()
if tp not in buckets:
buckets[tp] = []
buckets[tp].append(param)
param.main_grad = param.grad
# For each bucket, all-reduce and copy all-reduced grads.
for tp in buckets:
bucket = buckets[tp]
grads = [param.grad.data for param in bucket]
coalesced = _flatten_dense_tensors(grads)
coalesced /= mpu.get_data_parallel_world_size()
torch.distributed.all_reduce(
coalesced, group=mpu.get_data_parallel_group())
for buf, synced in zip(grads, _unflatten_dense_tensors(
coalesced, grads)):
buf.copy_(synced)
前面实验中,我们得到模型并行组如下:0, 1, 4, 5, 8, 9, 12, 13。生成代码如下:
# Build the model-parallel groups.
global _MODEL_PARALLEL_GROUP
for i in range(data_parallel_size):
ranks = [data_parallel_group_ranks[i]
for data_parallel_group_ranks in all_data_parallel_group_ranks]
group = torch.distributed.new_group(ranks)
if rank in ranks:
_MODEL_PARALLEL_GROUP = group
_MODEL_PARALLEL_GROUP 会得到本rank对应的模型组。
def get_model_parallel_group():
"""Get the model parallel group the caller rank belongs to."""
return _MODEL_PARALLEL_GROUP
这里是裁剪梯度会用到,就是在本模型的全部rank之中进行梯度裁剪相关操作。
def clip_grad_norm_fp32(parameters, max_norm, norm_type=2):
"""Clips gradient norm of an iterable of parameters whose gradients
are in fp32.
This is adapted from torch.nn.utils.clip_grad.clip_grad_norm_ and
added functionality to handle model parallel parameters. Note that
the gradients are modified in place.
Arguments:
parameters (Iterable[Tensor] or Tensor): an iterable of Tensors or a
single Tensor that will have gradients normalized
max_norm (float or int): max norm of the gradients
norm_type (float or int): type of the used p-norm. Can be ``'inf'`` for
infinity norm.
Returns:
Total norm of the parameters (viewed as a single vector).
"""
if isinstance(parameters, torch.Tensor):
parameters = [parameters]
# Filter parameters based on:
# - grad should not be none
# - parameter should not be shared
# - should not be a replica due to tensor model parallelism
grads = []
grads_for_norm = []
for param in parameters:
grad_not_none = param.grad is not None
is_not_shared = param_is_not_shared(param)
is_not_tp_duplicate = param_is_not_tensor_parallel_duplicate(param)
grad = param.grad.detach()
if grad_not_none:
# Make sure the grads are in fp32
grads.append(grad)
if grad_not_none and is_not_shared and is_not_tp_duplicate:
grads_for_norm.append(grad)
# Norm parameters.
max_norm = float(max_norm)
norm_type = float(norm_type)
total_norm = 0.0
# Calculate norm.
if norm_type == inf:
total_norm = max(grad.abs().max() for grad in grads_for_norm)
total_norm_cuda = torch.cuda.FloatTensor([float(total_norm)])
# Take max across all model-parallel GPUs.
torch.distributed.all_reduce(total_norm_cuda,
op=torch.distributed.ReduceOp.MAX,
group=mpu.get_model_parallel_group()) # 模型组信息
total_norm = total_norm_cuda[0].item()
else:
if norm_type == 2.0:
dummy_overflow_buf = torch.cuda.IntTensor([0])
# Use apex's multi-tensor applier for efficiency reasons.
# Multi-tensor applier takes a function and a list of list
# and performs the operation on that list all in one kernel.
grad_norm, _ = multi_tensor_applier(
amp_C.multi_tensor_l2norm,
dummy_overflow_buf,
[grads_for_norm],
False # no per-parameter norm
)
# Since we will be summing across data parallel groups,
# we need the pow(norm-type).
total_norm = grad_norm ** norm_type
else:
for grad in grads_for_norm:
grad_norm = torch.norm(grad, norm_type)
total_norm += grad_norm ** norm_type
# Sum across all model-parallel GPUs.
torch.distributed.all_reduce(total_norm,
op=torch.distributed.ReduceOp.SUM,
group=mpu.get_model_parallel_group()) # 模型组信息
total_norm = total_norm.item() ** (1.0 / norm_type)
# Scale.
clip_coeff = max_norm / (total_norm + 1.0e-6)
if clip_coeff < 1.0:
dummy_overflow_buf = torch.cuda.IntTensor([0])
multi_tensor_applier(amp_C.multi_tensor_scale,
dummy_overflow_buf,
[grads, grads],
clip_coeff)
return total_norm
之前的图如下,利用看到分成两组,左边是Model 0 对应的全部ranks,右面是model 1 的ranks。
我们最后还有一个问题没有涉及,就是如何把模型分块放到对应的GPU之上。就是如何与最初分成A,B,..., H 的那个图对应起来。其实,不是根据模型来把模型部分拷贝到对应的rank或者GPU,而是rank或者GPU主动过来拷贝自己对应的层。
具体 ParallelTransformer 初始化代码如下:
class ParallelTransformer(MegatronModule):
"""Transformer class."""
def __init__(self, init_method, output_layer_init_method,
layer_type=LayerType.encoder,
self_attn_mask_type=AttnMaskType.padding,
pre_process=True, post_process=True):
super(ParallelTransformer, self).__init__()
args = get_args()
# 省略代码
# Transformer layers.
def build_layer(layer_number):
return ParallelTransformerLayer(
init_method,
output_layer_init_method,
layer_number,
layer_type=layer_type,
self_attn_mask_type=self_attn_mask_type)
# 下面 offset 就是根据rank知道自己应该生成模型的那些层
if args.virtual_pipeline_model_parallel_size is not None:
# Number of layers in each model chunk is the number of layers in the stage,
# divided by the number of model chunks in a stage.
self.num_layers = self.num_layers // args.virtual_pipeline_model_parallel_size
# With 8 layers, 2 stages, and 4 model chunks, we want an assignment of
# layers to stages like (each list is a model chunk):
# Stage 0: [0] [2] [4] [6]
# Stage 1: [1] [3] [5] [7]
# With 8 layers, 2 stages, and 2 virtual stages, we want an assignment of
# layers to stages like (each list is a model chunk):
# Stage 0: [0, 1] [4, 5]
# Stage 1: [2, 3] [6, 7]
offset = mpu.get_virtual_pipeline_model_parallel_rank() * (
args.num_layers // args.virtual_pipeline_model_parallel_size) + \
(mpu.get_pipeline_model_parallel_rank() * self.num_layers)
else:
# Each stage gets a contiguous set of layers.
offset = mpu.get_pipeline_model_parallel_rank() * self.num_layers
self.layers = torch.nn.ModuleList(
[build_layer(i + 1 + offset) for i in range(self.num_layers)])
if self.post_process:
# Final layer norm before output.
self.final_layernorm = LayerNorm(
args.hidden_size,
eps=args.layernorm_epsilon)
所以,最终效果如下,其中同名子模块具有同样的参数,可以数据并行,即两个A可以数据并行。一列上的层之间可以流水线串行,比如 A--> C --> E --> G 就是串行,而一个横行4个是流水线的一个stage,其中从0开始,横向相邻两个GPU是 tensor model 并行。
GTC 2020: Megatron-LM: Training Multi-Billion Parameter Language Models Using Model Parallelism