Transformer

模型

上图是Transformer结构(包括中文和英文版),下面会解析结构并使用PyTorch进行实现。

模型结构

层规范化(LayerNorm)

层规范化(Layer Normalization)是一种深度神经网络中的规范化技术,由Jimmy Lei Ba 等人在2016年提出,旨在解决训练过程中的内部协变量偏移(Internal Covariate Shift)问题。其核心思想是对单个样本同一层内所有神经元的输出进行标准化,提升训练稳定性与收敛速度。

对于输入向量\(\mathbf{x}\in\mathbb{R}^{d}\)\(d\)为层维度),LayerNorm的计算流程如下:

  1. 计算均值与方差 \[ \begin{aligned} &\mu = \frac{1}{d}\sum_{i=1}^d x_i\\ &\sigma^2 = \frac{1}{d}\sum_{i=1}^d (x_i-\mu)^2 \end{aligned} \]

  2. 标准化 \[ \hat x_i = \frac{x_i-\mu}{\sqrt{\sigma^2+\epsilon}} \]

  3. 缩放与平移 \[ y_i = \gamma\hat x_i + \beta \]

对于一个时序样本\(\mathbf{X}\in\mathbb{R}^{m\times d}\)\(m\)是总共的时间步,\(d\)是一个时间步上的向量维度,则层规范化是在最后一个维度\(d\)上进行的,也就是分别对每个时间步上的向量进行规范化。

LayerNorm与BatchNorm的对比

特性 LayerNorm BatchNorm
规范化维度 样本内同一层所有神经元(特征维度) 批量内同一神经元的所有样本(批量维度)
依赖关系 不依赖批量大小,适合小批量或单样本训练 依赖足够大的批量,小批量时统计不稳定
适用场景 RNN、Transformer、生成模型等序列模型 CNN、全连接网络等固定结构网络
训练/推理差异 无需区分训练与推理模式 推理时需使用训练阶段的移动平均统计量
参数数量 2d(每个特征维度独立缩放和平移) 2C(CNN中每通道独立参数)

LayerNorm无需区分训练与推理模式,因为其规范化是对同一个样本的同一层神经元的输出进行的,无需考虑批量的分布(均值与方差)。

对于一些时间序列模型,比如机器翻译模型,输入的样本很可能具有不同的时间步,那么这样就难以使用BatchNorm进行规范化,因为BatchNorm是在小批量上对不同样本的同一个特征位置进行标准化,而机器翻译输入的样本长度不相同,无法进行标准化。对于这类问题,应该使用LayerNorm,因为LayerNorm是在同一个样本上进行的。

Pytorch中LayerNorm的定义与使用:

1
2
3
4
5
6
7
8
9
10
11
import torch
import torch.nn as nn

batch_size = 2
seq_len = 3
d_model = 4 # 特征维度
norm = nn.LayerNorm(d_model)
x = torch.randn(batch_size, seq_len, d_model)
y = norm(x)
print(x)
print(y)

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
tensor([[[ 1.7553,  1.2146, -0.3583,  0.6030],
[ 1.5075, 0.1550, -0.2058, 1.1268],
[ 0.0724, 0.6387, -0.7122, -1.0038]],

[[-1.1730, -0.1724, -0.8566, 0.2516],
[ 0.1484, 1.1121, -1.2942, 1.3017],
[-0.9100, -0.1195, -0.6697, -1.8060]]])
tensor([[[ 1.2123, 0.5235, -1.4802, -0.2556],
[ 1.2373, -0.7049, -1.2229, 0.6905],
[ 0.5000, 1.3750, -0.7123, -1.1627]],

[[-1.2252, 0.5635, -0.6597, 1.3214],
[-0.1640, 0.7735, -1.5675, 0.9580],
[-0.0553, 1.2437, 0.3396, -1.5280]]],
grad_fn=<NativeLayerNormBackward0>)

基于位置的前馈网络

顾名思义,基于位置的前馈神经网络(Positionwise Feed-Forward Networks)就是在同一个位置(时间步)上使用同一个多层感知机(MLP)。

Positionalwise FFN对序列中每个位置(token)独立应用相同的线性和非线性(激活函数)变换,保持位置信息的独立性。

