Transformer代码实现#

import math
import pandas as pd
import torch
from torch import nn
import inspect
from d2l import torch as d2l

基于位置的前馈网络#

基于位置的前馈网络对序列中的所有位置的表示进行变换时使用的是同一个多层感知机(MLP),这就是称前馈网络是基于位置的(positionwise)的原因。在下面的实现中,输入X的形状(批量大小,时间步数或序列长度,隐单元数或特征维度)将被一个两层的感知机转换成形状为(批量大小,时间步数,ffn_num_outputs)的输出张量。

class PositionWiseFFN(nn.Module): # 基于位置的前馈网络
    """
    ffn_num_input: 输入特征的维度,即每个 token 的特征数。
    ffn_num_hiddens: 隐藏层的神经元数量,控制中间层的复杂度。
    ffn_num_outputs: 输出特征的维度。
    """
    def __init__(self, ffn_num_hiddens, ffn_num_outputs):
        super().__init__()
        # # 这是第一个全连接层(线性变换),它将输入从 ffn_num_input 维度映射到 ffn_num_hiddens 维度。这一步的作用是进行维度变换和线性组合,帮助模型捕捉更多复杂的特征
        self.dense1 = nn.LazyLinear(ffn_num_hiddens)
        # ReLU 的作用是将输入中小于 0 的部分置为 0,保留大于 0 的部分不变,从而为网络引入非线性特征,使得模型可以更好地拟合复杂的非线性关系
        self.relu = nn.ReLU()
        # 这是第二个全连接层,将隐藏层的输出转换回 ffn_num_outputs 维度
        self.dense2 = nn.LazyLinear( ffn_num_outputs)

    def forward(self, X):
        return self.dense2(self.relu(self.dense1(X)))

下面的例子显示,改变张量的最里层维度的尺寸,会改变成基于位置的前馈网络的输出尺寸。因为用同一个多层感知机对所有位置上的输入进行变换,所以当所有这些位置的输入相同时,它们的输出也是相同的。

ffn = PositionWiseFFN(4, 8)  # 设置好维度
ffn.eval() # 用来将模型切换到评估模式。在评估模式下,像 Dropout 和 BatchNorm 等训练时特有的操作会被关闭,从而确保模型行为一致
ffn(torch.ones((2, 3, 4))) # 2 个样本,每个样本有 3 个 token(或位置),每个 token 有 4 个特征
tensor([[[-0.4082, -0.9014,  0.3697, -0.3377,  0.0818,  0.4002, -0.5420,
          -0.3813],
         [-0.4082, -0.9014,  0.3697, -0.3377,  0.0818,  0.4002, -0.5420,
          -0.3813],
         [-0.4082, -0.9014,  0.3697, -0.3377,  0.0818,  0.4002, -0.5420,
          -0.3813]],

        [[-0.4082, -0.9014,  0.3697, -0.3377,  0.0818,  0.4002, -0.5420,
          -0.3813],
         [-0.4082, -0.9014,  0.3697, -0.3377,  0.0818,  0.4002, -0.5420,
          -0.3813],
         [-0.4082, -0.9014,  0.3697, -0.3377,  0.0818,  0.4002, -0.5420,
          -0.3813]]], grad_fn=<ViewBackward0>)

残差连接和层规范化#

以下代码对比不同维度的层规范化和批量规范化的效果。

ln = nn.LayerNorm(2)  # 层归一化(Layer Normalization)
bn = nn.LazyBatchNorm1d(2) # 批归一化(Batch Normalization)
X = torch.tensor([[1, 2], [2, 3]], dtype=torch.float32)
# 在训练模式下计算X的均值和方差
print('layer norm:', ln(X), '\nbatch norm:', bn(X))
layer norm: tensor([[-1.0000,  1.0000],
        [-1.0000,  1.0000]], grad_fn=<NativeLayerNormBackward0>) 
batch norm: tensor([[-0.3333, -0.3333],
        [ 0.3333,  0.3333]], grad_fn=<NativeBatchNormBackward0>)

LayerNorm 计算#

LayerNorm 是对每个样本的特征进行归一化,因此它对每一行独立计算均值和标准差。

公式:

对于每个样本\(X_{i}\):

\[\text { normalized }=\frac{X_{i}-\mu_{i}}{\sigma_{i}}\]
  • \(\mu_{i}\)是样本\(i\)的均值。

  • \(\sigma_{i}\)是样本\(i\)的标准差。

具体到\(x\)中的每一行:

  • 第一行X[0] = [1, 2]的均值为\(\mu=1.5\),标准差为\(\sigma=0.5\)

