MindSpeed框架理解

2025年8月20日 0 作者 ScotI_Blog

first step

MindSpeed作为适配megatron的分布式大模型推理框架,具有相当好的拆解与学习价值,现在由于需要去结合MindSpeed去做一些测试工作,所以首先得慢慢理解这个框架的结构,使用、拆解其中的关键代码,方便后期的测试。首先我们从pretrain_gpt.py出发,去看哪些代码是我们可以理解使用的

import部分直接跳过不看。我们先来看第一个方法model_provider

provide_model

参数如下:
Args:
pre_process (bool, optional): Set to true if you need to compute embedings. Defaults to True.
post_process (bool, optional): Set to true if you need to want to compute output logits/loss. Defaults to True.

Returns: Union[GPTModel, megatron.legacy.model.GPTModel]: The returned model

总体是用来加载mcore或者leagcy形式model的一个方法,然后可以选有pre/post process,现在理解是方便连上embedding层和loss计算层的。

关于mcore和legacy类型模型解释

MCore 模型和 Legacy 模型的区别主要出现在与 Megatron 相关的技术背景下。MCore 指的是 Megatron Core,它是由 Megatron 的早期版本(即 Legacy 版本)经过进一步的抽象和封装而来的 。与 Legacy 模型相比,Megatron-Core 提供了更灵活的接口和底层功能 。最近版本的 Megatron 已经加入了 MCore 模型,用于替代之前的 Legacy 模型,例如在将 Hugging Face 的 llama-2 模型转换到 Megatron 时,现在倾向于使用 mcore 模型而非 legacy 模型 。Megatron Core (Mcore) 能够支持用户大规模训练 Transformer 模型

然后本函数根据mcore或者legacy两种类型进行模型的实例化,如下:

if not args.use_legacy_models:
        if args.spec is not None:
            transformer_layer_spec = import_module(args.spec)
        else:
            if use_te:
                transformer_layer_spec = get_gpt_layer_with_transformer_engine_spec(args.num_experts, args.moe_grouped_gemm)
            else:
                transformer_layer_spec = get_gpt_layer_local_spec(args.num_experts, args.moe_grouped_gemm)
        mtp_block_spec = None
        if args.mtp_num_layers is not None:
            mtp_block_spec = get_gpt_mtp_block_spec(config, transformer_layer_spec, use_transformer_engine=use_te)

        model = GPTModel(
            config=config,
            transformer_layer_spec=transformer_layer_spec,
            vocab_size=args.padded_vocab_size,
            max_sequence_length=args.max_position_embeddings,
            pre_process=pre_process,
            post_process=post_process,
            fp16_lm_cross_entropy=args.fp16_lm_cross_entropy,
            parallel_output=True,
            share_embeddings_and_output_weights=not args.untie_embeddings_and_output_weights,
            position_embedding_type=args.position_embedding_type,
            rotary_percent=args.rotary_percent,
            seq_len_interpolation_factor=args.rotary_seq_len_interpolation_factor,
            mtp_block_spec=mtp_block_spec,
        )
    else:
        if not args.context_parallel_size == 1:
            raise ValueError("Context parallelism is only supported with Megatron Core!")

        model = megatron.legacy.model.GPTModel(
            config,
            num_tokentypes=0,
            parallel_output=True,
            pre_process=pre_process,
            post_process=post_process
        )

其中mcore有一个关于transformer层的use_te参数配置,意思是是否使用nv的transformer engine,能够有效提高训练效率。其他参数配置这边暂不涉及,都是一些配置,还有pre和post的配置直接传参即可,然后context_parallelism应该是一个和TP PP类似的并行手段,仅仅适用于mcore的模型

第二个函数是get_batch,给每个npu获取TP,PP后切分的batch的数据的一个分配办法