对于一个时序样本\(\mathbf{X}\in\mathbb{R}^{m\times d}\)\(m\)是总共的时间步,\(d\)是一个时间步上的向量维度,则Positionalwise FFN是在最后一个维度\(d\)上进行的,也就是分别对每个时间步上的向量通过一个全连接层。

Positionalwise FFN一共使用了两个线性层,通过两次线性变换实现特征空间的扩展与收缩(比如512$\(2048\)$512),增强特征融合能力:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import torch
import torch.nn as nn

class PositionalwiseFFN(nn.Module):
def __init__(self, d_model, hidden_dim, dropout=0.1):
super().__init__()
self.linear1 = nn.Linear(d_model, hidden_dim) # 扩展维度
self.linear2 = nn.Linear(hidden_dim, d_model) # 压缩维度,将高维特征压缩回原始维度,保持输入输出一致性
self.relu = nn.ReLU()
self.dropout = nn.Dropout(dropout)

def forward(self, x):
x = self.linear1(x)
x = self.relu(x)
x = self.dropout(x)
x = self.linear2(x)
return x

多头自注意力机制

之前已有内容,此处仅给出代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import torch
import torch.nn as nn
import math

class MultiheadSelfattention(nn.Module):
def __init__(self, embed_dim, num_heads, dropout=0.1):
super().__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads

self.q_proj = nn.Linear(embed_dim, embed_dim)
self.k_proj = nn.Linear(embed_dim, embed_dim)
self.v_proj = nn.Linear(embed_dim, embed_dim)

self.out_proj = nn.Linear(embed_dim, embed_dim)
self.dropout = nn.Dropout(dropout)

self.scale = 1.0 / math.sqrt(self.head_dim)

def forward(self, x):
batch_size, seq_len, _ = x.shape

Q = self.q_proj(x)
K = self.k_proj(x)
V = self.v_proj(x)

Q = Q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
K = K.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
V = V.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

attn_score = torch.matmul(Q, K.transpose(-2, -1)) * self.scale
attn_weights = torch.softmax(attn_score, dim=-1)
attn_weights = self.dropout(attn_weights)

context = torch.matmul(attn_weights, V)
context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, -1)

output = self.out_proj(context)
return output, attn_weights

注意:对于多个样本,不同样本的时间步可能会不一样,而自注意力机制对多个样本所有时间步都是一同并行计算的,因此无法像RNN类网络一样使用nn.pack_padded_sequence(),自注意机制的处理方式是将所有样本的时间步都填充到当前批量最长时间步,然后在前向传播中传递Mask数据:

mask的维度形状是[batch_size, seq_len],每个元素为0(表示该时间步是padding,需要被忽略)或者1(表示该时间步是真实token,需要被保留)

加入mask的多头自注意力机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class MultiheadSelfattention(nn.Module):
def __init__(self, embed_dim, num_heads, dropout=0.1):
super().__init__()
self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads

self.q_proj = nn.Linear(embed_dim, embed_dim)
self.v_proj = nn.Linear(embed_dim, embed_dim)
self.k_proj = nn.Linear(embed_dim, embed_dim)

self.out_proj = nn.Linear(embed_dim, embed_dim)
self.dropout = nn.Dropout(dropout)

self.scale = 1.0 / math.sqrt(self.head_dim)

def forward(self, x, mask=None):
batch_size, seq_len, _ = x.shape