\[\text { normalized } \_X[0]=\left[\frac{1-1.5}{0.5}, \frac{2-1.5}{0.5}\right]=[-1.0,1.0]\]
  • 第二行X[1] = [2, 3]的均值为\(\mu=2.5\),标准差为\(\sigma=0.5\)

\[\text { normalized } \_\mathrm{X}[1]=\left[\frac{2-2.5}{0.5}, \frac{3-2.5}{0.5}\right]=[-1.0,1.0]\]

BatchNorm1d 计算#

公式:

对于每个特征(列)\(X_{j}\):

\[\text { normalized }=\frac{X_{j}-\mu_{j}}{\sigma_{j}}\]
  • \(\mu_{j}\)是样本\(j\)的均值。

  • \(\sigma_{j}\)是样本\(j\)的标准差。

具体到\(x\)中的每一行:

  • 第一行X[:,0] = [1, 2]的均值为\(\mu=1.5\),标准差为\(\sigma=0.5\)

\[\text { normalized } \_X[:,0]=\left[\frac{1-1.5}{0.5}, \frac{2-1.5}{0.5}\right]=[-1.0,1.0]\]
  • 第二行X[:,1] = [2, 3]的均值为\(\mu=2.5\),标准差为\(\sigma=0.5\)

\[\text { normalized } \_\mathrm{X}[:,1]=\left[\frac{2-2.5}{0.5}, \frac{3-2.5}{0.5}\right]=[-1.0,1.0]\]

现在可使用残差连接和层规范化来实现AddNorm类。暂退法也被作为正则化方法使用。

class AddNorm(nn.Module): # 残差连接后进行层规范化
    """
    normalized_shape(层规范化的输入形状)
    dropout(丢弃率)
    """
    def __init__(self, normalized_shape, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)  # 用于在训练过程中随机丢弃一定比例的神经元,Dropout 在模型评估时会自动关闭
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, X, Y):
        # 在 Dropout 之后,将 Y 与输入张量 X 进行逐元素相加。这个过程就是残差连接(Residual Connection)
        _=self.dropout(Y) + X
        # print(_)
        return self.ln(_) 

残差连接要求两个输入的形状相同,以便加法操作后输出张量的形状相同。

add_norm = AddNorm( 4, 0.5)
add_norm.eval()
output =add_norm(torch.ones((2, 3, 4)), torch.ones((2, 3, 4)))
print(output.shape)
print(output) # 因此,如果输入是常数张量(如全为 2 的张量),则经过 LayerNorm 标准化后,结果可能是全 0,因为标准化后的均值为 0
# 使用随机张量代替全 1 的张量
output = add_norm(torch.randn(2, 3, 4), torch.randn(2, 3, 4))
print(output)
torch.Size([2, 3, 4])
tensor([[[0., 0., 0., 0.],
         [0., 0., 0., 0.],
         [0., 0., 0., 0.]],

        [[0., 0., 0., 0.],
         [0., 0., 0., 0.],
         [0., 0., 0., 0.]]], grad_fn=<NativeLayerNormBackward0>)
tensor([[[-0.4455,  1.7027, -0.3918, -0.8654],
         [ 1.5680,  0.1495, -0.7098, -1.0076],
         [-0.2261, -1.2255, -0.1090,  1.5605]],

        [[-0.5173, -0.7151, -0.4931,  1.7256],
         [ 1.3754, -0.2801,  0.2980, -1.3932],
         [ 0.0335,  0.0546, -1.4568,  1.3688]]],
       grad_fn=<NativeLayerNormBackward0>)

编码器#

有了组成Transformer编码器的基础组件,现在可以先实现编码器中的一个层。下面的EncoderBlock类包含两个子层:多头自注意力和基于位置的前馈网络,这两个子层都使用了残差连接和紧随的层规范化。

