3-Pretrain#

预训练是模型经历的第一个阶段,在该阶段,模型将会吸收知识,学习尽可能正确的下一词语预测范式。在这个笔记本中,我们仅对预训练的训练流程进行展示和学习,因此只给出必要的代码片段,如 wandb 和 ddp 不会在此笔记本中涉及。

此笔记本的完整实现见主仓库 ../raw/data/Minimind/train_pretrain.py

# -------------------- 导入标准和第三方库 --------------------
# 导入 os 库,用于与操作系统交互,比如处理文件路径
import os
# 导入 platform 库,用于获取系统信息,如操作系统类型
import platform
# 导入 argparse 库,用于解析命令行参数(在 .py 脚本中常用,这里我们用一个 class 模拟)
import argparse
# 导入 time 库,用于计时,比如计算训练时长
import time
# 导入 math 库,用于数学运算,比如计算学习率
import math
# 导入 warnings 库,用于控制警告信息的显示
import warnings
# 导入 pandas 库,一个强大的数据分析工具
import pandas as pd
# 导入 PyTorch 核心库
import torch
# 导入 PyTorch 的分布式训练库(用于多 GPU 训练)
import torch.distributed as dist
# 从 PyTorch 中导入 optim (优化器) 和 nn (神经网络模块)
from torch import optim, nn
# 从 PyTorch 中导入分布式数据并行工具 (DDP),用于在多个 GPU 上同步模型
from torch.nn.parallel import DistributedDataParallel
# 从 PyTorch 中导入一个学习率调度器,可以动态调整学习率
from torch.optim.lr_scheduler import CosineAnnealingLR
# 从 PyTorch 中导入 DataLoader (数据加载器) 和 DistributedSampler (用于分布式训练的采样器)
from torch.utils.data import DataLoader, DistributedSampler
# 导入一个上下文管理器,用于在不同条件下执行代码块(比如是否启用混合精度)
from contextlib import nullcontext

# -------------------- 导入 Hugging Face 和自定义模块 --------------------
# 从 transformers 库导入 AutoTokenizer,用于自动加载分词器
from transformers import AutoTokenizer

# 从我们自己的 model 文件夹中导入之前编写好的模块
from model.model import MiniMindLM # 导入我们完整的 MiniMind 语言模型
from model.LMConfig import LMConfig # 导入模型配置类
from model.dataset import PretrainDataset # 导入我们为预训练准备的数据集类
# 这行代码的作用是设置警告过滤器,让程序忽略所有的警告信息
# 在开发和演示中,这可以使输出更加整洁,避免被不重要的警告刷屏
# 在生产环境中,通常需要谨慎使用,因为某些警告可能预示着潜在问题
warnings.filterwarnings('ignore')

可选参数设置#

首先,查看训练的可选参数,这些参数在实际使用时通过解析命令行进行导入,我们用 class 进行包装.

# 在 Jupyter Notebook 中,我们无法像运行 .py 脚本那样从命令行传入参数
# 所以,这里定义一个 class `args` 来模拟命令行参数,方便管理和修改所有超参数
class args:
    # 训练过程超参数
    epochs: int = 1                  # 整个数据集要被训练多少遍
    batch_size: int = 2              # 每一步(step)训练多少个样本。因为我们的玩具数据集只有2个样本,所以设为2
    learning_rate: float = 5e-4      # 学习率,控制模型参数更新的幅度
    
    # 硬件和精度设置
    device: str = 'cuda' if torch.cuda.is_available() else 'cpu' # 自动检测是否有可用的 GPU (cuda),否则使用 CPU
    dtype: str = 'bfloat16'          # 使用的数据类型。'bfloat16' 是一种半精度浮点数,可以加速训练并减少显存占用
    
    # 日志和数据加载
    wandb_project: str = 'MiniMind-Notebook' # 用于 wandb 日志记录的项目名 (本 notebook 未使用)
    num_workers: int = 1             # DataLoader 加载数据时使用多少个子进程
    
    # 高级训练技巧
    accumulation_steps: int = 1      # 梯度累积步数。累积多少个小 batch 的梯度再统一更新一次模型,可以模拟更大的 batch size
    grad_clip: float = 1.0           # 梯度裁剪阈值。防止梯度爆炸,稳定训练
    warmup_iters: int = 0            # 学习率预热步数。在训练初期让学习率从小慢慢增长到设定值,有助于稳定训练
    log_interval: int = 1            # 每隔多少步打印一次训练日志
    
    # 分布式训练相关
    local_rank: int = 1              # 在多 GPU 训练中,当前 GPU 的编号 (本 notebook 未使用)

    # 模型结构超参数 (会传递给 LMConfig)
    dim: int = 512                   # 模型的内部维度 (词嵌入维度)
    n_layers: int = 2                # Transformer Block 的层数
    max_seq_len: int = 512           # 模型能处理的最大序列长度
    use_moe: bool = False            # 是否使用混合专家(MoE)模型结构

    # 数据路径
    data_path: str = '../raw/data/Minimind/toydata/pretrain_data.jsonl' # 预训练数据文件的路径