Q = self.q_proj(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
K = self.k_proj(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
V = self.v_proj(x).view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

attn_score = torch.matmul(Q, K.transpose(-2, -1)) * self.scale # [batch_size, num_heads, seq_len, seq_len]

# 处理Padding Mask
if mask is not None:
# 将mask扩展维度以匹配注意力矩阵 [batch_size, 1, 1, seq_len]
mask = mask.view(batch_size, 1, 1, seq_len)
# 使用负无穷填充被mask的位置
attn_score = attn_score.masked_fill(mask == 0, float('-inf'))

attn_weights = torch.softmax(attn_score, dim=-1)
attn_weights = self.dropout(attn_weights)

context = torch.matmul(attn_weights, V)
context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, -1)

output = self.out_proj(context)
return output, attn_weights

对mask代码部分的解释:

1
2
3
4
5
if mask is not None:
mask = mask.view(batch_size, 1, 1, seq_len) # 扩充维度,以便后续计算时使用广播机制
attn_score = attn_score.masked_fill(mask == 0, float('-inf'))
# 在mask为0的位置将attn_score的值设为负无穷:float('-inf')
# 此处会使用广播机制,对每个注意力头得到的注意力评分矩阵,其将每行都进行mask处理

只考虑一个样本中的一个注意力头,此时mask为一个向量(原本mask扩充维度后为[batch_size, 1, 1, seq_len],广播机制会自动将其广播到[batch_size, num_heads, seq_len, seq_len],因此,对于每个num_heads中的每行,都是同样的mask最后一个维度的向量,维度为seq_len)如果该样本前\(s\)个时间步之后为扩充的时间维度,均进行padding,未mask前: \[ \mathrm{attn\_score} = \frac{\mathbf{Q}\mathbf{K}^\top}{\sqrt{d}} = \left[ \begin{matrix} a(\mathbf{q}_1, \mathbf{k}_1)&a(\mathbf{q}_1, \mathbf{k}_2)&\cdots&a(\mathbf{q}_1, \mathbf{k}_m)\\ a(\mathbf{q}_2, \mathbf{k}_1)&a(\mathbf{q}_2, \mathbf{k}_2)&\cdots&a(\mathbf{q}_2, \mathbf{k}_m)\\ \vdots&\vdots& &\vdots\\ a(\mathbf{q}_m, \mathbf{k}_1)&a(\mathbf{q}_m, \mathbf{k}_2)&\cdots&a(\mathbf{q}_m, \mathbf{k}_m) \end{matrix} \right]\in\mathbb{R}^{m\times m} \] mask为: \[ \mathrm{mask} = [\begin{matrix} 1&\cdots&1&0&\cdots0 \end{matrix}]\in\mathbb{R}^{m} \] 其中前面\(s\)个元素为\(1\),后面\(m-s\)个元素为\(0\)

那么每行都会进行同样的mask处理,经过mask处理后的attn_score为: \[ \mathrm{attn\_score} = \left[ \begin{matrix} a(\mathbf{q}_1, \mathbf{k}_1)&\cdots&a(\mathbf{q}_1, \mathbf{k}_s)&-\infty&\cdots&-\infty\\ a(\mathbf{q}_2, \mathbf{k}_1)&\cdots&a(\mathbf{q}_2, \mathbf{k}_s)&-\infty&\cdots&-\infty\\ \vdots& & \vdots&\vdots& &\vdots\\ a(\mathbf{q}_s, \mathbf{k}_1)&\cdots&a(\mathbf{q}_s, \mathbf{k}_s)&-\infty&\cdots&-\infty\\ \vdots& & \vdots&\vdots& &\vdots\\ a(\mathbf{q}_m, \mathbf{k}_1)&\cdots&a(\mathbf{q}_m, \mathbf{k}_s)&-\infty&\cdots&-\infty \end{matrix} \right] \]\[ \begin{aligned} \mathrm{context} &= \mathrm{softmax}\left(\frac{\mathbf{Q}\mathbf{K}^\top}{\sqrt{d}}\right)\mathbf{V}\\ &= \mathrm{softmax}\left(\left[ \begin{matrix} a(\mathbf{q}_1, \mathbf{k}_1)&\cdots&a(\mathbf{q}_1, \mathbf{k}_s)&-\infty&\cdots&-\infty\\ a(\mathbf{q}_2, \mathbf{k}_1)&\cdots&a(\mathbf{q}_2, \mathbf{k}_s)&-\infty&\cdots&-\infty\\ \vdots& & \vdots&\vdots& &\vdots\\ a(\mathbf{q}_s, \mathbf{k}_1)&\cdots&a(\mathbf{q}_s, \mathbf{k}_s)&-\infty&\cdots&-\infty\\ \vdots& & \vdots&\vdots& &\vdots\\ a(\mathbf{q}_m, \mathbf{k}_1)&\cdots&a(\mathbf{q}_m, \mathbf{k}_s)&-\infty&\cdots&-\infty \end{matrix} \right]\right)\cdot\left[ \begin{matrix} \mathbf{v}_1\\ \mathbf{v}_2\\ \vdots\\ \mathbf{v}_m \end{matrix} \right]\\ \\ &= \left[ \begin{matrix} \alpha(\mathbf{q}_1, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_1, \mathbf{k}_s)&0&\cdots&0\\ \alpha(\mathbf{q}_2, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_2, \mathbf{k}_s)&0&\cdots&0\\ \vdots& & \vdots&\vdots& &\vdots\\ \alpha(\mathbf{q}_s, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_s, \mathbf{k}_s)&0&\cdots&0\\ \vdots& & \vdots&\vdots& &\vdots\\ \alpha(\mathbf{q}_m, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_m, \mathbf{k}_s)&0&\cdots&0 \end{matrix} \right]\cdot\left[ \begin{matrix} \mathbf{v}_1\\ \mathbf{v}_2\\ \vdots\\ \mathbf{v}_m \end{matrix} \right]\\ \\ &= \left[ \begin{matrix} \alpha(\mathbf{q}_1, \mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_1, \mathbf{k}_s)\mathbf{v}_s&0&\cdots&0\\ \alpha(\mathbf{q}_2, \mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_2, \mathbf{k}_s)\mathbf{v}_s&0&\cdots&0\\ \vdots& &\vdots&\vdots& &\vdots\\ \alpha(\mathbf{q}_s, \mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_s, \mathbf{k}_s)\mathbf{v}_s&0&\cdots&0\\ \vdots& &\vdots&\vdots& &\vdots\\ \alpha(\mathbf{q}_m, \mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_m, \mathbf{k}_s)\mathbf{v}_s&0&\cdots&0 \end{matrix} \right] = \left[ \begin{matrix} f(\mathbf{q}_1)\\ f(\mathbf{q}_2)\\ \vdots\\ f(\mathbf{q}_s)\\ \vdots\\ f(\mathbf{q}_m) \end{matrix} \right] \end{aligned} \] 由此可以看出,mask方法只保证了前面真正的时间步不会使用padding的数据,但是padding本身会使用前面真正时间步的数据(即\(f(\mathbf{q}_{s+1})\)\(f(\mathbf{q}_m)\)数据并不是0,而是“注意了”前面\(s\)个真实有效的时间步)

这么设计的可能原因:

  1. padding部分的数据并不重要,因此其如何参考前面的时间步得到的信息也不重要。
  2. 模型可能会学习如何忽略padding数据。
  3. 前面真正的数据本身则不能考虑padding的结果,防止真实的信息收到影响。
  4. 经过多头自注意力机制后,输出与输入的维度相同,后续在通过更多的多头自注意力机制的时候,真实的数据也会利用mask方法忽略掉填充部分的信息,因此前面填充部分计算得到的结果在后面的多头自注意力机制中也不会用到。
  5. 减少逻辑运算,利用广播机制加快模型运行速度。

在上面的代码中,我们加入了mask方法,但是\(\mathbf{K}\)\(\mathbf{Q}\)\(\mathbf{V}\)的计算仍然是分开的三个步骤,我们可以继续优化,使得\(\mathbf{K}\)\(\mathbf{Q}\)\(\mathbf{V}\)的计算可以并行,加快模型运行速度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class MultiheadSelfAttention(nn.Module):
def __init__(self, embed_dim, num_heads, dropout=0.1):
super().__init__()
assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"

self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads

self.qkv_proj = nn.Linear(embed_dim, 3*embed_dim)
self.out_proj = nn.Linear(embed_dim, embed_dim)
self.dropout = nn.Dropout(dropout)

def forward(self, x, mask=None):
batch_size, seq_len, _ = x.shape

# 生成Q, K, V
qkv = self.qkv_proj(x) # [batch_size, seq_len, 3*embed_size]
Q, K, V = qkv.chunk(3, dim=-1)

# 分割成多个头
Q = Q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
K = K.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
V = V.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

# 计算注意力分数
attn_score = torch.matmul(Q, K.transpose(-2, -1)) / (self.head_dim ** 0.5)

if mask is not None:
mask = mask.unsqueeze(1).unsqueeze(2) # [batch_size, 1, 1, seq_len]
attn_score = attn_score.masked_fill(mask == 0, float('-inf'))

attn_weights = torch.softmax(attn_score, dim=-1) # [batch_size, num_heads, seq_len, seq_len]
attn_weights = self.dropout(attn_weights)

context = torch.matmul(attn_weights, V) # [batch_size, num_heads, seq_len, head_dim]
context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)