def get_batch(data_iterator):
    """Generate a batch."""

    args = get_args()

    is_middle_stage = not (mpu.is_pipeline_first_stage() or mpu.is_pipeline_last_stage())
    pretrain_not_tnd_flags = not args.is_instruction_dataset and not args.reset_position_ids
    if pretrain_not_tnd_flags and is_middle_stage:
        return (None,) * 5

    # get batches based on the TP rank you are on
    batch, actual_seq_len = get_batch_on_this_tp_rank(data_iterator)

    if args.return_document_ids and mpu.get_context_parallel_rank() == 0 and mpu.get_tensor_model_parallel_rank() == 0 and mpu.get_pipeline_model_parallel_rank() == 0:
        print("current idx: {}, current rank: {}, data_parallel_rank: {}, document_ids: {}".format(batch['idx'], torch.distributed.get_rank(), mpu.get_data_parallel_rank(), batch['document_ids']))
        batch.pop('document_ids', None)
        batch.pop('idx', None)

    if args.reset_position_ids and not args.reset_attention_mask:
        generate_actual_seq_len(batch, actual_seq_len)
        batch = get_batch_on_this_cp_rank(batch)
    else:
        # slice batch along sequence dimension for context parallelism
        batch = get_batch_on_this_cp_rank(batch)
    return batch.values()

batch没啥好看的因为分布式模型训练的数据处理我不懂(划掉)所以这个函数就这样过一下,把它当成NPU的一个输入获取器就行

第三个函数是loss_func,具体内容我们先看再理解:


def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor):
    """Loss function.

    Args:
        loss_mask (torch.Tensor): Used to mask out some portions of the loss
        output_tensor (torch.Tensor): The tensor with the losses
    """    
    args = get_args()

    losses = output_tensor.float()
    loss_mask = loss_mask.view(-1).float()
    if args.context_parallel_size > 1:
        loss = torch.cat([torch.sum(losses.view(-1) * loss_mask).view(1), loss_mask.sum().view(1)])
        torch.distributed.all_reduce(loss, group=mpu.get_context_parallel_group())
        loss = loss[0] / loss[1]
    else:
        loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()

    # Check individual rank losses are not NaN prior to DP all-reduce.
    if args.check_for_nan_in_loss_and_grad:
        global_rank = torch.distributed.get_rank()
        if loss.isnan():
            raise ValueError(f'Rank {global_rank}: found NaN in local forward loss calculation. '
                             f'Device: {torch.cuda.current_device()}, node: {os.uname()[1]}')

    if args.async_log_allreduce:
        # Reduce loss for logging, which is different from megatron pretrain_gpt.py.
        reporting_loss = loss.clone().detach()

        allreduce_handle = torch.distributed.all_reduce(
            reporting_loss, group=mpu.get_data_parallel_group(), async_op=True
        )

        return loss * args.context_parallel_size, ({"lm loss": (reporting_loss)}, allreduce_handle)
    else:

        # Reduce loss for logging.
        averaged_loss = average_losses_across_data_parallel_group([loss])

        return loss * args.context_parallel_size, {'lm loss': averaged_loss[0]}

loss_mask是因为有些地方的损失属于无效的,所以mask掉之后再进行计算

  • get_args()
    获取训练配置参数(来自全局配置对象 args),例如:
    • args.context_parallel_size:上下文并行的设备数量(用于分布式训练)。
    • args.check_for_nan_in_loss_and_grad:是否检查损失中的 NaN 值。
    • args.async_log_allreduce:是否异步执行 all_reduce 操作以优化日志记录。

计算的时候output_tensor和mask都展开成view(-1)的一维张量方便计算,长度都是batch_size*seq_length

由于是分布式的,所以计算误差也需要考虑到分布式的存在

loss calculate损失计算(关键逻辑)

根据 args.context_parallel_size 的值分两种情况处理:

情况 1:context_parallel_size > 1(上下文并行启用)

if args.context_parallel_size > 1:
    loss = torch.cat([torch.sum(losses.view(-1) * loss_mask).view(1), 
                      loss_mask.sum().view(1)])
    torch.distributed.all_reduce(loss, group=mpu.get_context_parallel_group())
    loss = loss[0] / loss[1]
  • 目的:在上下文并行(Context Parallelism)场景下,正确聚合跨设备的损失。
  • 步骤
    1. 局部计算
      • 每个设备计算本地损失总和 sum(losses * loss_mask) 和有效 token 数 loss_mask.sum()
    2. 全局聚合
      • 通过 all_reduce 操作,在上下文并行组内汇总所有设备的损失总和与有效 token 数。
      • 例如:设备 A 的损失总和为 30、有效 token 数 50;设备 B 为 4050 → 全局总和为 70100
    3. 全局平均
      • loss = 全局损失总和 / 全局有效 token 数(即 70/100 = 0.7)。
  • 为什么需要?
    上下文并行将输入序列分片到多个设备,每个设备仅处理部分序列。直接计算局部平均会导致结果偏差,必须通过全局聚合得到正确的平均损失。

