循环神经网络的进阶

深度循环神经网络

深度循环神经网络

假设在时间步\(t\)有一个小批量的输入数据\(\mathbf{X}_t\in\mathbb{R}^{n\times d}\)(样本数:\(n\),每个样本中的输入数:\(d\))。同时,将\(l^{\mathrm{th}}\)隐藏层(\(l=1, \cdots, L\))的隐藏状态设为\(\mathbf{H}_t^{(l)}\in\mathbb{R}^{n\times h}\)(隐藏单元数:\(h\)),输出层变量设置为\(\mathbf{O}_t\in\mathbb{R}^{n\times q}\)(输出数:\(q\))。设置\(\mathbf{H}_t^{(0)}=\mathbf{X}_t\),第\(l\)个隐藏层的隐状态使用激活函数\(\phi_l\),则: \[ \mathbf{H}_t^{(l)} = \phi_l(\mathbf{H}_t^{(l-1)}\mathbf{W}_{xh}^{(l)} + \mathbf{H}_{t-1}^{(l)}\mathbf{W}_{hh}^{(l)} + \mathbf{b}_h^{(l)}) \] 其中,权重\(\mathbf{W}_{xh}^{(l)}\in\mathbb{R}^{h\times h}\)\(\mathbf{W}_{hh}^{(l)}\in\mathbb{R}^{h\times h}\)和偏置\(\mathbf{b}_h^{(l)}\in\mathbb{R}^{1\times h}\)都是第\(l\)个隐藏层的模型参数

最后,输出层的计算仅基于第\(l\)个隐藏层的最终隐状态: \[ \mathbf{O}_t = \mathbf{H}_t^{(L)}\mathbf{W}_{hq} + \mathbf{b}_q \] 其中,权重\(\mathbf{W}_{hq}\in\mathbb{R}^{h\times q}\)和偏置\(\mathbf{b}_q\in\mathbb{R}^{1\times q}\)都是输出层的模型参数

在PyTorch中,nn.RNN()nn.GRU()nn.LSTM()都可以简洁实现深度网络,只需要设置参数num_layers即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import torch
import torch.nn as nn

class GRU(nn.Module):
def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1):
self.hidden_size = hidden_size
self.num_layers = num_layers

self.embedding = nn.Embedding(vocab_size, embed_size)
self.gru = nn.GRU(embed_size, hidden_size, num_layers=num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, vocab_size)

def forward(self, x, hidden):
x = self.embedding(x)
out, hidden = self.gru(x, hidden)
out = self.fc(out)
return out, hidden

def init_hidden(self, batch_size, device):
return torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
vocab_size = 10000
embed_size = 256
hedden_size = 512
num_layers = 3
model = GRU(vocab_size, embed_size, hidden_size, num_layers).to(device)

不论是RNN、GRU还是LSTM,每个隐藏层的状态都是一样的: \[ \mathbf{H}_t^{(l)}\in\mathbb{R}^{n\times h} \] 其中\(n\)是一个小批量的大小,\(h\)是一个隐藏层单元数量,那么,对于整个隐藏层而言: \[ \mathbf{H}_t = \left[ \begin{matrix} \mathbf{H}_t^{(1)}\\ \mathbf{H}_t^{(2)}\\ \vdots\\ \mathbf{H}_t^{(L)} \end{matrix} \right] \]\[ \mathbf{H}_t\in\mathbb{R}^{L\times n\times h} \] hidden为最后一个时间步的隐藏层输出,维度为:(num_layers, batch_size, hidden_size)

双向循环神经网络

双向循环神经网络

对于单向循环神经网络,第\(t\)个时间步的输出仅考虑当前的输入\(\mathbf{X}_t\)和之前的信息\(\mathbf{H}_{t-1}\),而不会考虑未来的信息。这样在一些特殊的任务重可能会导致一些问题,比如:

  • I am ______
  • I am ______ hungry.
  • I am ______ hungry, and I can eat half a pig.

根据可以获得的信息量,我们可以用不同的词填空,如“很高兴”(“happy”)、“不”(“not”)、和“非常”(“very”)。很明显,如果不考虑后面的信息,第二个和第三个可能都无法填写出合适的内容。