# 打印出我们刚才在 `args` 中设置的 `device`
# 这可以帮助我们确认程序是否正确地检测到了 GPU
print(f'查看工作设备 {args.device}')
查看工作设备 cuda

初始化训练#

接下来,我们对一些重要模块进行初始化,我们已经了解过,分词器,模型和数据集是大模型的基本组件,我们对其进行初始化。

# 定义一个函数来封装模型和分词器的初始化过程,让主流程代码更整洁
def init_model(lm_config):
    # 1. 加载我们之前保存的分词器
    tokenizer = AutoTokenizer.from_pretrained('../raw/data/Minimind/model/minimind_tokenizer')
    
    # 2. 根据传入的 `lm_config` 配置,实例化我们的 MiniMindLM 模型
    #    `.to(args.device)` 会将模型的所有参数和缓冲区移动到指定的设备上(比如 GPU)
    model = MiniMindLM(lm_config).to(args.device)
    
    # 3. 计算并打印模型的总参数量
    #    - `model.parameters()`: 获取模型的所有参数
    #    - `if p.requires_grad`: 只计算需要进行梯度更新的参数(可训练参数)
    #    - `p.numel()`: 获取单个参数张量中元素的数量
    #    - `sum(...)`: 将所有参数的数量加起来
    #    - `/ 1e6`: 将单位从个转换为百万 (Million)
    print(f'LLM总参数量:{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
    
    # 4. 返回初始化好的模型和分词器
    return model, tokenizer
# --- 1. 创建模型配置 ---
# 使用 `args` 中定义的模型结构参数来创建一个 LMConfig 对象
lm_config = LMConfig(dim=args.dim, n_layers=args.n_layers, max_seq_len=args.max_seq_len, use_moe=args.use_moe)

# --- 2. 初始化模型和分词器 ---
# 调用我们刚才定义的 `init_model` 函数
model, tokenizer = init_model(lm_config)

# --- 3. 初始化数据集 ---
# 使用数据路径、分词器和最大长度来创建一个 PretrainDataset 对象
train_ds = PretrainDataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)

# --- 4. 初始化数据加载器 (DataLoader) ---
# DataLoader 会从 Dataset 中自动提取数据并打包成 batch
train_loader = DataLoader(
    train_ds,                      # 要加载的数据集
    batch_size=args.batch_size,    # 每个 batch 的大小
    pin_memory=True,               # 如果为 True,会将数据加载到 CUDA 的固定内存中,可以加速数据从 CPU 到 GPU 的传输
    drop_last=False,               # 如果最后一个 batch 不足 `batch_size`,是否丢弃。对于小数据集,设为 False 很重要
    shuffle=False,                 # 是否在每个 epoch 开始时打乱数据。为了演示的可复现性,这里设为 False
    num_workers=args.num_workers,  # 使用多少个子进程来加载数据
)

# --- 5. 打印确认信息 ---
print(f'模型位于设备:{model.device}, 词表长度:{tokenizer.vocab_size}, DataLoader:{train_loader}')
LLM总参数量:8.915 百万
模型位于设备:cuda:0, 词表长度:6400, DataLoader:<torch.utils.data.dataloader.DataLoader object at 0x000001CFAE49E680>
# 这段代码用来检查 DataLoader 是否按预期工作