output = self.out_proj(context)
return output

位置编码

之前已有内容,此处仅给出代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class PositionalEncoding(nn.Module):
def __init__(self, embed_dim, max_len=5000):
super().__init__()
# 创建一个位置编码矩阵,形状为(max_len, embed_dim)
pe = torch.zeros(max_len, embed_dim)

# 生成位置列向量
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # [max_len, 1]
# 计算分母,用于后续的正弦和余弦函数计算
div_term = torch.exp(torch.arange(0, embed_dim, 2).float() * (-math.log(10000.0) / embed_dim))
# 计算偶数位置的正弦值
pe[:, 0::2] = torch.sin(position * div_term)
# 计算奇数位置的余弦值
pe[:, 1::2] = torch.cos(position * div_term)
# 增加一个维度,使其形状变为[1, max_len, embed_dim],这样可以广播到批量数据上
pe = pe.unsqueeze(0)
# 将位置编码矩阵注册为缓冲区,这样它就不会被视为模型的参数,但会随模型一起保存和加载
self.register_buffer('pe', pe)

def forward(self, x):
# 将位置编码添加到输入的嵌入向量上
x = x + self.pe[:, :x.size(1)] # x的seq_len不一定等于max_len,因此此处按照x的seq_len来截取位置编码矩阵
return x