情况 2:context_parallel_size = 1(无上下文并行)

else:
    loss = torch.sum(losses.view(-1) * loss_mask) / loss_mask.sum()

结果即为标准的 Masked Loss(忽略无效位置的损失)。

逻辑:直接计算本地损失的加权平均。

sum(losses * loss_mask):有效位置的损失总和。
loss_mask.sum():有效位置的总数。

然后下一个就是很关键的一个函数:前向传播函数

forward_step函数

def forward_step(data_iterator, model: GPTModel):
    """Forward training step.

    Args:
        data_iterator : Input data iterator
        model (GPTModel): The GPT Model
    """
    args = get_args()
    timers = get_timers()

    # Get the batch.
    timers('batch-generator', log_level=2).start()
    tokens, labels, loss_mask, attention_mask, position_ids = get_batch(
        data_iterator)
    timers('batch-generator').stop()

    if args.use_legacy_models:
        output_tensor = model(tokens, position_ids, attention_mask,
                              labels=labels)
    else:
        output_tensor = model(tokens, position_ids, attention_mask,
                              labels=labels, loss_mask=loss_mask)

    return output_tensor, partial(loss_func, loss_mask)
  • get_batch 的作用
    data_iterator 中提取一个训练批次的数据,各字段含义:tokens输入 token IDs(形状[batch_size, seq_len])。labels目标 token IDs(通常为tokens右移一位,用于计算损失)。loss_mask二进制掩码,标记哪些位置参与损失计算(1=有效,0=填充/忽略)。attention_mask注意力掩码,防止模型关注填充部分(如[1,1,1,0,0])。position_ids位置编码 IDs(通常为[0,1,2,...,seq_len-1])。
  • 典型场景
    • 在语言模型中,loss_mask 可能标记出需要预测的 token 位置(例如仅计算句子结尾的损失)。
    • attention_mask 处理变长序列(避免 padding 影响注意力计算)。

我们注意到,在最后一步做forward获取output时,mcore比legacy额外需要一个lossmask,原因如下,也体现了mcore的一个优势:

  • MCore 的设计目标
    • 提供更细粒度的控制(如将 loss_mask 直接传递给 Transformer 层,优化上下文并行中的损失计算)。
    • Transformer Engine (TE) 深度集成(例如 use_te=True 时需显式传递 loss_mask)。
  • Legacy 的局限性
    • 损失掩码仅在 loss_func 中应用,无法在模型内部利用(例如无法在注意力层提前过滤无效位置)。

其他部分相对来说就次要一些了,我们发现这个pretrain_gpt其实是从megatron中提取的部分,其他内容如下:

数据集构建 (train_valid_test_datasets_provider 函数)

  • 功能: 构建训练、验证和测试数据集。
  • 实现细节:
    • 根据配置参数构建GPT数据集。
    • 支持使用模拟数据 (MockGPTDataset) 或真实数据 (GPTDataset)。
    • 使用 BlendedMegatronDatasetBuilder 来构建混合数据集。

6. 核心GPT数据集配置 (core_gpt_dataset_config_from_args 函数)

  • 功能: 从命令行参数中提取配置并构建GPT数据集的配置对象。
  • 实现细节:
    • 配置包括随机种子、序列长度、数据混合比例、数据集分割、缓存路径等。

7. 主程序入口 (if __name__ == "__main__":)

  • 功能: 启动预训练流程。
  • 实现细节:
    • 调用 pretrain 函数,传入数据集提供者、模型提供者、模型类型、前向传播函数等参数。
    • 设置默认的tokenizer类型为 GPT2BPETokenizer

8. 其他模块

  • StragglerDetector: 用于检测训练中的慢节点。
  • mpu (Model Parallel Utilities): 提供模型并行相关的工具函数,如判断当前是否是管道的第一个或最后一个阶段。
  • get_args: 获取命令行参数。
  • get_tokenizer: 获取tokenizer。
  • get_timers: 获取计时器,用于性能分析。

Print Friendly, PDF & Email