# 1. `iter(train_loader)`: 将 DataLoader 转换成一个迭代器
#    你可以把它想象成一个可以逐个取出 batch 的“发牌器”
loader = iter(train_loader)

# 2. `next(loader)`: 从迭代器中取出下一个元素,也就是第一个 batch 的数据
#    打印出的内容是一个列表,包含三个张量:X_batch, Y_batch, loss_mask_batch
#    每个张量的第一个维度的大小都是 batch_size (这里是 2)
print(f'打印一个 iter 的数据:\n{next(loader)}\n')

# 3. 打印数据集和 DataLoader 的大小
#    - `len(train_ds)`: 数据集总共有多少个样本
#    - `len(train_loader)`: DataLoader 能产生多少个 batch。计算方式是 `ceil(len(dataset) / batch_size)`
print(f'数据集大小:{len(train_ds)}, DataLoader 大小:{len(train_loader)}')
打印一个 iter 的数据:
[tensor([[   1,   46,   46,  ...,    0,    0,    0],
        [   1, 5349, 1619,  ...,    0,    0,    0]]), tensor([[  46,   46,   47,  ...,    0,    0,    0],
        [5349, 1619, 2875,  ...,    0,    0,    0]]), tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]])]

数据集大小:2, DataLoader 大小:1

我们发现,train loader 的每一个 iter 都包含一个长度为 3 的张量列表,这是因为 train_dataset 每一次取数据都会返回三个张量,分别为:

  • 样本 X: 包含 <bos> 在内的输入 content

  • 标签 Y: 包含 <eos> 在内的输出 content

  • 掩码 loss_mask: 指示需要计算损失的 token 位置

由于我们的数据集只有两条数据,而 batch size 设置为 2,因此我们的 dataloader 只有一个 iter.

启动训练#

训练一个深度学习模型,还涉及到了优化器,损失函数和学习率调度。接下来,我们查看 MiniMind 训练部分的代码,并进行一轮简单的训练。

# --- 1. 定义学习率调度函数 ---
# 这是一个手动实现的余弦退火学习率调度器
# 它的作用是让学习率在训练过程中像余弦曲线一样平滑地下降
# 这通常比固定学习率能取得更好的效果
def get_lr(current_step, total_steps, lr):
    # 这是一个变种的余弦退火,保证学习率最终不会降到0,而是 lr/10
    # 这里的 `current_step` 可以是总的训练步数
    return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))

# --- 2. 设置混合精度训练的梯度缩放器 (Scaler) ---
# 在使用半精度(如 bfloat16)训练时,一些非常小的梯度可能会因为精度限制而变成0(数值下溢)
# GradScaler 的作用是:
#  - 在反向传播前,将损失值乘以一个很大的缩放因子 (scale factor)
#  - 这样,所有的梯度都会被同等放大,避免了下溢问题
#  - 在更新权重前,再将梯度除以缩放因子,恢复到原来的大小
# `enabled=...`: 仅当数据类型为 'float16' 或 'bfloat16' 时才启用 GradScaler
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))

# --- 3. 初始化优化器 ---
# AdamW 是 Adam 优化器的一个改进版本,通常在大模型训练中表现更好
# `model.parameters()`: 告诉优化器它需要更新哪些参数
# `lr=...`: 设置初始学习率
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)

# --- 4. 设置自动混合精度上下文 (Autocast) ---
# `torch.cuda.amp.autocast()` 是一个上下文管理器
# 在 `with autocast():` 代码块中的 CUDA 运算,会自动地以半精度(如 bfloat16)执行,以提高速度和减少显存
# 如果设备是 CPU,我们使用 `nullcontext()`,它什么也不做,训练将以全精度(float32)进行
device_type = "cuda" if "cuda" in args.device else "cpu"
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()

接下来,我们来看看 MiniMind 的训练函数