def masked_softmax(X, valid_lens):
    """在最后一个轴上执行 softmax 操作,并对指定元素进行遮掩(mask)。

    参数:
    - X: 3D 张量 (batch_size, sequence_length, embedding_dimension)。
    - valid_lens: 1D 或 2D 张量,表示每个序列的有效长度,用于遮掩无效部分。
    """

    # 内部函数,生成遮掩掩码并应用到输入张量 X 上
    def _sequence_mask(X, valid_len, value=0):
        maxlen = X.size(1)  # 获取序列的最大长度(第二个维度大小)
        
        # 创建掩码,mask: shape 是 (batch_size, maxlen)
        # torch.arange(maxlen): 生成 [0, 1, ..., maxlen-1] 序列
        # 比较生成的序列与有效长度 valid_len,生成 True/False 掩码
        mask = torch.arange(maxlen, dtype=torch.float32, device=X.device)[None, :] < valid_len[:, None]
        
        # 将 X 中无效位置的值设置为指定值 (value, 默认为 0)
        X[~mask] = value
        return X

    # 如果没有提供 valid_lens,直接在最后一个维度上执行 softmax
    if valid_lens is None:
        return nn.functional.softmax(X, dim=-1)

    else:
        shape = X.shape  # 获取输入张量的形状 (batch_size, seq_len, embedding_dim)

        # 如果 valid_lens 是 1D 张量(代表每个 batch 的有效长度相同)
        if valid_lens.dim() == 1:
            # 将 valid_lens 扩展为与 X 的形状匹配的张量
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])

        # 如果 valid_lens 是 2D 张量(可能每个序列和 query 对应的有效长度不同)
        else:
            # 将 valid_lens 展平为 1D 张量
            valid_lens = valid_lens.reshape(-1)

        # 对张量 X 的最后一个轴(即每个时间步的嵌入)进行遮掩处理
        # 将无效元素的值设置为 -1e6(在 softmax 中接近 -∞,其指数值会变为 0)
        X = _sequence_mask(X.reshape(-1, shape[-1]), valid_lens, value=-1e6)

        # 将遮掩后的 X 重新调整为原始形状,并在最后一个轴上执行 softmax
        return nn.functional.softmax(X.reshape(shape), dim=-1)

class DotProductAttention(nn.Module):
    """缩放点积注意力机制。

    参数:
    - dropout: 在注意力权重上应用的 dropout 概率。
    """
    def __init__(self, dropout):
        super().__init__()
        self.dropout = nn.Dropout(dropout)  # 初始化 dropout 层

    # 前向传播函数
    # 参数:
    # - queries: (batch_size, no. of queries, d),查询向量
    # - keys: (batch_size, no. of key-value pairs, d),键向量
    # - values: (batch_size, no. of key-value pairs, value dimension),值向量
    # - valid_lens: (batch_size,) 或 (batch_size, no. of queries),表示每个序列的有效长度
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]  # 获取查询向量的最后一个维度(嵌入维度)
        
        # 计算点积得分:queries @ keys.T / sqrt(d),缩放以防止梯度爆炸
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        
        # 通过 masked_softmax 进行 softmax 并遮掩无效位置
        self.attention_weights = masked_softmax(scores, valid_lens)
        
        # 将注意力权重应用到值向量上,并返回加权结果
        return torch.bmm(self.dropout(self.attention_weights), values)

多头注意力#

实现如下