因此,在有些情况下,同时考虑过去和未来的信息可以实现更好的效果,而双向循环神经网络就可以实现同时考虑过去和未来的信息。

对于任意的时间步\(t\),给定一个小批量的输入数据\(\mathbf{X}_t\in\mathbb{R}^{n\times d}\)(样本数\(n\),每个示例中的输入数\(d\)),并且令隐藏层激活函数为\(\phi\)

在双向架构中,我们设该时间步的前向和反向隐藏状态分别为\(\mathbf{\overrightarrow H}_t\in\mathbb{R}^{n\times h}\)\(\mathbf{\overleftarrow H}_t\in\mathbb{R}^{n\times h}\),其中\(h\)是隐藏单元数目。前向和反向隐藏状态的更新如下: \[ \begin{aligned} &\mathbf{\overrightarrow H}_t = \phi(\mathbf{X}_t\mathbf{W}_{xh}^{(f)} + \mathbf{\overrightarrow H}_{t-1}\mathbf{W}_{hh}^{(f)} + \mathbf{b}_h^{(f)})\\ &\mathbf{\overleftarrow H}_t = \phi(\mathbf{X}_t\mathbf{W}_{xh}^{(b)} + \mathbf{\overleftarrow H}_{t+1}\mathbf{W}_{hh}^{(b)} + \mathbf{b}_h^{(b)}) \end{aligned} \] 其中,权重\(\mathbf{W}_{xh}^{(f)}, \mathbf{W}_{xh}^{(b)}\in\mathbb{R}^{d\times h}\)\(\mathbf{W}_{hh}^{(f)}, \mathbf{W}_{hh}^{(b)}\in\mathbb{R}^{h\times h}\)和偏置\(\mathbf{b}_h^{(f)}, \mathbf{b}_h^{(b)}\in\mathbb{R}^{1\times h}\)都是模型参数。

接下来,将前向隐状态\(\mathbf{\overrightarrow H}_t\)和反向隐状态\(\mathbf{\overleftarrow H}_t\)连接起来,获得需要送入输出层的隐状态\(\mathbf{H}_t\in\mathbb{R}^{n\times 2h}\) \[ \mathbf{H}_t = \left[ \begin{matrix} \mathbf{\overrightarrow H}_t & \mathbf{\overleftarrow H}_t \end{matrix} \right] \] 在具有多个隐藏层的深度双向循环神经网络中,该信息作为输入传递到下一个双向层。最后,输出层计算得到的输出为\(\mathbf{O}_t\in\mathbb{R}^{n\times q}\)\(q\)是输出单元的数目): \[ \mathbf{O}_t = \mathbf{H}_t\mathbf{W}_{hq} + \mathbf{b}_q \] 这里,权重矩阵\(\mathbf{W}_{hq}\in\mathbb{R}^{2h\times q}\)和偏置\(\mathbf{b}_q\in\mathbb{R}^{1\times q}\)是输出层的模型参数。事实上,这两个方向可以拥有不同数量的隐藏单元。

在PyTorch中,nn.RNN()nn.GRU()nn.LSTM()都可以简洁实现双向循环神经网络,只需要设置bidirectional=True即可。

对于PyTorch的双向循环网络,包括nn.RNN()nn.GRU()nn.LSTM(),其最后一个隐藏状态是正向隐藏状态(处理输入序列从第一个时间步到最后一个时间步的最终结果)和反向隐藏状态(处理输入序列从最后一个时间步到第一个时间步的最终结果)

对于单层双向循环网络的隐藏状态:

1
2
3
4
5
import torch.nn as nn

gru = nn.GRU(input_size=100, hidden_size=64, bidirectional=True, batch_first=True)

output, h_n = gru(inputs) # inputs形状为(batch_size, seq_length, input_size)

此时,h_n的形状为(2, batch_size, hidden_size)

  • h_n[0]是正向GRU最后一个时间步(第seq_length个时间步)的隐藏状态
  • h_n[1]是反向GRU最后一个时间步(第1个时间步,因为反向处理)的隐藏状态

若需要将正反向的隐藏状态合并为单个向量(例如用于分类任务),可以通过拼接实现:

1
hidden_cat = torch.cat([h_n[0], h_n[1]], dim=1)	# 形状为(batch_size, hidden_size*2)