# 定义一个函数来执行一个完整的训练 epoch
def train_epoch(epoch):
    # --- 1. 初始化 ---
    # 定义损失函数:交叉熵损失,用于分类任务(预测下一个词本质上是多分类问题)
    # `reduction='none'`: 表示不立即对损失求平均或求和,而是返回每个位置的单独损失值
    #                      这样我们才能应用自定义的 loss_mask
    loss_fct = nn.CrossEntropyLoss(reduction='none')
    start_time = time.time() # 记录 epoch 开始时间

    # --- 2. 遍历 DataLoader ---
    for step, (X, Y, loss_mask) in enumerate(train_loader):
        # a. 将数据移动到指定设备
        X = X.to(args.device)
        Y = Y.to(args.device)
        loss_mask = loss_mask.to(args.device)

        # b. 更新学习率
        current_total_step = epoch * iter_per_epoch + step
        total_steps = args.epochs * iter_per_epoch
        lr = get_lr(current_total_step, total_steps, args.learning_rate)
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr

        # --- 3. 前向传播和计算损失 (在混合精度上下文中) ---
        with ctx:
            # a. 前向传播:将输入 X 送入模型,得到输出 res
            res = model(X)
            # b. 计算原始损失
            #    `res.logits.view(-1, ...)`: 将 (batch, seq_len, vocab_size) 展平为 (batch*seq_len, vocab_size)
            #    `Y.view(-1)`: 将 (batch, seq_len) 展平为 (batch*seq_len)
            #    `.view(Y.size())`: 将计算出的损失重新变回 (batch, seq_len) 的形状
            loss = loss_fct(res.logits.view(-1, res.logits.size(-1)), Y.view(-1)).view(Y.size())
            # c. 应用损失掩码
            #    `loss * loss_mask`: 只保留 loss_mask 中为 1 的位置的损失
            #    `.sum() / loss_mask.sum()`: 对有效损失求平均
            loss = (loss * loss_mask).sum() / loss_mask.sum()
            # d. 加上 MoE 的辅助损失 (如果不是 MoE 模型,res.aux_loss 为 0)
            loss += res.aux_loss
            # e. 如果使用梯度累积,对损失进行缩放
            loss = loss / args.accumulation_steps

        # --- 4. 反向传播 ---
        # `scaler.scale(loss)`: 放大损失值
        # `.backward()`: 计算梯度
        scaler.scale(loss).backward()

        # --- 5. 更新模型权重 ---
        # 只有在累积了足够步数后才执行更新
        if (step + 1) % args.accumulation_steps == 0:
            # a. `scaler.unscale_(optimizer)`: 在裁剪梯度前,将梯度恢复到原始大小
            scaler.unscale_(optimizer)
            # b. `torch.nn.utils.clip_grad_norm_`: 进行梯度裁剪,防止梯度爆炸
            torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
            # c. `scaler.step(optimizer)`: 优化器更新权重,scaler 会自动跳过无效的更新(如有 NaN/Inf 梯度)
            scaler.step(optimizer)
            # d. `scaler.update()`: 更新 scaler 的缩放因子
            scaler.update()
            # e. `optimizer.zero_grad()`: 清空梯度,为下一次累积做准备
            optimizer.zero_grad(set_to_none=True)

        # --- 6. 打印日志 ---
        if step % args.log_interval == 0:
            spend_time = time.time() - start_time
            print(
                'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} ...'.format(
                    epoch + 1, args.epochs,
                    step, iter_per_epoch,
                    loss.item() * args.accumulation_steps, # 打印真实的 batch loss
                    optimizer.param_groups[-1]['lr'],
                    # ... (计算剩余时间)
                )
            )

准备完毕,我们尝试一轮长度 1 个 iter 的训练.

# 计算每个 epoch 有多少个 iteration (step)
# 在我们的例子中,len(train_loader) 是 1
iter_per_epoch = len(train_loader)

# 开始主训练循环
# `range(args.epochs)` 会生成一个从 0 到 epochs-1 的序列
# 因为我们设置了 `args.epochs = 1`,所以这个循环只会执行一次
for epoch in range(args.epochs):
    # 调用我们上面定义的 `train_epoch` 函数,传入当前的 epoch 编号
    train_epoch(epoch)
Epoch:[1/1](0/1) loss:8.974 lr:0.000550000000 epoch_Time:0.0min:
# 训练结束后,删除模型对象以释放占用的 GPU 显存
# 这是一个好习惯,尤其是在 Jupyter Notebook 环境中
del model