模型
上图是Transformer结构(包括中文和英文版),下面会解析结构并使用PyTorch进行实现。
模型结构
层规范化(LayerNorm)
层规范化(Layer
Normalization)是一种深度神经网络中的规范化技术,由Jimmy Lei Ba
等人在2016年提出,旨在解决训练过程中的内部协变量偏移(Internal Covariate
Shift)问题。其核心思想是对单个样本同一层内所有神经元的输出进行标准化 ,提升训练稳定性与收敛速度。
对于输入向量\(\mathbf{x}\in\mathbb{R}^{d}\) (\(d\) 为层维度),LayerNorm的计算流程如下:
计算均值与方差 \[
\begin{aligned}
&\mu = \frac{1}{d}\sum_{i=1}^d x_i\\
&\sigma^2 = \frac{1}{d}\sum_{i=1}^d (x_i-\mu)^2
\end{aligned}
\]
标准化 \[
\hat x_i = \frac{x_i-\mu}{\sqrt{\sigma^2+\epsilon}}
\]
缩放与平移 \[
y_i = \gamma\hat x_i + \beta
\]
对于一个时序样本\(\mathbf{X}\in\mathbb{R}^{m\times
d}\) ,\(m\) 是总共的时间步,\(d\) 是一个时间步上的向量维度,则层规范化是在最后一个维度\(d\) 上进行的,也就是分别对每个时间步上的向量进行规范化。
LayerNorm与BatchNorm的对比 :
规范化维度
样本内同一层所有神经元(特征维度)
批量内同一神经元的所有样本(批量维度)
依赖关系
不依赖批量大小,适合小批量或单样本训练
依赖足够大的批量,小批量时统计不稳定
适用场景
RNN、Transformer、生成模型等序列模型
CNN、全连接网络等固定结构网络
训练/推理差异
无需区分训练与推理模式
推理时需使用训练阶段的移动平均统计量
参数数量
2d (每个特征维度独立缩放和平移)
2C (CNN中每通道独立参数)
LayerNorm无需区分训练与推理模式,因为其规范化是对同一个样本的同一层神经元的输出进行的,无需考虑批量的分布(均值与方差)。
对于一些时间序列模型,比如机器翻译模型,输入的样本很可能具有不同的时间步,那么这样就难以使用BatchNorm进行规范化,因为BatchNorm是在小批量上对不同样本的同一个特征位置进行标准化,而机器翻译输入的样本长度不相同,无法进行标准化。对于这类问题,应该使用LayerNorm,因为LayerNorm是在同一个样本上进行的。
Pytorch中LayerNorm的定义与使用:
1 2 3 4 5 6 7 8 9 10 11 import torchimport torch.nn as nn batch_size = 2 seq_len = 3 d_model = 4 norm = nn.LayerNorm(d_model) x = torch.randn(batch_size, seq_len, d_model) y = norm(x)print (x)print (y)
输出结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 tensor([[[ 1.7553 , 1.2146 , -0.3583 , 0.6030 ], [ 1.5075 , 0.1550 , -0.2058 , 1.1268 ], [ 0.0724 , 0.6387 , -0.7122 , -1.0038 ]], [[-1.1730 , -0.1724 , -0.8566 , 0.2516 ], [ 0.1484 , 1.1121 , -1.2942 , 1.3017 ], [-0.9100 , -0.1195 , -0.6697 , -1.8060 ]]]) tensor([[[ 1.2123 , 0.5235 , -1.4802 , -0.2556 ], [ 1.2373 , -0.7049 , -1.2229 , 0.6905 ], [ 0.5000 , 1.3750 , -0.7123 , -1.1627 ]], [[-1.2252 , 0.5635 , -0.6597 , 1.3214 ], [-0.1640 , 0.7735 , -1.5675 , 0.9580 ], [-0.0553 , 1.2437 , 0.3396 , -1.5280 ]]], grad_fn=<NativeLayerNormBackward0>)
基于位置的前馈网络
顾名思义,基于位置的前馈神经网络(Positionwise Feed-Forward
Networks)就是在同一个位置(时间步)上使用同一个多层感知机(MLP)。
Positionalwise
FFN对序列中每个位置(token)独立应用相同的线性和非线性(激活函数)变换,保持位置信息的独立性。
对于一个时序样本\(\mathbf{X}\in\mathbb{R}^{m\times
d}\) ,\(m\) 是总共的时间步,\(d\) 是一个时间步上的向量维度,则Positionalwise
FFN是在最后一个维度\(d\) 上进行的,也就是分别对每个时间步上的向量通过一个全连接层。
Positionalwise
FFN一共使用了两个线性层,通过两次线性变换实现特征空间的扩展与收缩(比如512$\(2048\) $512),增强特征融合能力:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import torchimport torch.nn as nnclass PositionalwiseFFN (nn.Module): def __init__ (self, d_model, hidden_dim, dropout=0.1 ): super ().__init__() self .linear1 = nn.Linear(d_model, hidden_dim) self .linear2 = nn.Linear(hidden_dim, d_model) self .relu = nn.ReLU() self .dropout = nn.Dropout(dropout) def forward (self, x ): x = self .linear1(x) x = self .relu(x) x = self .dropout(x) x = self .linear2(x) return x
多头自注意力机制
之前已有内容,此处仅给出代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 import torchimport torch.nn as nnimport mathclass MultiheadSelfattention (nn.Module): def __init__ (self, embed_dim, num_heads, dropout=0.1 ): super ().__init__() self .embed_dim = embed_dim self .num_heads = num_heads self .head_dim = embed_dim // num_heads self .q_proj = nn.Linear(embed_dim, embed_dim) self .k_proj = nn.Linear(embed_dim, embed_dim) self .v_proj = nn.Linear(embed_dim, embed_dim) self .out_proj = nn.Linear(embed_dim, embed_dim) self .dropout = nn.Dropout(dropout) self .scale = 1.0 / math.sqrt(self .head_dim) def forward (self, x ): batch_size, seq_len, _ = x.shape Q = self .q_proj(x) K = self .k_proj(x) V = self .v_proj(x) Q = Q.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) K = K.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) V = V.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) attn_score = torch.matmul(Q, K.transpose(-2 , -1 )) * self .scale attn_weights = torch.softmax(attn_score, dim=-1 ) attn_weights = self .dropout(attn_weights) context = torch.matmul(attn_weights, V) context = context.transpose(1 , 2 ).contiguous().view(batch_size, seq_len, -1 ) output = self .out_proj(context) return output, attn_weights
注意:对于多个样本,不同样本的时间步可能会不一样,而自注意力机制对多个样本所有时间步都是一同并行计算的,因此无法像RNN类网络一样使用nn.pack_padded_sequence()
,自注意机制的处理方式是将所有样本的时间步都填充到当前批量最长时间步,然后在前向传播中传递Mask数据:
mask
的维度形状是[batch_size, seq_len]
,每个元素为0
(表示该时间步是padding,需要被忽略)或者1
(表示该时间步是真实token,需要被保留)
加入mask的多头自注意力机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class MultiheadSelfattention (nn.Module): def __init__ (self, embed_dim, num_heads, dropout=0.1 ): super ().__init__() self .embed_dim = embed_dim self .num_heads = num_heads self .head_dim = embed_dim // num_heads self .q_proj = nn.Linear(embed_dim, embed_dim) self .v_proj = nn.Linear(embed_dim, embed_dim) self .k_proj = nn.Linear(embed_dim, embed_dim) self .out_proj = nn.Linear(embed_dim, embed_dim) self .dropout = nn.Dropout(dropout) self .scale = 1.0 / math.sqrt(self .head_dim) def forward (self, x, mask=None ): batch_size, seq_len, _ = x.shape Q = self .q_proj(x).view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) K = self .k_proj(x).view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) V = self .v_proj(x).view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) attn_score = torch.matmul(Q, K.transpose(-2 , -1 )) * self .scale if mask is not None : mask = mask.view(batch_size, 1 , 1 , seq_len) attn_score = attn_score.masked_fill(mask == 0 , float ('-inf' )) attn_weights = torch.softmax(attn_score, dim=-1 ) attn_weights = self .dropout(attn_weights) context = torch.matmul(attn_weights, V) context = context.transpose(1 , 2 ).contiguous().view(batch_size, seq_len, -1 ) output = self .out_proj(context) return output, attn_weights
对mask代码部分的解释:
1 2 3 4 5 if mask is not None : mask = mask.view(batch_size, 1 , 1 , seq_len) attn_score = attn_score.masked_fill(mask == 0 , float ('-inf' ))
只考虑一个样本中的一个注意力头,此时mask为一个向量(原本mask
扩充维度后为[batch_size, 1, 1, seq_len]
,广播机制会自动将其广播到[batch_size, num_heads, seq_len, seq_len]
,因此,对于每个num_heads
中的每行,都是同样的mask最后一个维度的向量,维度为seq_len
)如果该样本前\(s\) 个时间步之后为扩充的时间维度,均进行padding,未mask前:
\[
\mathrm{attn\_score} = \frac{\mathbf{Q}\mathbf{K}^\top}{\sqrt{d}} =
\left[
\begin{matrix}
a(\mathbf{q}_1, \mathbf{k}_1)&a(\mathbf{q}_1,
\mathbf{k}_2)&\cdots&a(\mathbf{q}_1, \mathbf{k}_m)\\
a(\mathbf{q}_2, \mathbf{k}_1)&a(\mathbf{q}_2,
\mathbf{k}_2)&\cdots&a(\mathbf{q}_2, \mathbf{k}_m)\\
\vdots&\vdots& &\vdots\\
a(\mathbf{q}_m, \mathbf{k}_1)&a(\mathbf{q}_m,
\mathbf{k}_2)&\cdots&a(\mathbf{q}_m, \mathbf{k}_m)
\end{matrix}
\right]\in\mathbb{R}^{m\times m}
\] mask为: \[
\mathrm{mask} = [\begin{matrix}
1&\cdots&1&0&\cdots0
\end{matrix}]\in\mathbb{R}^{m}
\] 其中前面\(s\) 个元素为\(1\) ,后面\(m-s\) 个元素为\(0\)
那么每行都会进行同样的mask处理,经过mask处理后的attn_score
为:
\[
\mathrm{attn\_score} = \left[
\begin{matrix}
a(\mathbf{q}_1, \mathbf{k}_1)&\cdots&a(\mathbf{q}_1,
\mathbf{k}_s)&-\infty&\cdots&-\infty\\
a(\mathbf{q}_2, \mathbf{k}_1)&\cdots&a(\mathbf{q}_2,
\mathbf{k}_s)&-\infty&\cdots&-\infty\\
\vdots& & \vdots&\vdots& &\vdots\\
a(\mathbf{q}_s, \mathbf{k}_1)&\cdots&a(\mathbf{q}_s,
\mathbf{k}_s)&-\infty&\cdots&-\infty\\
\vdots& & \vdots&\vdots& &\vdots\\
a(\mathbf{q}_m, \mathbf{k}_1)&\cdots&a(\mathbf{q}_m,
\mathbf{k}_s)&-\infty&\cdots&-\infty
\end{matrix}
\right]
\] 则 \[
\begin{aligned}
\mathrm{context} &=
\mathrm{softmax}\left(\frac{\mathbf{Q}\mathbf{K}^\top}{\sqrt{d}}\right)\mathbf{V}\\
&= \mathrm{softmax}\left(\left[
\begin{matrix}
a(\mathbf{q}_1, \mathbf{k}_1)&\cdots&a(\mathbf{q}_1,
\mathbf{k}_s)&-\infty&\cdots&-\infty\\
a(\mathbf{q}_2, \mathbf{k}_1)&\cdots&a(\mathbf{q}_2,
\mathbf{k}_s)&-\infty&\cdots&-\infty\\
\vdots& & \vdots&\vdots& &\vdots\\
a(\mathbf{q}_s, \mathbf{k}_1)&\cdots&a(\mathbf{q}_s,
\mathbf{k}_s)&-\infty&\cdots&-\infty\\
\vdots& & \vdots&\vdots& &\vdots\\
a(\mathbf{q}_m, \mathbf{k}_1)&\cdots&a(\mathbf{q}_m,
\mathbf{k}_s)&-\infty&\cdots&-\infty
\end{matrix}
\right]\right)\cdot\left[
\begin{matrix}
\mathbf{v}_1\\
\mathbf{v}_2\\
\vdots\\
\mathbf{v}_m
\end{matrix}
\right]\\ \\
&= \left[
\begin{matrix}
\alpha(\mathbf{q}_1, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_1,
\mathbf{k}_s)&0&\cdots&0\\
\alpha(\mathbf{q}_2, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_2,
\mathbf{k}_s)&0&\cdots&0\\
\vdots& & \vdots&\vdots& &\vdots\\
\alpha(\mathbf{q}_s, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_s,
\mathbf{k}_s)&0&\cdots&0\\
\vdots& & \vdots&\vdots& &\vdots\\
\alpha(\mathbf{q}_m, \mathbf{k}_1)&\cdots&\alpha(\mathbf{q}_m,
\mathbf{k}_s)&0&\cdots&0
\end{matrix}
\right]\cdot\left[
\begin{matrix}
\mathbf{v}_1\\
\mathbf{v}_2\\
\vdots\\
\mathbf{v}_m
\end{matrix}
\right]\\ \\
&= \left[
\begin{matrix}
\alpha(\mathbf{q}_1,
\mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_1,
\mathbf{k}_s)\mathbf{v}_s&0&\cdots&0\\
\alpha(\mathbf{q}_2,
\mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_2,
\mathbf{k}_s)\mathbf{v}_s&0&\cdots&0\\
\vdots& &\vdots&\vdots& &\vdots\\
\alpha(\mathbf{q}_s,
\mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_s,
\mathbf{k}_s)\mathbf{v}_s&0&\cdots&0\\
\vdots& &\vdots&\vdots& &\vdots\\
\alpha(\mathbf{q}_m,
\mathbf{k}_1)\mathbf{v}_1&\cdots&\alpha(\mathbf{q}_m,
\mathbf{k}_s)\mathbf{v}_s&0&\cdots&0
\end{matrix}
\right] = \left[
\begin{matrix}
f(\mathbf{q}_1)\\
f(\mathbf{q}_2)\\
\vdots\\
f(\mathbf{q}_s)\\
\vdots\\
f(\mathbf{q}_m)
\end{matrix}
\right]
\end{aligned}
\]
由此可以看出,mask方法只保证了前面真正的时间步不会使用padding的数据,但是padding本身会使用前面真正时间步的数据(即\(f(\mathbf{q}_{s+1})\) 到\(f(\mathbf{q}_m)\) 数据并不是0,而是“注意了”前面\(s\) 个真实有效的时间步)
这么设计的可能原因:
padding部分的数据并不重要,因此其如何参考前面的时间步得到的信息也不重要。
模型可能会学习如何忽略padding数据。
前面真正的数据本身则不能考虑padding的结果,防止真实的信息收到影响。
经过多头自注意力机制后,输出与输入的维度相同,后续在通过更多的多头自注意力机制的时候,真实的数据也会利用mask方法忽略掉填充部分的信息,因此前面填充部分计算得到的结果在后面的多头自注意力机制中也不会用到。
减少逻辑运算,利用广播机制加快模型运行速度。
在上面的代码中,我们加入了mask方法,但是\(\mathbf{K}\) 、\(\mathbf{Q}\) 、\(\mathbf{V}\) 的计算仍然是分开的三个步骤,我们可以继续优化,使得\(\mathbf{K}\) 、\(\mathbf{Q}\) 、\(\mathbf{V}\) 的计算可以并行,加快模型运行速度 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 class MultiheadSelfAttention (nn.Module): def __init__ (self, embed_dim, num_heads, dropout=0.1 ): super ().__init__() assert embed_dim % num_heads == 0 , "Embedding dimension must be divisible by number of heads" self .embed_dim = embed_dim self .num_heads = num_heads self .head_dim = embed_dim // num_heads self .qkv_proj = nn.Linear(embed_dim, 3 *embed_dim) self .out_proj = nn.Linear(embed_dim, embed_dim) self .dropout = nn.Dropout(dropout) def forward (self, x, mask=None ): batch_size, seq_len, _ = x.shape qkv = self .qkv_proj(x) Q, K, V = qkv.chunk(3 , dim=-1 ) Q = Q.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) K = K.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) V = V.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) attn_score = torch.matmul(Q, K.transpose(-2 , -1 )) / (self .head_dim ** 0.5 ) if mask is not None : mask = mask.unsqueeze(1 ).unsqueeze(2 ) attn_score = attn_score.masked_fill(mask == 0 , float ('-inf' )) attn_weights = torch.softmax(attn_score, dim=-1 ) attn_weights = self .dropout(attn_weights) context = torch.matmul(attn_weights, V) context = context.transpose(1 , 2 ).contiguous().view(batch_size, seq_len, self .embed_dim) output = self .out_proj(context) return output
位置编码
之前已有内容,此处仅给出代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 class PositionalEncoding (nn.Module): def __init__ (self, embed_dim, max_len=5000 ): super ().__init__() pe = torch.zeros(max_len, embed_dim) position = torch.arange(0 , max_len, dtype=torch.float ).unsqueeze(1 ) div_term = torch.exp(torch.arange(0 , embed_dim, 2 ).float () * (-math.log(10000.0 ) / embed_dim)) pe[:, 0 ::2 ] = torch.sin(position * div_term) pe[:, 1 ::2 ] = torch.cos(position * div_term) pe = pe.unsqueeze(0 ) self .register_buffer('pe' , pe) def forward (self, x ): x = x + self .pe[:, :x.size(1 )] return x
词嵌入
在Transformer的原论文中,假设词嵌入的初始方差为\(\frac{1}{d}\) ,因此需要乘以\(\sqrt{d}\) 使最终方差为1: \[
\mathrm{Var}(X_{\mathrm{emb}}\cdot\sqrt{d}) = \frac{1}{d}\cdot d = 1
\]
这样可以保证前向传播和反向传播数据的稳定性,防止出现梯度爆炸或梯度消失。
在PyTorch中,nn.Embedding()
对词向量初始化为标准正态分布\(\mathcal{N}(0, 1)\) ,此时词嵌入的方差为
\[
\mathrm{Var}(X_{\mathrm{emb}}) = 1
\] 无需再乘以\(\sqrt{d}\)
残差连接
Transformer结构中多处使用残差连接,可以增强模型拟合能力,防止梯度消失。
比如,在数据输入嵌入层并加上位置编码矩阵后,数据通过多头注意力机制得到新的数据,然后原本的数据与新的数据相加(残差连接),再进行层规范化,经过层规范化的数据既通过Positionalwise
FFN得到新的数据,又与该新的数据相加(残差连接),再进行层规范化。
Masked 多头自注意力机制
在解码器中,Target经过嵌入层和加上位置编码后,通过的是Masked多头自注意力机制(Masked
Multi-head
Self-Attention),与传统的RNN类模型不同的是,Transformer的Target无需按照时间步逐个输入模型,而是可以并行输入模型。比如对于机器翻译模型,训练过程中,解码器可以一次性接受整个批量的翻译数据,包括第一个时间步到最后一个时间步的内容,但是,如果将整个数据直接通过多头自注意力机制,那么前面时间步的数据也会看到后面时间步的数据,而在推理过程中,则是先预测前面时间步的翻译文本,再根据前面翻译的内容去预测后面翻译的文本,也就是说,前面的时间步无法参考后面时间步的输出,那么训练与推理就出现了偏差。因此,为了保证模型训练后能实现正常的推理,在训练过程中,Target中的一个样本前面时间步的内容不应该注意到后面时间步的内容,因此要加入掩码(Mask)机制:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 class MaskedMultiheadSelfAttention (nn.Module): def __init__ (self, embed_dim, num_heads, device='cpu' , dropout=0.1 ): super ().__init__() assert embed_dim % num_heads == 0 , "Embedding dimension must be divisible by number of heads" self .embed_dim = embed_dim self .num_heads = num_heads self .head_dim = embed_dim // num_heads self .device = device self .qkv_proj = nn.Linear(embed_dim, 3 *embed_dim) self .out_proj = nn.Linear(embed_dim, embed_dim) self .dropout = nn.Dropout(dropout) def forward (self, x, mask=None ): batch_size, seq_len, _ = x.shape qkv = self .qkv_proj(x) Q, K, V = qkv.chunk(3 , dim=-1 ) Q = Q.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) K = K.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) V = V.view(batch_size, seq_len, self .num_heads, self .head_dim).transpose(1 , 2 ) attn_score = torch.matmul(Q, K.transpose(-1 , -2 )) / (self .head_dim ** 0.5 ) causal_mask = torch.tril(torch.ones(seq_len, seq_len, dtype=torch.long, device=self .device)).unsqueeze(0 ).unsqueeze(1 ) if mask is not None : mask = mask.unsqueeze(1 ).unsqueeze(2 ) causal_mask = causal_mask & mask attn_score = attn_score.masked_fill(causal_mask == 0 , float ('-inf' )) attn_weights = torch.softmax(attn_score, dim=-1 ) attn_weights = self .dropout(attn_weights) context = torch.matmul(attn_weights, V) context = context.transpose(1 , 2 ).contiguous().view(batch_size, seq_len, self .embed_dim) output = self .out_proj(context) return output
上面的代码中创建了一个下三角掩码causal_mask
,用于代表当前时间步不访问后面的时间步(矩阵第\(i\) 行的第\(i+1\) 列及后面的列值为0),再将该掩码与mask
(处理padding)结合,可以同时时间当前位置不关注后面的时间步以及不会关注padding的时间步。
编码器 - 解码器注意力机制
在Transformer的解码器中,Target数据通过Masked多头自注意力机制并经过残差连接和LayerNorm后,作为\(\mathbf{Q}\) ,而编码器的输出作为\(\mathbf{K}\) 和\(\mathbf{V}\) ,进入多头注意力机制模块,这个部分与之前加入注意力机制的Seq2Seq模型很像,都是以解码器当前时间步作为查询\(\mathbf{q}\) ,以编码器输出作为源数据特征\(\mathbf{K}\) 和源数据信息\(\mathbf{V}\) ,需要注意的是,\(\mathbf{K}\) 、\(\mathbf{V}\) 、\(\mathbf{Q}\) 仍然需要经过线性变换,使模型具有可学习的能力
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 class EncoderDecoderAttention (nn.Module): def __init__ (self, embed_dim, num_heads, dropout=0.1 ): super ().__init__() assert embed_dim % num_heads == 0 , "Embedding dimension must be divisible by number of heads" self .embed_dim = embed_dim self .num_heads = num_heads self .head_dim = embed_dim // num_heads self .q_proj = nn.Linear(embed_dim, embed_dim) self .kv_proj = nn.Linear(embed_dim, 2 *embed_dim) self .out_proj = nn.Linear(embed_dim, embed_dim) self .dropout = nn.Dropout(dropout) def forward (self, q, kv, src_mask=None ): batch_size, seq_len_q, _ = q.shape _, seq_len_kv, _ = kv.shape Q = self .q_proj(q).view(batch_size, seq_len_q, self .num_heads, self .head_dim).transpose(1 , 2 ) kv = self .kv_proj(kv) K, V = kv.chunk(2 , dim=-1 ) K = K.view(batch_size, seq_len_kv, self .num_heads, self .head_dim).transpose(1 , 2 ) V = V.view(batch_size, seq_len_kv, self .num_heads, self .head_dim).transpose(1 , 2 ) attn_score = torch.matmul(Q, K.transpose(-2 , -1 )) / (self .head_dim ** 0.5 ) if src_mask is not None : src_mask = src.unsqueeze(1 ).unsqueeze(2 ) attn_score = attn_score.masked_fill(src_mask == 0 , float ('-inf' )) attn_weights = torch.softmax(attn_score, dim=-1 ) attn_weights = self .dropout(attn_weights) context = torch.matmul(attn_weights, V) context = context.transpose(1 , 2 ).contiguous().view(batch_size, seq_len_q, self .embed_dim) output = self .out_proj(context) return output
模型实现
编码器
构建一个编码层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class EncoderLayer (nn.Module): def __init__ (self, embed_dim, num_heads, hidden_dim, dropout=0.1 ): super ().__init__() self .self_attn = MultiheadSelfAttention(embed_dim, num_heads, dropout) self .ffn = PositionalwiseFFN(embed_dim, hidden_dim, dropout) self .norm1 = nn.LayerNorm(embed_dim) self .norm2 = nn.LayerNorm(embed_dim) self .dropout = nn.Dropout(dropout) def forward (self, x, mask=None ): attn_output = self .self_attn(x, mask) x = self .norm1(x + self .dropout(attn_output)) ffn_output = self .ffn(x) x = self .norm2(x + self .dropout(ffn_output)) return x
Transformer的编码器是由\(n\) 个EncoderLayer
构成的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class Encoder (nn.Module): def __init__ (self, vocab_size, num_layers, embed_dim, num_heads, hidden_dim, max_len=5000 , dropout=0.1 ): super ().__init__() self .embedding = nn.Embedding(vocab_size, embed_dim) self .positional_encoding = PositionalEncoding(embed_dim, max_len) self .layers = nn.ModuleList([ EncoderLayer(embed_dim, num_heads, hidden_dim, dropout) for _ in range (num_layers) ]) self .dropout = nn.Dropout(dropout) def forward (self, x, mask=None ): x = self .embedding(x) x = self .positional_encoding(x) x = self .dropout(x) for layer in self .layers: x = layer(x, mask) return x
解码器
构建一个解码层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 class DecoderLayer (nn.Module): def __init__ (self, embed_dim, num_heads, hidden_dim, device='cpu' , dropout=0.1 ): super ().__init__() self .device = device self .self_attn = MaskedMultiheadSelfAttention(embed_dim, num_heads, dropout) self .cross_attn = EncoderDecoderAttention(embed_dim, num_heads, dropout) self .ffn = PositionalwiseFFN(embed_dim, hidden_dim, dropout) self .norm1 = nn.LayerNorm(embed_dim) self .norm2 = nn.LayerNorm(embed_dim) self .norm3 = nn.LayerNorm(embed_dim) self .dropout = nn.Dropout(dropout) def forward (self, x, enc_output, src_mask=None , tgt_mask=None ): attn_output1 = self .self_attn(x, tgt_mask, self .device) x = self .norm1(x + self .dropout(attn_output1)) attn_output2 = self .cross_attn(x, enc_output, src_mask) x = self .norm2(x + self .dropout(attn_output2)) ffn_output = self .ffn(x) x = self .norm3(x + self .dropout(ffn_output)) return x
Transformer的解码器是由\(n\) 个DecoderLayer
构成的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class Decoder (nn.Module): def __init__ (self, vocab_size, num_layers, embed_dim, num_heads, hidden_dim, device='cpu' , max_len = 5000 , dropout=0.1 ): super ().__init__() self .device=device self .embedding = nn.Embedding(vocab_size, embed_dim) self .positional_encoding = PositionalEncoding(embed_dim, max_len) self .layers = nn.ModuleList([ DecoderLayer(embed_dim, num_heads, hidden_dim, device, dropout) for _ in range (num_layers) ]) self .dropout = nn.Dropout(dropout) def forward (self, x, enc_output, src_mask=None , tgt_mask=None ): x = self .embedding(x) x = self .positional_encoding(x) x = self .dropout(x) for layer in self .layers: x = layer(x, enc_output, src_mask, tgt_mask) return x
根据前面完成的编码器和解码器,完成完成的Transformer模型:
1 2 3 4 5 6 7 8 9 10 11 12 13 class Transformer (nn.Module): def __init__ (self, src_vocab_size, tgt_vocab_size, num_layers, embed_dim, num_heads, hidden_dim, device='cpu' , max_len=5000 , dropout=0.1 ): super ().__init__() self .device = device self .encoder = Encoder(src_vocab_size, num_layers, embed_dim, num_heads, hidden_dim, max_len, dropout) self .decoder = Decoder(tgt_vocab_size, num_layers, embed_dim, num_heads, hidden_dim, device, max_len, dropout) self .fc = nn.Linear(embed_dim, tgt_vocab_size) def forward (self, src, tgt, src_mask=None , tgt_mask=None ): enc_output = self .encoder(src, src_mask) dec_output = self .decoder(tgt, enc_output, src_mask, tgt_mask) output = self .fc(dec_output) return output