词嵌入

在Transformer的原论文中,假设词嵌入的初始方差为\(\frac{1}{d}\),因此需要乘以\(\sqrt{d}\)使最终方差为1: \[ \mathrm{Var}(X_{\mathrm{emb}}\cdot\sqrt{d}) = \frac{1}{d}\cdot d = 1 \] 这样可以保证前向传播和反向传播数据的稳定性,防止出现梯度爆炸或梯度消失。

在PyTorch中,nn.Embedding()对词向量初始化为标准正态分布\(\mathcal{N}(0, 1)\),此时词嵌入的方差为 \[ \mathrm{Var}(X_{\mathrm{emb}}) = 1 \] 无需再乘以\(\sqrt{d}\)

残差连接

Transformer结构中多处使用残差连接,可以增强模型拟合能力,防止梯度消失。

比如,在数据输入嵌入层并加上位置编码矩阵后,数据通过多头注意力机制得到新的数据,然后原本的数据与新的数据相加(残差连接),再进行层规范化,经过层规范化的数据既通过Positionalwise FFN得到新的数据,又与该新的数据相加(残差连接),再进行层规范化。

Masked 多头自注意力机制

在解码器中,Target经过嵌入层和加上位置编码后,通过的是Masked多头自注意力机制(Masked Multi-head Self-Attention),与传统的RNN类模型不同的是,Transformer的Target无需按照时间步逐个输入模型,而是可以并行输入模型。比如对于机器翻译模型,训练过程中,解码器可以一次性接受整个批量的翻译数据,包括第一个时间步到最后一个时间步的内容,但是,如果将整个数据直接通过多头自注意力机制,那么前面时间步的数据也会看到后面时间步的数据,而在推理过程中,则是先预测前面时间步的翻译文本,再根据前面翻译的内容去预测后面翻译的文本,也就是说,前面的时间步无法参考后面时间步的输出,那么训练与推理就出现了偏差。因此,为了保证模型训练后能实现正常的推理,在训练过程中,Target中的一个样本前面时间步的内容不应该注意到后面时间步的内容,因此要加入掩码(Mask)机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class MaskedMultiheadSelfAttention(nn.Module):
def __init__(self, embed_dim, num_heads, device='cpu', dropout=0.1):
super().__init__()
assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"

self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads
self.device = device

self.qkv_proj = nn.Linear(embed_dim, 3*embed_dim)
self.out_proj = nn.Linear(embed_dim, embed_dim)
self.dropout = nn.Dropout(dropout)

def forward(self, x, mask=None):
batch_size, seq_len, _ = x.shape
qkv = self.qkv_proj(x)
Q, K, V = qkv.chunk(3, dim=-1)