class MultiHeadAttention(d2l.Module):
    """多头注意力机制。
    
    参数:
    - num_hiddens: 隐藏层的大小,即输入/输出的嵌入维度。
    - num_heads: 注意力头的数量。
    - dropout: 在注意力权重上的 dropout 比率。
    - bias: 是否在线性层中使用偏置。
    """
    def __init__(self, num_hiddens, num_heads, dropout, bias=False, **kwargs):
        super().__init__()
        self.num_heads = num_heads  # 多头的数量
        self.attention = DotProductAttention(dropout)  # 点积注意力机制
        
        # 定义四个线性变换,分别用于 queries、keys、values 和最终的输出
        self.W_q = nn.LazyLinear(num_hiddens, bias=bias)  # 查询的线性变换
        self.W_k = nn.LazyLinear(num_hiddens, bias=bias)  # 键的线性变换
        self.W_v = nn.LazyLinear(num_hiddens, bias=bias)  # 值的线性变换
        self.W_o = nn.LazyLinear(num_hiddens, bias=bias)  # 输出的线性变换

    def forward(self, queries, keys, values, valid_lens):
        """
        参数:
        - queries: 查询向量,形状为 (batch_size, num_queries, num_hiddens)
        - keys: 键向量,形状为 (batch_size, num_kv_pairs, num_hiddens)
        - values: 值向量,形状为 (batch_size, num_kv_pairs, num_hiddens)
        - valid_lens: 有效长度,形状为 (batch_size,) 或 (batch_size, num_queries)
        
        返回:
        - 输出张量,形状为 (batch_size, num_queries, num_hiddens)
        """

        # 将 queries, keys, values 通过各自的线性层,并转换形状以适应多头注意力
        # 形状变为 (batch_size * num_heads, num_queries or num_kv_pairs, num_hiddens / num_heads)
        queries = self.transpose_qkv(self.W_q(queries))
        keys = self.transpose_qkv(self.W_k(keys))
        values = self.transpose_qkv(self.W_v(values))

        if valid_lens is not None:
            # 将 valid_lens 扩展为与每个注意力头相对应的形状
            # 重复 valid_lens num_heads 次,以适应多个头的并行计算
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)

        # 通过点积注意力计算结果,形状为 (batch_size * num_heads, num_queries, num_hiddens / num_heads)
        output = self.attention(queries, keys, values, valid_lens)
        
        # 将结果从多个头合并,形状变为 (batch_size, num_queries, num_hiddens)
        output_concat = self.transpose_output(output)
        
        # 返回经过线性变换后的输出结果
        return self.W_o(output_concat)

    def transpose_qkv(self, X):
        """用于并行计算多个注意力头,转换 queries, keys 或 values 的形状。
        
        参数:
        - X: 输入张量,形状为 (batch_size, num_queries or num_kv_pairs, num_hiddens)
        
        返回:
        - 转置后的张量,形状为 (batch_size * num_heads, num_queries or num_kv_pairs, num_hiddens / num_heads)
        """

        # 首先将 X 的最后一个维度分成 num_heads 份,即 (batch_size, num_queries or num_kv_pairs, num_heads, num_hiddens / num_heads)
        X = X.reshape(X.shape[0], X.shape[1], self.num_heads, -1)
        
        # 交换维度,使 num_heads 在第二个维度,即 (batch_size, num_heads, num_queries or num_kv_pairs, num_hiddens / num_heads)
        X = X.permute(0, 2, 1, 3)
        
        # 将前两个维度合并,形状变为 (batch_size * num_heads, num_queries or num_kv_pairs, num_hiddens / num_heads)
        return X.reshape(-1, X.shape[2], X.shape[3])

    def transpose_output(self, X):
        """逆转 transpose_qkv 操作,将多个头的输出结果重新组合。
        
        参数:
        - X: 输入张量,形状为 (batch_size * num_heads, num_queries, num_hiddens / num_heads)
        
        返回:
        - 合并后的张量,形状为 (batch_size, num_queries, num_hiddens)
        """
        
        # 将 X 转换回 (batch_size, num_heads, num_queries, num_hiddens / num_heads)
        X = X.reshape(-1, self.num_heads, X.shape[1], X.shape[2])
        
        # 交换维度回到 (batch_size, num_queries, num_heads, num_hiddens / num_heads)
        X = X.permute(0, 2, 1, 3)
        
        # 合并最后两个维度,返回 (batch_size, num_queries, num_hiddens)
        return X.reshape(X.shape[0], X.shape[1], -1)

编码器块#

class EncoderBlock(nn.Module):
    """Transformer 编码器块。
    
    参数:
    - num_hiddens: 隐藏层的维度大小。
    - ffn_num_hiddens: 前馈神经网络(FFN)隐藏层的维度大小。
    - num_heads: 多头注意力中的头的数量。
    - dropout: Dropout 概率,用于防止过拟合。
    - use_bias: 是否在线性层中使用偏置。
    """
    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout,
                 use_bias=False):
        super().__init__()
        
        # 定义多头注意力机制
        self.attention = MultiHeadAttention(num_hiddens, num_heads, dropout, use_bias)
        
        # 第一次加法归一化层,通常在多头注意力后进行残差连接和归一化
        self.addnorm1 = AddNorm(num_hiddens, dropout)
        
        # 定义位置前馈神经网络(Position-Wise FFN)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        
        # 第二次加法归一化层,通常在前馈网络后进行残差连接和归一化
        self.addnorm2 = AddNorm(num_hiddens, dropout)

    def forward(self, X, valid_lens):
        """
        参数:
        - X: 输入张量,形状为 (batch_size, num_tokens, num_hiddens)。
        - valid_lens: 有效长度,形状为 (batch_size,) 或 (batch_size, num_queries)。
        
        返回:
        - 输出张量,形状为 (batch_size, num_tokens, num_hiddens)。
        """

        # 进行多头注意力计算,并通过第一个 AddNorm 执行残差连接和归一化
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        
        # 通过前馈神经网络计算结果,再经过第二个 AddNorm 执行残差连接和归一化
        return self.addnorm2(Y, self.ffn(Y))

正如从代码中所看到的,Transformer编码器中的任何层都不会改变其输入的形状。

X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])
encoder_blk = EncoderBlock( 24,  48, 8, 0.5)
encoder_blk.eval()
output = encoder_blk(X, valid_lens)
print(output.shape)
torch.Size([2, 100, 24])