output的形状为(batch_size, seq_length, hidden_size*2),即自动将最后一层的双向隐藏状态进行了拼接。

比如,使用双向GRU进行情感分类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import torch
import torch.nn as nn

class BidiGRU(nn.Module):
def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1, dropout=0.5):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers

self.embedding = nn.Embedding(vocab_size, embed_size)
self.gru = nn.GRU(embed_size, hidden_size, num_layers, bidirectional=True, dropout=dropout, batch_first=True)
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(2*hidden_size, 1)

def forward(self, x):
x = self.embedding(x)
# 不传入初始隐藏状态,默认初始隐藏状态为全零张量
out, _ = self.gru(x) # out的维度为(batch_size, seq_length, hidden_size*2),为最后一个隐藏层的双向隐藏状态
# 取最后一个时间步的输出(双向拼接)
last_out = out[:, -1, :] # 维度为(batch_size, hidden_size*2)
last_out = self.dropout(last_out)
last_out = self.fc(last_out)
return torch.sigmoid(last_out).squeeze()


不定长时间序列的输入

如果不同样本的时间步不同,无法直接使用DataLoader进行批量加载数据,除非设置其参数batch_size=1,因为DataLoader返回的是多维度的张量,第一个维度为batch_size,如果一个样本为文本序列,那么DataLoader返回的就是(batch_size, seq_length)形状的张量,如果不同样本的时间步数不一样,DataLoader就会报错。

为了能够使得DataLoader正常使用,我们需要对每个批次内的文本统一填充至该批次最大长度(不是全局最大长度,减少无效计算)。

  1. 在定义Dataset时,依旧可以使用原始长度的序列,需要注意的一点是,由于后续需要告知DataLoader每个序列的长度,此处需要多返回一个序列长度的张量:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import torch
from torch.utils.data import Dataset, DataLoader

class dataset(Dataset):
def __init__(self, data, target, word_to_idx):
# `data`是原始文本序列数据,`target`是原始文本的标签,比如情感分类
self.data = data
self.target = target
self.word_to_idx = word_to_idx

def __len__(self):
return len(self.data)

def __getitem__(self, index):
word_data = [word_to_idx[word] if word in word_to_idx else word_to_idx['<UNK>'] for word in self.data[index]]
return (
torch.tensor(word_data, dtype=torch.long),
torch.tensor(self.target[index], dtype=torch.long),
torch.tensor(len(word_data), dtype=torch.long)
)
  1. 在定义DataLoader时,使用自定义的collate_fn函数,将不同长度的序列进行填充:

传递给collate_fn函数的参数取决于dataset的设置:

比如Dataset__getitem__返回的是(data, target),那么collate_fn得到的batch的每个元素是(data_i, target_i)的元组;如果返回的是(data, target, length),那么collate_fn得到的batch的每个元素是(data_i, target_i, length_i)

1
2
3
4
5
6
7
8
9
10
11
from torch.nn.utils.rnn import 	pad_sequence
def collate_fn(batch):
# 解包数据和长度
sequences, targets, lengths = zip(*batch)
# 填充到批次内的最长长度
padded_sequences = pad_sequence(sequences, batch_first=True, padding_value=0) # 以0作为填充值,以批量大小作为第一个维度
lengths = torch.tensor(lengths)
# 保证返回的都是pytorch的张量类型数据
return padded_sequences, targets, lengths

dataloader = DataLoader(dataset, batch_size=64, collate_fn=collate_fn, shuffle=True)

假设一个批次包含3个序列,长度分别为[5, 3, 4]

1
2
3
4
5
sequences = [
tensor([1, 2, 3, 4, 5]),
tensor([6, 7, 8]),
tensor([9, 10, 11, 12])
]

经过pad_sequences()动态填充后:

1
2
3
4
5
padded_sequences = [
tensor([1, 2, 3, 4, 5]),
tensor([6, 7, 8, 0, 0]),
tensor([9, 10, 11, 12, 0])
]

排序优化:为了提高RNN计算效率,返回的小批量数据需要按照序列长度降序排序,下面是优化过后的函数:

