8-Reason#
参数太小的模型直接通过冷启动SFT+GRPO几乎不可能获得任何推理效果,因此,使用冷启动 SFT + GRPO 训练方法对小模型推理能力的作用有限.因此,MiniMind 项目作者使用推理数据集对 MiniMind 系列模型进行黑盒蒸馏来训练推理模型.
使用的推理数据格式:
{
"conversations": [
{"role": "user", "content": "Q1?"},
{"role": "assistant", "content": "<think>T1</think>\n<answer>A1</answer>"},
{"role": "user", "content": "Q2?"},
{"role": "assistant", "content": "<think>T2</think>\n<answer>A2</answer>"}
]
}
此笔记本的完整实现见主仓库 ../raw/data/Minimind/train_distill_reason.py
# -------------------- 导入标准和第三方库 --------------------
# 导入 os 库,用于与操作系统交互,如处理文件路径
import os
# 导入 platform 库,用于获取系统信息,如操作系统类型
import platform
# 导入 argparse 库,用于解析从命令行传入的参数
import argparse
# 导入 time 库,用于计时,例如计算训练耗时
import time
# 导入 math 库,用于数学运算,例如在学习率调度中计算余弦值
import math
# 导入 warnings 库,用于控制 Python 警告信息的显示
import warnings
# 导入 pandas 库,一个用于数据处理和分析的强大工具
import pandas as pd
# 导入 PyTorch 核心库
import torch
# 导入 PyTorch 的 functional 模块,包含大量无状态的函数,如激活函数和损失函数
import torch.nn.functional as F
# 导入 PyTorch 的分布式训练库,用于多 GPU 训练
import torch.distributed as dist
# 导入一个上下文管理器,可以根据条件返回一个空操作的上下文,非常适合编写兼容 CPU 和 GPU 的代码
from contextlib import nullcontext
# 从 PyTorch 中导入 optim (包含各种优化器) 和 nn (构建神经网络的基础模块)
from torch import optim, nn
# 导入 PyTorch 的分布式数据并行工具 (DDP)
from torch.nn.parallel import DistributedDataParallel
# 导入 PyTorch 的数据加载器 (DataLoader) 和分布式采样器 (DistributedSampler)
from torch.utils.data import DataLoader, DistributedSampler
# -------------------- 导入 Hugging Face 和自定义模块 --------------------
# 从 transformers 库导入 AutoTokenizer 和 AutoModelForCausalLM
from transformers import AutoTokenizer, AutoModelForCausalLM
# 从我们自己的 model 文件夹中导入之前编写好的模块
from model.model import MiniMindLM # 导入我们的语言模型实现
from model.LMConfig import LMConfig # 导入模型配置类
# 推理蒸馏的数据格式与 SFT 类似,所以我们复用 SFTDataset
from model.dataset import SFTDataset
# 设置警告过滤器,让程序忽略所有的警告信息,这有助于保持输出的整洁
warnings.filterwarnings('ignore')
可选参数设置#
首先,查看训练的可选参数,这些参数在实际使用时通过解析命令行进行导入,我们用 class 进行包装.
class args:
# out_dir: str = "out" # pytorch 格式权重文件保存位置 我们只展示训练过程 所以不使用
epochs: int = 1 # 训练轮数
batch_size: int = 2 # pretrain 数据集仅两个样本,设置 batch 为 2
learning_rate: float = 5e-4 # 学习率
device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
dtype: str = 'bfloat16' # 16 bit 浮点数:8 bit 指数 + 7 bit 尾数
# use_wandb: bool = False # 是否使用 wandb 我们不使用
wandb_project: str = 'MiniMind-Notebook'
num_workers: int = 1 # 工作进程数
# ddp:bool = False # 单机多卡
accumulation_steps: int = 1 # 梯度累积步数
grad_clip: float = 1.0 # 梯度剪裁
warmup_iters: int = 0 # 学习率热启动
log_interval: int = 1 # 每一步打印日志 仅用于观察
# save_interval: int = 100 # checkpoint 保存点 我们不使用
local_rank: int = 1 # device 设备号
dim: int = 512 # 词嵌入维度 模型超参数
n_layers: int = 2 # MiniMind Block 数量 模型超参数
max_seq_len: int = 512 # 序列长度阈值
use_moe: bool = False # 是否启用混合专家
data_path: str = '../raw/data/Minimind/toydata/r1_data.jsonl' # 数据集路径
# 打印出 `args` 类中定义的 `device` 属性
# 这是一个简单的检查,用于确认我们的程序将要在哪个硬件上运行(CPU 或 CUDA GPU)
print(f'查看工作设备 {args.device}')
查看工作设备 cuda
接下来,我们对分词器、MiniMind 学生模型以及数据迭代器执行初始化.
# 定义一个函数来封装模型的初始化过程
def init_model(lm_config):
# 1. 加载分词器
tokenizer = AutoTokenizer.from_pretrained('../raw/data/Minimind/model/minimind_tokenizer')
# 2. 根据传入的配置 `lm_config` 创建模型实例
model = MiniMindLM(lm_config)
# 在真实流程中,推理蒸馏也是在已有的模型基础上进行的,所以需要加载权重
moe_path = '_moe' if lm_config.use_moe else ''
# ckp = f'./out/rlhf_{lm_config.dim}{moe_path}.pth' # 比如加载一个 DPO 训练好的模型
# state_dict = torch.load(ckp, map_location=args.device)
# model.load_state_dict(state_dict, strict=False)
# 3. 计算并打印模型的可训练参数量
print(f'LLM总参数量:{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
# 4. 将模型移动到指定设备(如 GPU)
model = model.to(args.device)
# 5. 返回初始化好的模型和分词器
return model, tokenizer
# --- 1. 创建模型配置 ---
# 使用 `args` 中的参数创建 LMConfig 对象
lm_config = LMConfig(dim=args.dim, n_layers=args.n_layers, max_seq_len=args.max_seq_len, use_moe=args.use_moe)
# --- 2. 初始化模型和分词器 ---
# 调用 `init_model` 函数,获取模型和分词器实例
model, tokenizer = init_model(lm_config)
# --- 3. 初始化数据集 ---
# 推理蒸馏的数据格式(包含 "conversations" 键)与 SFT 数据集兼容,所以复用 `SFTDataset`
train_ds = SFTDataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)
# --- 4. 初始化数据加载器 (DataLoader) ---
train_loader = DataLoader(
train_ds,
batch_size=args.batch_size,
pin_memory=True,
drop_last=False,
shuffle=False,
)
# --- 5. 打印确认信息 ---
print(f'模型位于设备:{model.device}, 词表长度:{tokenizer.vocab_size}, DataLoader:{train_loader}')
LLM总参数量:8.915 百万
模型位于设备:cuda:0, 词表长度:6400, DataLoader:<torch.utils.data.dataloader.DataLoader object at 0x0000023110AC3BE0>
启动训练#
接下来,我们定义 MiniMind LoRA 微调所使用的优化器,损失函数和学习率调度,并进行一轮简单的训练.
# 1. 定义学习率调度函数 (余弦退火),用于在训练过程中动态调整学习率
def get_lr(current_step, total_steps, lr):
return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
# 2. 初始化梯度缩放器 (GradScaler),用于混合精度训练,防止梯度下溢
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
# 3. 初始化优化器 (AdamW)
# 优化器接收模型的所有可训练参数 `model.parameters()`
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)
# 4. 设置自动混合精度上下文 (Autocast)
# 根据设备类型决定是否启用混合精度,以加速训练
device_type = "cuda" if "cuda" in args.device else "cpu"
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()
接下来,我们来看看训练函数.
蒸馏思考数据集的训练过程与 SFT 类似,区别在于模型生成序列中,思考标签位置的预测错误惩罚被放大.
# 定义推理蒸馏的训练 epoch 函数
def train_epoch(epoch):
# --- 1. 准备特殊 token ID ---
# 将 Chain-of-Thought (CoT) 的特殊标签(如 <think>, </think>)转换为 token ID
start_of_think_ids = tokenizer('<think>').input_ids
end_of_think_ids = tokenizer('</think>').input_ids
start_of_answer_ids = tokenizer('<answer>').input_ids
end_of_answer_ids = tokenizer('</answer>').input_ids
# 定义损失函数:交叉熵损失,`reduction='none'` 表示返回每个位置的单独损失值
loss_fct = nn.CrossEntropyLoss(reduction='none')
start_time = time.time()
for step, (X, Y, loss_mask) in enumerate(train_loader):
# ... (数据移动到设备和学习率更新部分与之前完全相同)
X, Y, loss_mask = X.to(args.device), Y.to(args.device), loss_mask.to(args.device)
lr = get_lr(epoch * iter_per_epoch + step, args.epochs * iter_per_epoch, args.learning_rate)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
# --- 2. 前向传播和计算损失 ---
with ctx: # 在混合精度上下文中
res = model(X) # 模型前向传播
# a. 计算原始的、每个 token 位置的交叉熵损失
loss = loss_fct(res.logits.view(-1, res.logits.size(-1)), Y.view(-1)).view(Y.size())
# --- 关键步骤:加权损失 ---
# b. 找到所有特殊 CoT 标签在真实标签 Y 中的位置
# `torch.isin` 会检查 `Y` 中的每个元素是否存在于给定的列表中,返回一个布尔张量
sp_ids_mask = torch.isin(Y.view(-1), torch.tensor(
start_of_think_ids + end_of_think_ids + start_of_answer_ids + end_of_answer_ids
).to(args.device))
# c. 修改损失掩码,给特殊标签位置更高的权重
loss_mask_flat = loss_mask.view(-1)
# 保存原始掩码的和,用于后续的平均化,这可以防止总损失的尺度因权重调整而变化过大
loss_mask_sum = loss_mask.sum()
# 在 `sp_ids_mask` 为 True 的位置(即特殊标签的位置),将损失权重设为 10
loss_mask_flat[sp_ids_mask] = 10
# 将修改后的掩码恢复到原来的形状
loss_mask = loss_mask_flat.view(Y.size())
# d. 计算加权后的总损失
# `(loss * loss_mask).sum()`: 将每个位置的损失乘以其对应的权重,然后求和
# `/ loss_mask_sum`: 用原始的掩码和进行平均,这样就实现了对特殊标签位置损失的放大
loss = (loss * loss_mask).sum() / loss_mask_sum
# e. 加上 MoE 模型的辅助损失(如果适用)
loss += res.aux_loss
# f. 根据梯度累积步数对损失进行缩放
loss = loss / args.accumulation_steps
# --- 3. 反向传播和权重更新 (与之前完全相同) ---
scaler.scale(loss).backward()
if (step + 1) % args.accumulation_steps == 0:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad(set_to_none=True)
# --- 4. 打印日志 (与之前完全相同) ---
if step % args.log_interval == 0:
spend_time = time.time() - start_time
print(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} epoch_Time:{}min:'.format(
epoch + 1,
args.epochs,
step,
iter_per_epoch,
loss.item(),
optimizer.param_groups[-1]['lr'],
spend_time / (step + 1) * iter_per_epoch // 60 - spend_time // 60))
接下来,我们启动一个 Epoch 的训练进行观察.
# 计算每个 epoch 有多少个迭代步骤 (batch)
# 在我们的例子中,数据集有 2 个样本,batch_size=2,所以结果是 1
iter_per_epoch = len(train_loader)
# 开始主训练循环
# `range(args.epochs)` 会生成一个从 0 到 epochs-1 的序列
# 因为我们设置了 `args.epochs = 1`,所以这个循环只会执行一次
for epoch in range(args.epochs):
# 调用我们上面定义的 `train_epoch` 函数,传入当前的 epoch 编号
# 开始推理蒸馏的训练
train_epoch(epoch)
Epoch:[1/1](0/1) loss:11.889 lr:0.000550000000 epoch_Time:0.0min:
# 训练演示结束后,删除模型对象 (`model`)
# 这会释放模型参数、梯度等占用的 GPU 显存,是一个良好的编程习惯
del model