下面实现的Transformer编码器的代码中,堆叠了num_layersEncoderBlock类的实例。由于这里使用的是值范围在\(-1\)\(1\)之间的固定位置编码,因此通过学习得到的输入的嵌入表示的值需要先乘以嵌入维度的平方根进行重新缩放,然后再与位置编码相加。

class Encoder(nn.Module):
    """编码器--解码器架构的基础编码器接口"""
    
    def __init__(self):
        super().__init__()

    # 可以添加额外的参数 (例如排除填充的长度)
    def forward(self, X, *args):
        raise NotImplementedError  # 这是一个抽象方法,子类需要实现它

class PositionalEncoding(nn.Module):
    """位置编码,帮助模型捕捉序列中每个元素的位置"""
    
    def __init__(self, num_hiddens, dropout, max_len=1000):
        super().__init__()
        self.dropout = nn.Dropout(dropout)  # 定义 Dropout 层
        # 创建足够长的 P 矩阵来存储位置编码
        self.P = torch.zeros((1, max_len, num_hiddens))  # 初始化位置编码矩阵 P
        
        # 计算位置编码值
        X = torch.arange(max_len, dtype=torch.float32).reshape(-1, 1) / torch.pow(
            10000, torch.arange(0, num_hiddens, 2, dtype=torch.float32) / num_hiddens)
        self.P[:, :, 0::2] = torch.sin(X)  # 偶数维度使用 sin 函数
        self.P[:, :, 1::2] = torch.cos(X)  # 奇数维度使用 cos 函数

    def forward(self, X):
        # 将输入 X 与位置编码 P 相加,然后应用 Dropout
        X = X + self.P[:, :X.shape[1], :].to(X.device)
        return self.dropout(X)  # 返回加上位置编码后的结果


class TransformerEncoder(Encoder):
    """Transformer 编码器,包含多层 EncoderBlock"""
    
    def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens,
                 num_heads, num_blks, dropout, use_bias=False):
        super().__init__()
        self.num_hiddens = num_hiddens
        # 嵌入层:将词汇表中的索引映射到 embedding(嵌入)向量
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        # 位置编码层:加上位置编码信息
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)
        
        # 使用 nn.Sequential 堆叠多个 EncoderBlock
        self.blks = nn.Sequential()
        for i in range(num_blks):
            self.blks.add_module("block" + str(i),
                EncoderBlock(num_hiddens, ffn_num_hiddens, num_heads, dropout, use_bias))

    def forward(self, X, valid_lens):
        # 词嵌入 + 位置编码
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        
        # 初始化 attention_weights 用于存储每一层的注意力权重
        self.attention_weights = [None] * len(self.blks)
        
        # 依次通过每一个 EncoderBlock
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            # 保存每个块中的注意力权重
            self.attention_weights[i] = blk.attention.attention.attention_weights
        
        return X  # 输出最终的编码器结果

下面我们指定了超参数来创建一个两层的Transformer编码器。 Transformer编码器输出的形状是(批量大小,时间步数目,num_hiddens)。

encoder = TransformerEncoder(
    200, 24, 48, 8, 2, 0.5)
encoder.eval()
encoder(torch.ones((2, 100), dtype=torch.long), valid_lens).shape
torch.Size([2, 100, 24])

解码器#

Transformer解码器也是由多个相同的层组成。在DecoderBlock类中实现的每个层包含了三个子层:解码器自注意力、“编码器-解码器”注意力和基于位置的前馈网络。这些子层也都被残差连接和紧随的层规范化围绕。

正如在本节前面所述,在掩蔽多头解码器自注意力层(第一个子层)中,查询、键和值都来自上一个解码器层的输出。关于序列到序列模型(sequence-to-sequence model),在训练阶段,其输出序列的所有位置(时间步)的词元都是已知的;然而,在预测阶段,其输出序列的词元是逐个生成的。因此,在任何解码器时间步中,只有生成的词元才能用于解码器的自注意力计算中。为了在解码器中保留自回归的属性,其掩蔽自注意力设定了参数dec_valid_lens,以便任何查询都只会与解码器中所有已经生成词元的位置(即直到该查询位置为止)进行注意力计算。

