5-DPO#
直接偏好优化(Direct Preference Optimization,DPO)是后训练阶段中,使用正反样例激励大模型产生符合人类偏好的回答的策略,为人类反馈强化学习(Reinforcement Learning from Human Feedback, RLHF)提供了一个高效简化的替代方案。通过这一阶段的训练,大模型将会学会依照人类的喜好生成回复.
在这个笔记本中,我们仅对 DPO 的训练流程进行展示和学习,因此只给出必要的代码片段,如 wandb 和 ddp 不会在此笔记本中涉及.
此笔记本的完整实现见主仓库 ../raw/data/Minimind/train_dpo.py
# -------------------- 导入标准和第三方库 --------------------
# 导入 os 库,用于与操作系统交互
import os
# 导入 platform 库,用于获取系统信息
import platform
# 导入 argparse 库,用于解析命令行参数
import argparse
# 导入 time 库,用于计时
import time
# 导入 math 库,用于数学运算
import math
# 导入 warnings 库,用于控制警告信息
import warnings
# 导入 pandas 库,用于数据分析
import pandas as pd
# 导入 PyTorch 核心库
import torch
# 导入 PyTorch 的 functional 模块,包含常用函数
import torch.nn.functional as F
# 导入 PyTorch 的分布式训练库
import torch.distributed as dist
# 导入一个上下文管理器
from contextlib import nullcontext
# 导入 PyTorch 的优化器和神经网络模块
from torch import optim, nn
# 导入 PyTorch 的分布式数据并行工具
from torch.nn.parallel import DistributedDataParallel
# 导入 PyTorch 的数据加载器和分布式采样器
from torch.utils.data import DataLoader, DistributedSampler
# -------------------- 导入 Hugging Face 和自定义模块 --------------------
# 从 transformers 库导入 AutoTokenizer 和 AutoModelForCausalLM
from transformers import AutoTokenizer, AutoModelForCausalLM
# 从我们自己的 model 文件夹中导入之前编写好的模块
from model.model import MiniMindLM # 我们的语言模型
from model.LMConfig import LMConfig # 模型配置类
from model.dataset import DPODataset # 我们为 DPO 准备的数据集类
# 这行代码设置警告过滤器,让程序忽略所有的警告信息,使输出更整洁
warnings.filterwarnings('ignore')
可选参数设置#
首先,查看训练的可选参数,这些参数在实际使用时通过命令行导入,为了保持笔记本的易用性,选择用 class 进行包装.
# 使用一个 class 来模拟命令行参数,集中管理所有超参数
class args:
# 训练过程超参数
epochs: int = 1
batch_size: int = 2
# DPO 阶段的学习率通常设置得非常小(如注释中所说,建议 <= 1e-8)
# 因为我们是在微调模型的“偏好”,而不是教它新知识,改动幅度要小
# 但为了演示方便,这里仍使用较大的学习率
learning_rate: float = 5e-4
# 硬件和精度设置
device: str = 'cuda' if torch.cuda.is_available() else 'cpu'
dtype: str = 'bfloat16'
# 日志和数据加载
wandb_project: str = 'MiniMind-Notebook'
num_workers: int = 1
# 高级训练技巧
accumulation_steps: int = 1
grad_clip: float = 1.0
warmup_iters: int = 0
log_interval: int = 1
# 分布式训练相关
local_rank: int = 1
# 模型结构超参数 (关键变化!)
dim: int = 512
# DPO 训练需要同时在内存中保留两个模型(训练模型和参考模型)
# 为了减少演示时的显存占用,这里将模型层数 `n_layers` 减小到 1
n_layers: int = 1
max_seq_len: int = 512
use_moe: bool = False
# 数据路径 (关键变化!)
# 指向 DPO 的数据文件
data_path: str = '../raw/data/Minimind/toydata/dpo_data.jsonl'
# 打印出 `args` 中设置的 `device`,确认程序将在哪个设备上运行
print(f'查看工作设备 {args.device}')
查看工作设备 cuda
初始化训练#
接下来,我们对一些重要模块进行初始化,我们已经了解过,分词器,模型和数据集是大模型的基本组件,我们对其进行初始化.
在这一阶段 我们调整的是大模型的问答偏好 因此与 sft 阶段同理 我们需要载入在 sft 阶段微调好的问答模型
# 定义一个函数来封装 DPO 阶段的模型初始化过程
def init_model(lm_config):
# 1. 加载分词器
tokenizer = AutoTokenizer.from_pretrained('../raw/data/Minimind/model/minimind_tokenizer')
# --- 2. 初始化 actor model (被训练的模型) ---
# `model` 是我们将要训练的模型,在 DPO 语境中也称为 "policy model" 或 "actor model"
model = MiniMindLM(lm_config)
# 在真实流程中,这里会加载 SFT 阶段训练好的权重
# ckp = ...
# state_dict = torch.load(ckp, ...)
# model.load_state_dict(state_dict, ...)
# --- 3. 初始化 reference model (参考模型) ---
# DPO 算法需要一个“参考模型”来计算偏好差异
# 这个参考模型的权重和训练开始时的 actor model 完全一样
ref_model = MiniMindLM(lm_config)
# 在真实流程中,它也会加载 SFT 的权重
# ref_model.load_state_dict(state_dict, ...)
# 对参考模型进行特殊设置:
# a. `ref_model.eval()`: 将其设置为评估模式,关闭 Dropout 等
ref_model.eval()
# b. `ref_model.requires_grad_(False)`: 冻结其所有参数,告诉 PyTorch 不需要为它计算梯度
# 这样做可以节省大量计算和显存,因为参考模型在训练中是固定不变的
ref_model.requires_grad_(False)
# 4. 计算并打印可训练模型的参数量
print(f'LLM总参数量:{sum(p.numel() for p in model.parameters() if p.requires_grad) / 1e6:.3f} 百万')
# 5. 将两个模型都移动到指定设备
model = model.to(args.device)
ref_model = ref_model.to(args.device)
# 6. 返回 actor model, reference model 和分词器
return model, ref_model, tokenizer
# --- 1. 创建模型配置 ---
# 注意 `n_layers` 现在是 1
lm_config = LMConfig(dim=args.dim, n_layers=args.n_layers, max_seq_len=args.max_seq_len, use_moe=args.use_moe)
# --- 2. 初始化模型 ---
# 调用 `init_model` 函数,同时获得 actor model, reference model 和分词器
model, ref_model, tokenizer = init_model(lm_config)
# --- 3. 初始化数据集 (关键变化!) ---
# 使用 `DPODataset` 类来加载 DPO 数据
train_ds = DPODataset(args.data_path, tokenizer, max_length=lm_config.max_seq_len)
# --- 4. 初始化数据加载器 (DataLoader) ---
# 这部分设置与之前相同
train_loader = DataLoader(
train_ds,
batch_size=args.batch_size,
pin_memory=True,
drop_last=False,
shuffle=False,
num_workers=args.num_workers,
)
# --- 5. 打印确认信息 ---
print(f'模型位于设备:{model.device}, 词表长度:{tokenizer.vocab_size}, DataLoader:{train_loader}')
LLM总参数量:6.096 百万
模型位于设备:cuda:0, 词表长度:6400, DataLoader:<torch.utils.data.dataloader.DataLoader object at 0x000001CA0A4B33A0>
# 这段代码用来检查 DPO 的 DataLoader
# 1. 将 DataLoader 转换成一个迭代器
loader = iter(train_loader)
# 2. 取出第一个 batch 的数据并打印
# 返回的是一个字典,包含了 chosen 和 rejected 两组数据,每组都有 x, y, mask
print(f'打印一个 iter 的数据:\n{next(loader)}\n')
# 3. 打印数据集和 DataLoader 的大小
# DPO 玩具数据集有2个样本,batch_size=2,所以 DataLoader 大小是1
print(f'数据集大小:{len(train_ds)}, DataLoader 大小:{len(train_loader)}')
打印一个 iter 的数据:
{'x_chosen': tensor([[ 1, 85, 736, ..., 0, 0, 0],
[ 1, 85, 736, ..., 0, 0, 0]]), 'y_chosen': tensor([[ 85, 736, 201, ..., 0, 0, 0],
[ 85, 736, 201, ..., 0, 0, 0]]), 'mask_chosen': tensor([[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0]]), 'x_rejected': tensor([[ 1, 85, 736, ..., 0, 0, 0],
[ 1, 85, 736, ..., 0, 0, 0]]), 'y_rejected': tensor([[ 85, 736, 201, ..., 0, 0, 0],
[ 85, 736, 201, ..., 0, 0, 0]]), 'mask_rejected': tensor([[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0]])}
数据集大小:2, DataLoader 大小:1
我们发现,train loader 的每一个 iter 都包含一个拥有六个键值对的字典,这是因为 train_dataset 每一次取数据都会返回:
chosen 样本 X: 包含 <bos> 在内的输入 content
chosen 标签 Y: 包含 <eos> 在内的输出 content
chosen 掩码 loss_mask: 指示需要计算损失的 token 位置
rejected 样本 X: 包含 <bos> 在内的输入 content
rejected 标签 Y: 包含 <eos> 在内的输出 content
rejected 掩码 loss_mask: 指示需要计算损失的 token 位置
由于我们的数据集只有两条数据,而 batch size 设置为 2,因此我们的 dataloader 只有一个 iter.
启动训练#
训练一个深度学习模型,还涉及到了优化器,损失函数和学习率调度. 接下来,我们查看 MiniMind 训练部分的代码,并进行一轮简单的训练.
DPO 阶段涉及 DPO 损失函数涉及 因此与前两个阶段相比内容略有增加 不过整体流程与逻辑类似
# 这部分代码与前两个阶段完全相同,重新设置了训练所需的辅助工具
# 1. 定义学习率调度函数 (余弦退火)
def get_lr(current_step, total_steps, lr):
return lr / 10 + 0.5 * lr * (1 + math.cos(math.pi * current_step / total_steps))
# 2. 设置混合精度训练的梯度缩放器 (Scaler)
scaler = torch.cuda.amp.GradScaler(enabled=(args.dtype in ['float16', 'bfloat16']))
# 3. 初始化优化器 (AdamW)
# 注意,这里只传入了 `model.parameters()`,优化器只会更新 actor model 的参数
# `ref_model` 的参数因为 `requires_grad=False`,所以不会被优化器管理
optimizer = optim.AdamW(model.parameters(), lr=args.learning_rate)
# 4. 设置自动混合精度上下文 (Autocast)
device_type = "cuda" if "cuda" in args.device else "cpu"
ctx = nullcontext() if device_type == "cpu" else torch.cuda.amp.autocast()
DPO 的原理是增加偏好样本的对数概率与减小非偏好样本响应的对数概率.
该阶段引入 DPO 损失函数,通过计算选择样本和拒绝样本的对数比率,然后基于这些比率计算 DPO 损失,适用于偏好学习任务.
# 这是一个辅助函数,用于从模型的输出 logits 中提取特定序列的对数概率
def logits_to_probs(logits, labels):
# `logits` 是模型对每个词的预测分数,形状 (batch, seq_len, vocab_size)
# `labels` 是真实的下一个词的 token ID,形状 (batch, seq_len)
# 1. `F.log_softmax(logits, dim=2)`: 将 logits 转换为对数概率
log_probs = F.log_softmax(logits, dim=2)
# 2. `torch.gather(...)`: 从 vocab_size 维度中,只挑选出 `labels` 对应的那个对数概率
# `labels.unsqueeze(2)`: 将 labels 形状变为 (batch, seq_len, 1) 以匹配 gather 的要求
# `.squeeze(-1)`: 去掉多余的维度,得到形状 (batch, seq_len)
probs = torch.gather(log_probs, dim=2, index=labels.unsqueeze(2)).squeeze(-1)
return probs
# DPO 损失函数的核心实现
def dpo_loss(ref_probs, probs, beta):
# `ref_probs`: 参考模型生成的对数概率
# `probs`: actor 模型生成的对数概率
# `beta`: 一个超参数,控制对偏好差异的惩罚强度
# DPO 损失是在整个序列上计算的,所以我们先求每个 token 概率的平均值
# 注意:这里有一个简化的实现。在原始 DPO 论文中,应该是对序列的对数概率求和,而不是平均。
# 但对于这个实现,我们按代码逻辑注释。
ref_probs = ref_probs.sum(dim=1) # 应该求和
probs = probs.sum(dim=1) # 应该求和
# 因为我们将 chosen 和 rejected 数据拼接在了一起,所以现在需要把它们分开
# batch 的前半部分是 chosen, 后半部分是 rejected
batch_size = ref_probs.shape[0]
chosen_ref_logps, reject_ref_logps = ref_probs.chunk(2)
chosen_logps, reject_logps = probs.chunk(2)
# DPO 损失公式的核心部分:
# Loss = -log_sigmoid(beta * ( (pi_chosen - pi_rejected) - (ref_chosen - ref_rejected) ))
# 1. 计算 actor model 的偏好差异
pi_logratios = chosen_logps - reject_logps
# 2. 计算 reference model 的偏好差异
ref_logratios = chosen_ref_logps - reject_ref_logps
# 3. 计算两个差异之间的差异
logits = pi_logratios - ref_logratios
# 4. 应用 log_sigmoid 函数并取反,得到最终损失
loss = -F.logsigmoid(beta * logits)
# 对 batch 中所有样本的损失求平均
return loss.mean()
接下来,我们来看看 MiniMind 的训练函数
# 定义 DPO 的训练 epoch 函数
def train_epoch(epoch):
start_time = time.time()
# 遍历 DPO DataLoader
for step, batch in enumerate(train_loader):
# --- 1. 准备数据 ---
# a. 从字典中提取所有数据并移动到 GPU
x_chosen = batch['x_chosen'].to(args.device)
x_rejected = batch['x_rejected'].to(args.device)
y_chosen = batch['y_chosen'].to(args.device)
y_rejected = batch['y_rejected'].to(args.device)
mask_chosen = batch['mask_chosen'].to(args.device)
mask_rejected = batch['mask_rejected'].to(args.device)
# b. 将 chosen 和 rejected 数据在 batch 维度上拼接起来
# 这样我们可以一次性地将它们都送入模型,提高计算效率
x = torch.cat([x_chosen, x_rejected], dim=0)
y = torch.cat([y_chosen, y_rejected], dim=0)
mask = torch.cat([mask_chosen, mask_rejected], dim=0)
# --- 2. 更新学习率 ---
current_total_step = epoch * iter_per_epoch + step
total_steps = args.epochs * iter_per_epoch
lr = get_lr(current_total_step, total_steps, args.learning_rate)
for param_group in optimizer.param_groups:
param_group['lr'] = lr
# --- 3. 前向传播和计算 DPO 损失 ---
with ctx:
# a. 计算参考模型的对数概率
# `with torch.no_grad()`: 在这个代码块中不计算梯度,因为 ref_model 是冻结的
with torch.no_grad():
ref_outputs = ref_model(x)
ref_logits = ref_outputs.logits
ref_probs = logits_to_probs(ref_logits, y)
ref_probs = ref_probs * mask # 应用掩码,只保留 assistant 回答部分的概率
# b. 计算 actor 模型的对数概率
outputs = model(x)
logits = outputs.logits
probs = logits_to_probs(logits, y)
probs = probs * mask # 应用掩码
# c. 调用 dpo_loss 函数计算最终损失
loss = dpo_loss(ref_probs, probs, beta=0.1)
loss = loss / args.accumulation_steps # 梯度累积缩放
# --- 4. 反向传播和权重更新 (与之前相同) ---
scaler.scale(loss).backward()
if (step + 1) % args.accumulation_steps == 0:
scaler.unscale_(optimizer)
torch.nn.utils.clip_grad_norm_(model.parameters(), args.grad_clip)
scaler.step(optimizer)
scaler.update()
optimizer.zero_grad(set_to_none=True)
# --- 5. 打印日志 (与之前相同) ---
if step % args.log_interval == 0:
spend_time = time.time() - start_time
print(
'Epoch:[{}/{}]({}/{}) loss:{:.3f} lr:{:.12f} ...'.format(
epoch + 1, args.epochs,
step, iter_per_epoch,
loss.item(), # DPO 损失不需要乘以累积步数
optimizer.param_groups[-1]['lr'],
# ...
)
)
准备完毕,我们尝试一轮长度 1 个 iter 的训练.
# 计算每个 epoch 的迭代次数
iter_per_epoch = len(train_loader) # 结果是 1
# 开始主训练循环
# 由于 `args.epochs` 设置为 1,这个循环只会执行一次
for epoch in range(args.epochs):
# 调用 `train_epoch` 函数,开始 DPO 训练
train_epoch(epoch)
Epoch:[1/1](0/1) loss:0.693 lr:0.000550000000 epoch_Time:0.0min:
# DPO 训练演示结束后,删除模型对象以释放 GPU 显存
# 注意:这里只删除了 actor model,实际上 ref_model 也应该被删除
del model
# del ref_model # 完整的清理