MindSpeed框架理解
Contents
first step
MindSpeed作为适配megatron的分布式大模型推理框架,具有相当好的拆解与学习价值,现在由于需要去结合MindSpeed去做一些测试工作,所以首先得慢慢理解这个框架的结构,使用、拆解其中的关键代码,方便后期的测试。首先我们从pretrain_gpt.py出发,去看哪些代码是我们可以理解使用的
import部分直接跳过不看。我们先来看第一个方法model_provider
provide_model
参数如下:
Args:
pre_process (bool, optional): Set to true if you need to compute embedings. Defaults to True.
post_process (bool, optional): Set to true if you need to want to compute output logits/loss. Defaults to True.
Returns: Union[GPTModel, megatron.legacy.model.GPTModel]: The returned model
总体是用来加载mcore或者leagcy形式model的一个方法,然后可以选有pre/post process,现在理解是方便连上embedding层和loss计算层的。
关于mcore和legacy类型模型解释
MCore 模型和 Legacy 模型的区别主要出现在与 Megatron 相关的技术背景下。MCore 指的是 Megatron Core,它是由 Megatron 的早期版本(即 Legacy 版本)经过进一步的抽象和封装而来的 。与 Legacy 模型相比,Megatron-Core 提供了更灵活的接口和底层功能 。最近版本的 Megatron 已经加入了 MCore 模型,用于替代之前的 Legacy 模型,例如在将 Hugging Face 的 llama-2 模型转换到 Megatron 时,现在倾向于使用 mcore 模型而非 legacy 模型 。Megatron Core (Mcore) 能够支持用户大规模训练 Transformer 模型
然后本函数根据mcore或者legacy两种类型进行模型的实例化,如下:
if not args.use_legacy_models:
if args.spec is not None:
transformer_layer_spec = import_module(args.spec)
else:
if use_te:
transformer_layer_spec = get_gpt_layer_with_transformer_engine_spec(args.num_experts, args.moe_grouped_gemm)
else:
transformer_layer_spec = get_gpt_layer_local_spec(args.num_experts, args.moe_grouped_gemm)
mtp_block_spec = None
if args.mtp_num_layers is not None:
mtp_block_spec = get_gpt_mtp_block_spec(config, transformer_layer_spec, use_transformer_engine=use_te)
model = GPTModel(
config=config,
transformer_layer_spec=transformer_layer_spec,
vocab_size=args.padded_vocab_size,
max_sequence_length=args.max_position_embeddings,
pre_process=pre_process,
post_process=post_process,
fp16_lm_cross_entropy=args.fp16_lm_cross_entropy,
parallel_output=True,
share_embeddings_and_output_weights=not args.untie_embeddings_and_output_weights,
position_embedding_type=args.position_embedding_type,
rotary_percent=args.rotary_percent,
seq_len_interpolation_factor=args.rotary_seq_len_interpolation_factor,
mtp_block_spec=mtp_block_spec,
)
else:
if not args.context_parallel_size == 1:
raise ValueError("Context parallelism is only supported with Megatron Core!")
model = megatron.legacy.model.GPTModel(
config,
num_tokentypes=0,
parallel_output=True,
pre_process=pre_process,
post_process=post_process
)
其中mcore有一个关于transformer层的use_te参数配置,意思是是否使用nv的transformer engine,能够有效提高训练效率。其他参数配置这边暂不涉及,都是一些配置,还有pre和post的配置直接传参即可,然后context_parallelism应该是一个和TP PP类似的并行手段,仅仅适用于mcore的模型
第二个函数是get_batch,给每个npu获取TP,PP后切分的batch的数据的一个分配办法
def get_batch(data_iterator):
"""Generate a batch."""
args = get_args()
is_middle_stage = not (mpu.is_pipeline_first_stage() or mpu.is_pipeline_last_stage())
pretrain_not_tnd_flags = not args.is_instruction_dataset and not args.reset_position_ids
if pretrain_not_tnd_flags and is_middle_stage:
return (None,) * 5
# get batches based on the TP rank you are on
batch, actual_seq_len = get_batch_on_this_tp_rank(data_iterator)
if args.return_document_ids and mpu.get_context_parallel_rank() == 0 and mpu.get_tensor_model_parallel_rank() == 0 and mpu.get_pipeline_model_parallel_rank() == 0:
print("current idx: {}, current rank: {}, data_parallel_rank: {}, document_ids: {}".format(batch['idx'], torch.distributed.get_rank(), mpu.get_data_parallel_rank(), batch['document_ids']))
batch.pop('document_ids', None)
batch.pop('idx', None)
if args.reset_position_ids and not args.reset_attention_mask:
generate_actual_seq_len(batch, actual_seq_len)
batch = get_batch_on_this_cp_rank(batch)
else:
# slice batch along sequence dimension for context parallelism
batch = get_batch_on_this_cp_rank(batch)
return batch.values()
batch没啥好看的因为分布式模型训练的数据处理我不懂(划掉)所以这个函数就这样过一下,把它当成NPU的一个输入获取器就行
第三个函数是loss_func,具体内容我们先看再理解:
def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):
"""Loss function.
Args:
loss_mask (torch.Tensor): Used to mask out some portions of the loss
output_tensor (torch.Tensor): The tensor with the losses
"""
args = get_args()
losses = output_tensor.float()
loss_mask = loss_mask.view(-1).float()
if args.context_parallel_size > 1:
loss = torch.cat([torch.sum(losses.view(-1) * loss_mask).view(1), loss_mask.sum().view(1)])
torch.distributed.all_reduce(loss, group=mpu.get_context_parallel_group())
loss = loss[0] / loss[1]
else:
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
# Check individual rank losses are not NaN prior to DP all-reduce.
if args.check_for_nan_in_loss_and_grad:
global_rank = torch.distributed.get_rank()
if loss.isnan():
raise ValueError(f'Rank {global_rank}: found NaN in local forward loss calculation. '
f'Device: {torch.cuda.current_device()}, node: {os.uname()[1]}')
if args.async_log_allreduce:
# Reduce loss for logging, which is different from megatron pretrain_gpt.py.
reporting_loss = loss.clone().detach()
allreduce_handle = torch.distributed.all_reduce(
reporting_loss, group=mpu.get_data_parallel_group(), async_op=True
)
return loss * args.context_parallel_size, ({"lm loss": (reporting_loss)}, allreduce_handle)
else:
# Reduce loss for logging.
averaged_loss = average_losses_across_data_parallel_group([loss])
return loss * args.context_parallel_size, {'lm loss': averaged_loss[0]}
loss_mask是因为有些地方的损失属于无效的,所以mask掉之后再进行计算
get_args()
获取训练配置参数(来自全局配置对象args
),例如:args.context_parallel_size
:上下文并行的设备数量(用于分布式训练)。args.check_for_nan_in_loss_and_grad
:是否检查损失中的NaN
值。args.async_log_allreduce
:是否异步执行all_reduce
操作以优化日志记录。
计算的时候output_tensor和mask都展开成view(-1)的一维张量方便计算,长度都是batch_size*seq_length
由于是分布式的,所以计算误差也需要考虑到分布式的存在
loss calculate损失计算(关键逻辑)
根据 args.context_parallel_size
的值分两种情况处理:
情况 1:context_parallel_size > 1
(上下文并行启用)
if args.context_parallel_size > 1:
loss = torch.cat([torch.sum(losses.view(-1) * loss_mask).view(1),
loss_mask.sum().view(1)])
torch.distributed.all_reduce(loss, group=mpu.get_context_parallel_group())
loss = loss[0] / loss[1]
- 目的:在上下文并行(Context Parallelism)场景下,正确聚合跨设备的损失。
- 步骤:
- 局部计算:
- 每个设备计算本地损失总和
sum(losses * loss_mask)
和有效 token 数loss_mask.sum()
。
- 每个设备计算本地损失总和
- 全局聚合:
- 通过
all_reduce
操作,在上下文并行组内汇总所有设备的损失总和与有效 token 数。 - 例如:设备 A 的损失总和为
30
、有效 token 数50
;设备 B 为40
、50
→ 全局总和为70
、100
。
- 通过
- 全局平均:
loss = 全局损失总和 / 全局有效 token 数
(即70/100 = 0.7
)。
- 局部计算:
- 为什么需要?
上下文并行将输入序列分片到多个设备,每个设备仅处理部分序列。直接计算局部平均会导致结果偏差,必须通过全局聚合得到正确的平均损失。
情况 2:context_parallel_size = 1
(无上下文并行)
else:
loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()
结果即为标准的 Masked Loss(忽略无效位置的损失)。
逻辑:直接计算本地损失的加权平均。
sum(losses * loss_mask)
:有效位置的损失总和。loss_mask.sum()
:有效位置的总数。
然后下一个就是很关键的一个函数:前向传播函数
forward_step函数
def forward_step(data_iterator, model: GPTModel):
"""Forward training step.
Args:
data_iterator : Input data iterator
model (GPTModel): The GPT Model
"""
args = get_args()
timers = get_timers()
# Get the batch.
timers('batch-generator', log_level=2).start()
tokens, labels, loss_mask, attention_mask, position_ids = get_batch(
data_iterator)
timers('batch-generator').stop()
if args.use_legacy_models:
output_tensor = model(tokens, position_ids, attention_mask,
labels=labels)
else:
output_tensor = model(tokens, position_ids, attention_mask,
labels=labels, loss_mask=loss_mask)
return output_tensor, partial(loss_func, loss_mask)
get_batch
的作用:
从data_iterator
中提取一个训练批次的数据,各字段含义:tokens
输入 token IDs(形状[batch_size, seq_len]
)。labels
目标 token IDs(通常为tokens
右移一位,用于计算损失)。loss_mask
二进制掩码,标记哪些位置参与损失计算(1
=有效,0
=填充/忽略)。attention_mask
注意力掩码,防止模型关注填充部分(如[1,1,1,0,0]
)。position_ids
位置编码 IDs(通常为[0,1,2,...,seq_len-1]
)。- 典型场景:
- 在语言模型中,
loss_mask
可能标记出需要预测的 token 位置(例如仅计算句子结尾的损失)。 attention_mask
处理变长序列(避免 padding 影响注意力计算)。
- 在语言模型中,
我们注意到,在最后一步做forward获取output时,mcore比legacy额外需要一个lossmask,原因如下,也体现了mcore的一个优势:
- MCore 的设计目标:
- 提供更细粒度的控制(如将
loss_mask
直接传递给 Transformer 层,优化上下文并行中的损失计算)。 - 与 Transformer Engine (TE) 深度集成(例如
use_te=True
时需显式传递loss_mask
)。
- 提供更细粒度的控制(如将
- Legacy 的局限性:
- 损失掩码仅在
loss_func
中应用,无法在模型内部利用(例如无法在注意力层提前过滤无效位置)。
- 损失掩码仅在
其他部分相对来说就次要一些了,我们发现这个pretrain_gpt其实是从megatron中提取的部分,其他内容如下:
数据集构建 (train_valid_test_datasets_provider
函数)
- 功能: 构建训练、验证和测试数据集。
- 实现细节:
- 根据配置参数构建GPT数据集。
- 支持使用模拟数据 (
MockGPTDataset
) 或真实数据 (GPTDataset
)。 - 使用
BlendedMegatronDatasetBuilder
来构建混合数据集。
6. 核心GPT数据集配置 (core_gpt_dataset_config_from_args
函数)
- 功能: 从命令行参数中提取配置并构建GPT数据集的配置对象。
- 实现细节:
- 配置包括随机种子、序列长度、数据混合比例、数据集分割、缓存路径等。
7. 主程序入口 (if __name__ == "__main__":
)
- 功能: 启动预训练流程。
- 实现细节:
- 调用
pretrain
函数,传入数据集提供者、模型提供者、模型类型、前向传播函数等参数。 - 设置默认的tokenizer类型为
GPT2BPETokenizer
。
- 调用
8. 其他模块
- StragglerDetector: 用于检测训练中的慢节点。
- mpu (Model Parallel Utilities): 提供模型并行相关的工具函数,如判断当前是否是管道的第一个或最后一个阶段。
- get_args: 获取命令行参数。
- get_tokenizer: 获取tokenizer。
- get_timers: 获取计时器,用于性能分析。