class TransformerDecoderBlock(nn.Module):
    """Transformer 解码器的第 i 个块"""

    def __init__(self, num_hiddens, ffn_num_hiddens, num_heads, dropout, i):
        super().__init__()
        self.i = i  # 记录该块的索引 i
        
        # 定义第一个多头注意力机制 (自注意力)
        self.attention1 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm1 = AddNorm(num_hiddens, dropout)  # Add & Norm 操作
        
        # 定义第二个多头注意力机制 (编码器-解码器注意力)
        self.attention2 = MultiHeadAttention(num_hiddens, num_heads, dropout)
        self.addnorm2 = AddNorm(num_hiddens, dropout)  # Add & Norm 操作
        
        # 定义前馈神经网络 (FFN)
        self.ffn = PositionWiseFFN(ffn_num_hiddens, num_hiddens)
        self.addnorm3 = AddNorm(num_hiddens, dropout)  # Add & Norm 操作

    def forward(self, X, state):
        enc_outputs, enc_valid_lens = state[0], state[1]  # 提取编码器的输出和有效长度
        # state[2][self.i] 存储的是到当前时间步为止,第 i 个块的解码输出

        # 在预测阶段,需要将先前的解码结果连接到当前输入上
        if state[2][self.i] is None:
            key_values = X  # 如果这是第一次解码,直接使用 X
        else:
            key_values = torch.cat((state[2][self.i], X), dim=1)  # 将之前的结果与当前输入拼接
        state[2][self.i] = key_values  # 更新状态
        
        # 如果是在训练阶段,生成解码器的有效长度
        if self.training:
            batch_size, num_steps, _ = X.shape
            # 生成形如 [1, 2, ..., num_steps] 的有效长度矩阵
            dec_valid_lens = torch.arange(1, num_steps + 1, device=X.device).repeat(batch_size, 1)
        else:
            dec_valid_lens = None  # 在预测阶段,无需提供解码器的有效长度
        
        # **自注意力**:对输入 X 进行自注意力计算,使用 key_values 作为注意力键和值
        X2 = self.attention1(X, key_values, key_values, dec_valid_lens)
        Y = self.addnorm1(X, X2)  # 将自注意力的输出与输入 X 相加并规范化
        
        # **编码器-解码器注意力**:对编码器的输出 (enc_outputs) 进行注意力计算
        Y2 = self.attention2(Y, enc_outputs, enc_outputs, enc_valid_lens)
        Z = self.addnorm2(Y, Y2)  # 将编码器-解码器注意力的输出与 Y 相加并规范化
        
        # **前馈神经网络**:对 Z 进行前馈神经网络处理
        return self.addnorm3(Z, self.ffn(Z)), state  # 输出规范化后的结果和更新后的状态

为了便于在“编码器-解码器”注意力中进行缩放点积计算和残差连接中进行加法计算,编码器和解码器的特征维度都是num_hiddens

decoder_blk = TransformerDecoderBlock(24, 48, 8, 0.5, 0)
X = torch.ones((2, 100, 24))
state = [encoder_blk(X, valid_lens), valid_lens, [None]]
decoder_blk(X, state)[0].shape
torch.Size([2, 100, 24])

现在我们构建了由num_layersDecoderBlock实例组成的完整的Transformer解码器。最后,通过一个全连接层计算所有vocab_size个可能的输出词元的预测值。解码器的自注意力权重和编码器解码器注意力权重都被存储下来,方便日后可视化的需要。

class Decoder(nn.Module):
    """编码器-解码器架构的基本解码器接口"""
    def __init__(self):
        super().__init__()

    def init_state(self, enc_all_outputs, *args):
        # 初始化状态
        raise NotImplementedError

    def forward(self, X, state):
        # 解码器的前向传播
        raise NotImplementedError

class AttentionDecoder(Decoder):
    """基于注意力机制的解码器接口"""
    def __init__(self):
        super().__init__()

    @property
    def attention_weights(self):
        # 注意力权重的获取
        raise NotImplementedError

class TransformerDecoder(AttentionDecoder):
    """Transformer解码器"""
    def __init__(self, vocab_size, num_hiddens, ffn_num_hiddens, num_heads,
                 num_blks, dropout):
        super().__init__()
        self.num_hiddens = num_hiddens  # 隐藏层大小
        self.num_blks = num_blks  # 解码器块的数量
        self.embedding = nn.Embedding(vocab_size, num_hiddens)  # 词嵌入层
        self.pos_encoding = PositionalEncoding(num_hiddens, dropout)  # 位置编码
        self.blks = nn.Sequential()  # 使用Sequential容器堆叠解码块
        for i in range(num_blks):
            # 添加多个解码器块,每个块都是一个TransformerDecoderBlock
            self.blks.add_module("block" + str(i), TransformerDecoderBlock(
                num_hiddens, ffn_num_hiddens, num_heads, dropout, i))
        self.dense = nn.LazyLinear(vocab_size)  # 全连接层,用于生成预测结果

    def init_state(self, enc_outputs, enc_valid_lens):
        # 初始化解码器的状态,包含编码器的输出、有效长度和每个解码块的历史状态
        return [enc_outputs, enc_valid_lens, [None] * self.num_blks]

    def forward(self, X, state):
        # 前向传播
        # 对输入X进行词嵌入,然后加上位置编码
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        # 初始化注意力权重,用于保存自注意力和编码器-解码器注意力的权重
        self._attention_weights = [[None] * len(self.blks) for _ in range(2)]
        for i, blk in enumerate(self.blks):
            # 通过每个解码块进行前向传播
            X, state = blk(X, state)
            # 记录自注意力权重
            self._attention_weights[0][i] = blk.attention1.attention.attention_weights
            # 记录编码器-解码器注意力权重
            self._attention_weights[1][i] = blk.attention2.attention.attention_weights
        # 将最后的输出通过全连接层生成最终的预测结果
        return self.dense(X), state

    @property
    def attention_weights(self):
        # 返回保存的注意力权重
        return self._attention_weights