Q = Q.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
K = K.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)
V = V.view(batch_size, seq_len, self.num_heads, self.head_dim).transpose(1, 2)

attn_score = torch.matmul(Q, K.transpose(-1, -2)) / (self.head_dim ** 0.5)

# 创建下三角掩码
causal_mask = torch.tril(torch.ones(seq_len, seq_len, dtype=torch.long, device=self.device)).unsqueeze(0).unsqueeze(1) # [1, 1, seq_len, seq_len]

if mask is not None:
mask = mask.unsqueeze(1).unsqueeze(2) # [batch_size, 1, 1, seq_len]
causal_mask = causal_mask & mask # 自动广播,结果维度为 [batch_size, 1, seq_len, seq_len]

attn_score = attn_score.masked_fill(causal_mask == 0, float('-inf'))

attn_weights = torch.softmax(attn_score, dim=-1)
attn_weights = self.dropout(attn_weights)

context = torch.matmul(attn_weights, V) # [batch_size, num_heads, seq_len, head_dim]
context = context.transpose(1, 2).contiguous().view(batch_size, seq_len, self.embed_dim)

output = self.out_proj(context)
return output

上面的代码中创建了一个下三角掩码causal_mask,用于代表当前时间步不访问后面的时间步(矩阵第\(i\)行的第\(i+1\)列及后面的列值为0),再将该掩码与mask(处理padding)结合,可以同时时间当前位置不关注后面的时间步以及不会关注padding的时间步。

编码器 - 解码器注意力机制

在Transformer的解码器中,Target数据通过Masked多头自注意力机制并经过残差连接和LayerNorm后,作为\(\mathbf{Q}\),而编码器的输出作为\(\mathbf{K}\)\(\mathbf{V}\),进入多头注意力机制模块,这个部分与之前加入注意力机制的Seq2Seq模型很像,都是以解码器当前时间步作为查询\(\mathbf{q}\),以编码器输出作为源数据特征\(\mathbf{K}\)和源数据信息\(\mathbf{V}\),需要注意的是,\(\mathbf{K}\)\(\mathbf{V}\)\(\mathbf{Q}\)仍然需要经过线性变换,使模型具有可学习的能力

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class EncoderDecoderAttention(nn.Module):
def __init__(self, embed_dim, num_heads, dropout=0.1):
super().__init__()
assert embed_dim % num_heads == 0, "Embedding dimension must be divisible by number of heads"

self.embed_dim = embed_dim
self.num_heads = num_heads
self.head_dim = embed_dim // num_heads

self.q_proj = nn.Linear(embed_dim, embed_dim)
self.kv_proj = nn.Linear(embed_dim, 2*embed_dim)
self.out_proj = nn.Linear(embed_dim, embed_dim)
self.dropout = nn.Dropout(dropout)

def forward(self, q, kv, src_mask=None):
batch_size, seq_len_q, _ = q.shape
_, seq_len_kv, _ = kv.shape

Q = self.q_proj(q).view(batch_size, seq_len_q, self.num_heads, self.head_dim).transpose(1, 2)
kv = self.kv_proj(kv)
K, V = kv.chunk(2, dim=-1)
K = K.view(batch_size, seq_len_kv, self.num_heads, self.head_dim).transpose(1, 2)
V = V.view(batch_size, seq_len_kv, self.num_heads, self.head_dim).transpose(1, 2)

attn_score = torch.matmul(Q, K.transpose(-2, -1)) / (self.head_dim ** 0.5)

if src_mask is not None:
src_mask = src.unsqueeze(1).unsqueeze(2)
attn_score = attn_score.masked_fill(src_mask == 0, float('-inf'))

attn_weights = torch.softmax(attn_score, dim=-1)
attn_weights = self.dropout(attn_weights)

context = torch.matmul(attn_weights, V)
context = context.transpose(1, 2).contiguous().view(batch_size, seq_len_q, self.embed_dim)

output = self.out_proj(context)
return output

模型实现

编码器

构建一个编码层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class EncoderLayer(nn.Module):
def __init__(self, embed_dim, num_heads, hidden_dim, dropout=0.1):
super().__init__()
self.self_attn = MultiheadSelfAttention(embed_dim, num_heads, dropout)
self.ffn = PositionalwiseFFN(embed_dim, hidden_dim, dropout)
self.norm1 = nn.LayerNorm(embed_dim)
self.norm2 = nn.LayerNorm(embed_dim)
self.dropout = nn.Dropout(dropout)