1
2
3
4
5
6
7
8
9
10
11
def collate_fn(batch):
sequences, targets, lengths = zip(*batch)
# 按照序列长度降序排列
sorted_indices = torch.argsort(torch.tensor(lengths), descending=True) # 按照数据从大到小排序,排序的结果为对应索引的位置
sorted_sequences = [sequences[i] for i in sorted_indices]
sorted_targets = [targets[i] for i in sorted_indices]
sorted_lengths = [lengths[i] for i in sorted_indices]

# 填充到批次最长长度
padded = pad_sequence(sorted_sequences, batch_first=True, padding_value=0)
return padded, torch.tensor(sorted_targets), torch.tensor(sorted_lengths)
  1. RNNGRULSTM等模型中,需要将填充的数据压缩,填充的数据不计入时间步,即填充的数据不进入模型进行计算

使用pack_padded_sequences进行压缩,处理完成后再使用pad_packed_sequences恢复数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import torch.nn as nn

class GRU(nn.Module):
def __init__(self, vocab_size, embed_size, hidden_size, num_layers=1):
super().__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers

self.embedding = nn.Embedding(vocab_size, embed_size)
self.gru = nn.GRU(embed_size, hidden_size, num_layers=num_layers, batch_first=True)
self.fc = nn.Linear(hidden_size, vocab_size)

def forward(self, x, lengths):
# x: (batch_size, padded_seq_len), lengths: (batch_size,)
# lengths是一个包含每个样本实际有效参数的一维张量
embedded = self.embedding(x)

# 压缩填充部分(需要确保lengths是CPU张量)
packed = nn.utils.rnn.pack_padded_sequences(
embedded, lengths.cpu(), batch_first=True, enforce_sorted=True
)
# 将压缩后的数据通过神经网络
packed_out, h_n = self.gru(packed)

# 解包恢复填充形状
output, _ = nn.utils.rnn.pad_packed_sequences(
packed_out, batch_first=True, total_length=x.size(1)
)

output = self.fc(output)

return output, h_n

原理:

pack_padded_sequences将数据存储为紧凑的PackedSequence,仅包含有效时间步,RNN等每个时间步仅处理有效样本数。需要注意的是,lengths是一个包含每个样本实际有效长度的一维张量,比如,假设一个批次(batch)包含三个样本,其有效长度分别为5、3、4,则lengths=torch.tensor([5, 3, 4]),形状为(3,)

由于lengths不需要在GPU上进行运算,该参数需要在CPU上

压缩后RNN等神经网络的计算方式:

比如,有5个样本,lengths=torch.tensor([3, 3, 2, 1, 1])

  • 第1个时间步:5个样本有效(所有序列的第1个元素)
  • 第2个时间步:3个样本有效(前3个序列的第2个元素)
  • 第3个时间步:3个样本有效(前2个序列的第3个元素)

RNN会在每个时间步仅处理当前有效的样本,无效的样本会被自动跳过。

由上可知,在DataLoader中设置序列由长到短排序后输出,能够在模型计算的时候加快速度。

编码器-解码器架构

对于像机器翻译这样的问题,模型的输入和输出都是长度可变的序列。为了处理这种类型的输入和输出,可以使用编码器-解码器架构(encoder-decoder):编码器接收一个长度可变的序列作为输入,并将其转换为具有固定形状的编码状态;解码器将固定形状的编码状态映射到长度可变的序列。

encoder-decoder

我们可以使用RNN、LSTM、GRU等实现编码器-解码器架构:

编码器就是将序列输入RNN得到的最后一层的最后一个隐藏状态作为编码,然后将此编码作为解码器RNN的隐藏状态传入,依次获得不同时间步的输出(第一个输出为<SOS>,然后将每次的预测输出作为下一个时间步的输入,直到预测到结尾<EOS>或达到手动设置的解码器最大可预测时间步长度)

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import torch
import torch.nn as nn

# 基于encoder-decoder架构构建一个简单的机器翻译模型

class Encoder(nn.Module):
def __init__(self, enc_vocab_size, enc_embed_size, enc_hidden_size, num_layers=1):
super().__init__()
self.embedding = nn.Embedding(enc_vocab_size, enc_embed_size)
self.rnn = nn.GRU(enc_embed_size, enc_hidden_size, num_layers=num_layers, batch_first=True)

def forward(self, x):
embedded = self.embedding(x)
output, hidden = self.rnn(embedded)
return hidden[-1]