#

训练 依照Transformer架构来实例化编码器-解码器模型。在这里,指定Transformer的编码器和解码器都是2层,都使用4头注意力。为了进行序列到序列的学习,下面在“英语-法语”机器翻译数据集上训练Transformer模型。

# 从 d2l 库中获取数据集,数据集为英法翻译,使用批量大小 128
data = d2l.MTFraEng(batch_size=128)

# 定义模型参数
num_hiddens, num_blks, dropout = 256, 2, 0.2  # 隐藏层大小、编码器块数、dropout 概率
ffn_num_hiddens, num_heads = 64, 4  # 前馈神经网络隐藏层大小和多头注意力头数

# 创建编码器
encoder = TransformerEncoder(
    len(data.src_vocab), num_hiddens, ffn_num_hiddens, num_heads,
    num_blks, dropout
)

# 创建解码器
decoder = TransformerDecoder(
    len(data.tgt_vocab), num_hiddens, ffn_num_hiddens, num_heads,
    num_blks, dropout
)

# 创建 Seq2Seq 模型,指定解码器的填充符号和学习率
model = d2l.Seq2Seq(encoder, decoder, tgt_pad=data.tgt_vocab['<pad>'],
                    lr=0.001)

# 创建训练器,设置最大训练轮数、梯度裁剪值和 GPU 数量
trainer = d2l.Trainer(max_epochs=30, gradient_clip_val=1, num_gpus=1)

# 训练模型
trainer.fit(model, data)
../../_images/93c5870a24444130dd9d0ea4b1897db6c448843a2c46a5e70d8c34781c7debe7.svg

训练结束后,使用Transformer模型将一些英语句子翻译成法,并且计算它们的BLEU分数。

# 定义一组英语句子和对应的法语句子
engs = ['go .', 'i lost .', 'he\'s calm .', 'i\'m home .']
fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']

# 使用模型进行预测
preds, _ = model.predict_step(
    data.build(engs, fras), d2l.try_gpu(), data.num_steps
)

# 遍历每个输入句子、参考句子和模型预测结果
for en, fr, p in zip(engs, fras, preds):
    translation = []
    # 将预测结果转换为目标语言的 tokens
    for token in data.tgt_vocab.to_tokens(p):
        if token == '<eos>':  # 遇到结束符时停止
            break
        translation.append(token)  # 添加到翻译列表中
    # 输出源句子、翻译结果及其 BLEU 分数
    print(f'{en} => {translation}, bleu,'
          f'{d2l.bleu(" ".join(translation), fr, k=2):.3f}')
go . => ['va', '!'], bleu,1.000
i lost . => ["j'ai", 'perdu', '.'], bleu,1.000
he's calm . => ['il', 'est', 'mouillé', '.'], bleu,0.658
i'm home . => ['je', 'suis', 'chez', 'moi', '.'], bleu,1.000

当进行最后一个英语到法语的句子翻译工作时,让我们可视化Transformer的注意力权重。编码器自注意力权重的形状为(编码器层数,注意力头数,num_steps或查询的数目,num_steps或“键-值”对的数目)。

# 使用模型的 predict_step 方法预测最后一对英语句子和法语句子,并获取解码器注意力权重
_, dec_attention_weights = model.predict_step(
    data.build([engs[-1]], [fras[-1]]), d2l.try_gpu(), data.num_steps, True
)

# 连接编码器的所有注意力权重
enc_attention_weights = torch.cat(model.encoder.attention_weights, 0)

# 重新调整编码器注意力权重的形状
shape = (num_blks, num_heads, -1, data.num_steps)
enc_attention_weights = enc_attention_weights.reshape(shape)