def forward(self, x, mask=None):
# 多头注意力
attn_output = self.self_attn(x, mask)
# 残差+规范化
x = self.norm1(x + self.dropout(attn_output))
# PositionalwiseFFN
ffn_output = self.ffn(x)
# 残差+规范化
x = self.norm2(x + self.dropout(ffn_output))
return x

Transformer的编码器是由\(n\)EncoderLayer构成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Encoder(nn.Module):
def __init__(self, vocab_size, num_layers, embed_dim, num_heads, hidden_dim, max_len=5000, dropout=0.1):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.positional_encoding = PositionalEncoding(embed_dim, max_len)
self.layers = nn.ModuleList([
EncoderLayer(embed_dim, num_heads, hidden_dim, dropout)
for _ in range(num_layers)
])
self.dropout = nn.Dropout(dropout)

def forward(self, x, mask=None):
x = self.embedding(x)
x = self.positional_encoding(x)
x = self.dropout(x)
for layer in self.layers:
x = layer(x, mask)
return x

解码器

构建一个解码层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class DecoderLayer(nn.Module):
def __init__(self, embed_dim, num_heads, hidden_dim, device='cpu', dropout=0.1):
super().__init__()
self.device = device
self.self_attn = MaskedMultiheadSelfAttention(embed_dim, num_heads, dropout)
self.cross_attn = EncoderDecoderAttention(embed_dim, num_heads, dropout)
self.ffn = PositionalwiseFFN(embed_dim, hidden_dim, dropout)
self.norm1 = nn.LayerNorm(embed_dim)
self.norm2 = nn.LayerNorm(embed_dim)
self.norm3 = nn.LayerNorm(embed_dim)
self.dropout = nn.Dropout(dropout)

def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
# 掩码多头自注意力机制
attn_output1 = self.self_attn(x, tgt_mask, self.device)
x = self.norm1(x + self.dropout(attn_output1))

# 编码器 - 解码器注意力
attn_output2 = self.cross_attn(x, enc_output, src_mask)
x = self.norm2(x + self.dropout(attn_output2))

# PositionalwiseFFN
ffn_output = self.ffn(x)
x = self.norm3(x + self.dropout(ffn_output))
return x

Transformer的解码器是由\(n\)DecoderLayer构成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class Decoder(nn.Module):
def __init__(self, vocab_size, num_layers, embed_dim, num_heads, hidden_dim, device='cpu', max_len = 5000, dropout=0.1):
super().__init__()
self.device=device
self.embedding = nn.Embedding(vocab_size, embed_dim)
self.positional_encoding = PositionalEncoding(embed_dim, max_len)
self.layers = nn.ModuleList([
DecoderLayer(embed_dim, num_heads, hidden_dim, device, dropout)
for _ in range(num_layers)
])
self.dropout = nn.Dropout(dropout)

def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
x = self.embedding(x)
x = self.positional_encoding(x)
x = self.dropout(x)

for layer in self.layers:
x = layer(x, enc_output, src_mask, tgt_mask)
return x

Transformer

根据前面完成的编码器和解码器,完成完成的Transformer模型:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, num_layers, embed_dim, num_heads, hidden_dim, device='cpu', max_len=5000, dropout=0.1):
super().__init__()
self.device = device
self.encoder = Encoder(src_vocab_size, num_layers, embed_dim, num_heads, hidden_dim, max_len, dropout)
self.decoder = Decoder(tgt_vocab_size, num_layers, embed_dim, num_heads, hidden_dim, device, max_len, dropout)
self.fc = nn.Linear(embed_dim, tgt_vocab_size)

def forward(self, src, tgt, src_mask=None, tgt_mask=None):
enc_output = self.encoder(src, src_mask)
dec_output = self.decoder(tgt, enc_output, src_mask, tgt_mask)
output = self.fc(dec_output)
return output

Transformer
https://blog.shinebook.net/2025/04/23/人工智能/理论基础/深度学习/Transformer/
作者
X
发布于
2025年4月23日
许可协议