class Decoder(nn.Module):
def __init__(self, dec_vocab_size, dec_embed_size, dec_hidden_size):
super().__init__()
self.embedding = nn.Embedding(dec_vocab_size, dec_embed_size)
self.rnn = nn.GRU(dec_embed_size, dec_hidden_size, batch_first=True)
self.fc = nn.Linear(dec_hidden_size, dec_vocab_size)

def forward(self, trg, hidden):
embedded = self.embedding(trg)
output, hidden = self.rnn(embedded, hidden)
output = self.fc(output)
return output

Seq2Seq(序列到序列)模型

Seq2Seq

对于最简易的编码器-解码器架构,编码器将输入序列编码后,作为隐藏单元数据传入给解码器,然后解码器再逐个获取解码内容,每过一个时间步,解码器的隐藏层数据\(h\)都会改变,无法保留编码器最开始传递的编码。

Seq2Seq在其上面的改进是将编码不仅传递给解码器的隐藏层,而且将其作为输入数据在每个时间步都传递给循环神经元,这样就可以保证在每个时间步都能知道编码器的编码信息,而不会有信息损失。

对于编码器,为了能更加全面地获得输入数据的信息,我们通常使用双向循环神经网络,如果编码器的隐藏层维度与解码器的隐藏层维度不同,可以使用线性层将编码器隐藏层映射到解码器隐藏层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence

class Encoder(nn.Module):
def __init__(self, enc_vocab_size, enc_embed_size, enc_hidden_size, num_layers=1):
super().__init__()
self.embedding = nn.Embedding(enc_vocab_size, enc_embed_size)
self.rnn = nn.GRU(enc_embed_size, enc_hidden_size, num_layers=num_layers, bidirectional=True, batch_first=True)
self.fc = nn.Linear(enc_hidden_size*2, enc_hidden_size) # 映射

def forward(self, src, src_len):
embedded = self.embedding(src)
packed = pack_padded_sequence(embedded, src_len.cpu(), batch_first=True, enforce_sorted=True)
output, hidden = self.rnn(embedded) # output的维度为[batch_size, seq_len, enc_hidden_size*2]
output, _ = pad_packed_sequence(output, batch_first=True)
hidden = torch.cat((hidden[-1], hidden[-2]), dim=2) # 将最后一层的双向hidden进行拼接,维度为[batch_size, enc_hidden_size*2]
output = hidden
hidden = nn.Linear(hidden) # 映射到维度[batch_size, enc_hidden_size],用于当做解码器的初始隐藏hidden
return output, hidden

class Decoder(nn.Module):
def __init__(self, dec_vocab_size, dec_embed_size, dec_hidden_size):
super().__init__()
self.embedding = nn.Embedding(dec_vocab_size, dec_embed_size)
self.rnn = nn.GRU(dec_embed_size + dec_hidden_size*2, dec_hidden_size) # 这里我们默认enc_hidden_size与dec_hidden_size大小相同
self.fc = nn.Linear(dec_hidden_size, dec_vocab_size)

def forward(self, trg, hidden, encoder_output):
"""
trg: [batch_size, trg_len]
hidden: [batch_size, dec_hidden_size]
encoder_output: [batch_size, enc_hidden_size*2]
"""
embedded = self.embedding(trg) # [batch_size, trg_len, dec_embed_size]
rnn_input = torch.cat((embedded, hidden.unsqueeze(1)), dim=2) # [batch_size, trg_len, dec_embed_size + enc_hidden_size*2]
output, hidden = self.rnn(rnn_input, hidden)
# output: [batch_size, trg_len, dec_hidden_size]
output = self.fc(output) # [batch_size, trg_len, dec_vocab_size]
return output

class Seq2Seq(nn.Module):
def __init__(self, encoder, decoder):
self.encoder = encoder
self.decoder = decoder

def forward(self, src, src_len, trg):
encoder_output, hidden = self.encoder(src, src_len)
output = self.decoder(trg, hidden, encoder_output)
return output

循环神经网络的进阶
https://blog.shinebook.net/2025/04/19/人工智能/理论基础/深度学习/循环神经网络的进阶/
作者
X
发布于
2025年4月19日
许可协议