# 检查编码器注意力权重的形状是否符合预期
d2l.check_shape(enc_attention_weights,
                (num_blks, num_heads, data.num_steps, data.num_steps))

在编码器的自注意力中,查询和键都来自相同的输入序列。因为填充词元是不携带信息的,因此通过指定输入序列的有效长度可以避免查询与使用填充词元的位置计算注意力。接下来,将逐行呈现两层多头注意力的权重。每个注意力头都根据查询、键和值的不同的表示子空间来表示不同的注意力。

# 使用 d2l 库中的 show_heatmaps 函数来可视化编码器的注意力权重
d2l.show_heatmaps(
    enc_attention_weights.cpu(),  # 将注意力权重移到 CPU 以进行可视化
    xlabel='Key positions',  # X 轴标签,表示键的位置
    ylabel='Query positions',  # Y 轴标签,表示查询的位置
    titles=['Head %d' % i for i in range(1, 5)],  # 热图标题,表示不同的注意力头
    figsize=(7, 3.5)  # 设置图形的大小
)
../../_images/70fdf7295713c2d3e89da8f473a4f93eee5c4368f637131af481e1d87b38e46a.svg

为了可视化解码器的自注意力权重和“编码器-解码器”的注意力权重,我们需要完成更多的数据操作工作。例如用零填充被掩蔽住的注意力权重。值得注意的是,解码器的自注意力权重和“编码器-解码器”的注意力权重都有相同的查询:即以序列开始词元(beginning-of-sequence,BOS)打头,再与后续输出的词元共同组成序列。

# 将解码器的注意力权重转换为二维列表,填充为 0
dec_attention_weights_2d = [head[0].tolist()  # 提取第一个头的注意力权重,并转换为列表
                            for step in dec_attention_weights  # 遍历每个时间步
                            for attn in step  # 遍历每个块的注意力
                            for blk in attn  # 遍历每个块
                            for head in blk]  # 遍历每个头

# 使用 pandas DataFrame 来填充 NaN 值,填充为 0.0
dec_attention_weights_filled = torch.tensor(
    pd.DataFrame(dec_attention_weights_2d).fillna(0.0).values)  # 将填充后的数据转换为 PyTorch 张量

# 调整形状以分离解码器自注意力和交叉注意力
shape = (-1, 2, num_blks, num_heads, data.num_steps)  # 目标形状
dec_attention_weights = dec_attention_weights_filled.reshape(shape)  # 重塑为目标形状

# 通过 permute 来分离解码器的自注意力权重和交叉注意力权重
dec_self_attention_weights, dec_inter_attention_weights = \
    dec_attention_weights.permute(1, 2, 3, 0, 4)

# 检查解码器自注意力权重的形状
d2l.check_shape(dec_self_attention_weights,
                (num_blks, num_heads, data.num_steps, data.num_steps))  # 应为 (num_blks, num_heads, data.num_steps, data.num_steps)

# 检查解码器交叉注意力权重的形状
d2l.check_shape(dec_inter_attention_weights,
                (num_blks, num_heads, data.num_steps, data.num_steps))  # 应为 (num_blks, num_heads, data.num_steps, data.num_steps)

由于解码器自注意力的自回归属性,查询不会对当前位置之后的“键-值”对进行注意力计算。

# 使用 d2l.show_heatmaps 函数可视化解码器自注意力权重
d2l.show_heatmaps(
    dec_self_attention_weights[:, :, :, :],  # 提取所有块和头的自注意力权重
    xlabel='Key positions',  # X 轴标签表示 Key 位置
    ylabel='Query positions',  # Y 轴标签表示 Query 位置
    titles=['Head %d' % i for i in range(1, 5)],  # 为每个注意力头设置标题
    figsize=(7, 3.5)  # 设置图形大小
)
../../_images/120ec0719567d4212cb0f3ab71a959cdc781e78af0119de4de0420ba122f75a7.svg

与编码器的自注意力的情况类似,通过指定输入序列的有效长度,输出序列的查询不会与输入序列中填充位置的词元进行注意力计算。

# 使用 d2l.show_heatmaps 函数可视化解码器中编码器-解码器注意力权重
d2l.show_heatmaps(
    dec_inter_attention_weights,  # 解码器中编码器-解码器注意力权重
    xlabel='Key positions',  # X 轴标签表示 Key 位置
    ylabel='Query positions',  # Y 轴标签表示 Query 位置
    titles=['Head %d' % i for i in range(1, 5)],  # 为每个注意力头设置标题
    figsize=(7, 3.5)  # 设置图形大小
)
../../_images/20feff643fca332950c749c769908bbda7f548a064daf86bc43fdd8ada26d183.svg