深度循环神经网络
深度循环神经网络
假设在时间步\(t\) 有一个小批量的输入数据\(\mathbf{X}_t\in\mathbb{R}^{n\times
d}\) (样本数:\(n\) ,每个样本中的输入数:\(d\) )。同时,将\(l^{\mathrm{th}}\) 隐藏层(\(l=1, \cdots, L\) )的隐藏状态设为\(\mathbf{H}_t^{(l)}\in\mathbb{R}^{n\times
h}\) (隐藏单元数:\(h\) ),输出层变量设置为\(\mathbf{O}_t\in\mathbb{R}^{n\times
q}\) (输出数:\(q\) )。设置\(\mathbf{H}_t^{(0)}=\mathbf{X}_t\) ,第\(l\) 个隐藏层的隐状态使用激活函数\(\phi_l\) ,则: \[
\mathbf{H}_t^{(l)} = \phi_l(\mathbf{H}_t^{(l-1)}\mathbf{W}_{xh}^{(l)} +
\mathbf{H}_{t-1}^{(l)}\mathbf{W}_{hh}^{(l)} + \mathbf{b}_h^{(l)})
\] 其中,权重\(\mathbf{W}_{xh}^{(l)}\in\mathbb{R}^{h\times
h}\) ,\(\mathbf{W}_{hh}^{(l)}\in\mathbb{R}^{h\times
h}\) 和偏置\(\mathbf{b}_h^{(l)}\in\mathbb{R}^{1\times
h}\) 都是第\(l\) 个隐藏层的模型参数
最后,输出层的计算仅基于第\(l\) 个隐藏层的最终隐状态: \[
\mathbf{O}_t = \mathbf{H}_t^{(L)}\mathbf{W}_{hq} + \mathbf{b}_q
\] 其中,权重\(\mathbf{W}_{hq}\in\mathbb{R}^{h\times
q}\) 和偏置\(\mathbf{b}_q\in\mathbb{R}^{1\times
q}\) 都是输出层的模型参数
在PyTorch中,nn.RNN()
、nn.GRU()
、nn.LSTM()
都可以简洁实现深度网络,只需要设置参数num_layers
即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import torchimport torch.nn as nnclass GRU (nn.Module): def __init__ (self, vocab_size, embed_size, hidden_size, num_layers=1 ): self .hidden_size = hidden_size self .num_layers = num_layers self .embedding = nn.Embedding(vocab_size, embed_size) self .gru = nn.GRU(embed_size, hidden_size, num_layers=num_layers, batch_first=True ) self .fc = nn.Linear(hidden_size, vocab_size) def forward (self, x, hidden ): x = self .embedding(x) out, hidden = self .gru(x, hidden) out = self .fc(out) return out, hidden def init_hidden (self, batch_size, device ): return torch.zeros(self .num_layers, batch_size, self .hidden_size).to(device) device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu' ) vocab_size = 10000 embed_size = 256 hedden_size = 512 num_layers = 3 model = GRU(vocab_size, embed_size, hidden_size, num_layers).to(device)
不论是RNN、GRU还是LSTM,每个隐藏层的状态都是一样的: \[
\mathbf{H}_t^{(l)}\in\mathbb{R}^{n\times h}
\] 其中\(n\) 是一个小批量的大小,\(h\) 是一个隐藏层单元数量,那么,对于整个隐藏层而言:
\[
\mathbf{H}_t = \left[
\begin{matrix}
\mathbf{H}_t^{(1)}\\
\mathbf{H}_t^{(2)}\\
\vdots\\
\mathbf{H}_t^{(L)}
\end{matrix}
\right]
\] 则 \[
\mathbf{H}_t\in\mathbb{R}^{L\times n\times h}
\]
hidden
为最后一个时间步的隐藏层输出,维度为:(num_layers, batch_size, hidden_size)
双向循环神经网络
双向循环神经网络
对于单向循环神经网络,第\(t\) 个时间步的输出仅考虑当前的输入\(\mathbf{X}_t\) 和之前的信息\(\mathbf{H}_{t-1}\) ,而不会考虑未来的信息。这样在一些特殊的任务重可能会导致一些问题,比如:
I am ______
I am ______ hungry.
I am ______ hungry, and I can eat half a pig.
根据可以获得的信息量,我们可以用不同的词填空,如“很高兴”(“happy”)、“不”(“not”)、和“非常”(“very”)。很明显,如果不考虑后面的信息,第二个和第三个可能都无法填写出合适的内容。
因此,在有些情况下,同时考虑过去和未来的信息可以实现更好的效果,而双向循环神经网络就可以实现同时考虑过去和未来的信息。
对于任意的时间步\(t\) ,给定一个小批量的输入数据\(\mathbf{X}_t\in\mathbb{R}^{n\times
d}\) (样本数\(n\) ,每个示例中的输入数\(d\) ),并且令隐藏层激活函数为\(\phi\) 。
在双向架构中,我们设该时间步的前向和反向隐藏状态分别为\(\mathbf{\overrightarrow H}_t\in\mathbb{R}^{n\times
h}\) 和\(\mathbf{\overleftarrow
H}_t\in\mathbb{R}^{n\times h}\) ,其中\(h\) 是隐藏单元数目。前向和反向隐藏状态的更新如下:
\[
\begin{aligned}
&\mathbf{\overrightarrow H}_t =
\phi(\mathbf{X}_t\mathbf{W}_{xh}^{(f)} + \mathbf{\overrightarrow
H}_{t-1}\mathbf{W}_{hh}^{(f)} + \mathbf{b}_h^{(f)})\\
&\mathbf{\overleftarrow H}_t =
\phi(\mathbf{X}_t\mathbf{W}_{xh}^{(b)} + \mathbf{\overleftarrow
H}_{t+1}\mathbf{W}_{hh}^{(b)} + \mathbf{b}_h^{(b)})
\end{aligned}
\] 其中,权重\(\mathbf{W}_{xh}^{(f)},
\mathbf{W}_{xh}^{(b)}\in\mathbb{R}^{d\times h}\) ,\(\mathbf{W}_{hh}^{(f)},
\mathbf{W}_{hh}^{(b)}\in\mathbb{R}^{h\times h}\) 和偏置\(\mathbf{b}_h^{(f)},
\mathbf{b}_h^{(b)}\in\mathbb{R}^{1\times h}\) 都是模型参数。
接下来,将前向隐状态\(\mathbf{\overrightarrow
H}_t\) 和反向隐状态\(\mathbf{\overleftarrow
H}_t\) 连接起来,获得需要送入输出层的隐状态\(\mathbf{H}_t\in\mathbb{R}^{n\times 2h}\)
\[
\mathbf{H}_t = \left[
\begin{matrix}
\mathbf{\overrightarrow H}_t & \mathbf{\overleftarrow H}_t
\end{matrix}
\right]
\]
在具有多个隐藏层的深度双向循环神经网络中,该信息作为输入传递到下一个双向层。最后,输出层计算得到的输出为\(\mathbf{O}_t\in\mathbb{R}^{n\times
q}\) (\(q\) 是输出单元的数目):
\[
\mathbf{O}_t = \mathbf{H}_t\mathbf{W}_{hq} + \mathbf{b}_q
\] 这里,权重矩阵\(\mathbf{W}_{hq}\in\mathbb{R}^{2h\times
q}\) 和偏置\(\mathbf{b}_q\in\mathbb{R}^{1\times
q}\) 是输出层的模型参数。事实上,这两个方向可以拥有不同数量的隐藏单元。
在PyTorch中,nn.RNN()
、nn.GRU()
、nn.LSTM()
都可以简洁实现双向循环神经网络,只需要设置bidirectional=True
即可。
对于PyTorch的双向循环网络,包括nn.RNN()
、nn.GRU()
和nn.LSTM()
,其最后一个隐藏状态是正向隐藏状态(处理输入序列从第一个时间步到最后一个时间步的最终结果)和反向隐藏状态(处理输入序列从最后一个时间步到第一个时间步的最终结果)
对于单层双向循环网络的隐藏状态:
1 2 3 4 5 import torch.nn as nn gru = nn.GRU(input_size=100 , hidden_size=64 , bidirectional=True , batch_first=True ) output, h_n = gru(inputs)
此时,h_n
的形状为(2, batch_size, hidden_size)
,
h_n[0]
是正向GRU最后一个时间步(第seq_length
个时间步)的隐藏状态
h_n[1]
是反向GRU最后一个时间步(第1个时间步,因为反向处理)的隐藏状态
若需要将正反向的隐藏状态合并为单个向量(例如用于分类任务),可以通过拼接实现:
1 hidden_cat = torch.cat([h_n[0 ], h_n[1 ]], dim=1 )
output
的形状为(batch_size, seq_length, hidden_size*2)
,即自动将最后一层的双向隐藏状态进行了拼接。
比如,使用双向GRU进行情感分类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import torchimport torch.nn as nnclass BidiGRU (nn.Module): def __init__ (self, vocab_size, embed_size, hidden_size, num_layers=1 , dropout=0.5 ): super ().__init__() self .hidden_size = hidden_size self .num_layers = num_layers self .embedding = nn.Embedding(vocab_size, embed_size) self .gru = nn.GRU(embed_size, hidden_size, num_layers, bidirectional=True , dropout=dropout, batch_first=True ) self .dropout = nn.Dropout(dropout) self .fc = nn.Linear(2 *hidden_size, 1 ) def forward (self, x ): x = self .embedding(x) out, _ = self .gru(x) last_out = out[:, -1 , :] last_out = self .dropout(last_out) last_out = self .fc(last_out) return torch.sigmoid(last_out).squeeze()
不定长时间序列的输入
如果不同样本的时间步不同,无法直接使用DataLoader
进行批量加载数据,除非设置其参数batch_size=1
,因为DataLoader
返回的是多维度的张量,第一个维度为batch_size
,如果一个样本为文本序列,那么DataLoader
返回的就是(batch_size, seq_length)
形状的张量,如果不同样本的时间步数不一样,DataLoader
就会报错。
为了能够使得DataLoader
正常使用,我们需要对每个批次内的文本统一填充至该批次最大长度(不是全局最大长度,减少无效计算)。
在定义Dataset
时,依旧可以使用原始长度的序列,需要注意的一点是,由于后续需要告知DataLoader
每个序列的长度,此处需要多返回一个序列长度的张量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import torchfrom torch.utils.data import Dataset, DataLoaderclass dataset (Dataset ): def __init__ (self, data, target, word_to_idx ): self .data = data self .target = target self .word_to_idx = word_to_idx def __len__ (self ): return len (self .data) def __getitem__ (self, index ): word_data = [word_to_idx[word] if word in word_to_idx else word_to_idx['<UNK>' ] for word in self .data[index]] return ( torch.tensor(word_data, dtype=torch.long), torch.tensor(self .target[index], dtype=torch.long), torch.tensor(len (word_data), dtype=torch.long) )
在定义DataLoader
时,使用自定义的collate_fn
函数,将不同长度的序列进行填充:
传递给collate_fn
函数的参数取决于dataset
的设置:
比如Dataset
的__getitem__
返回的是(data, target)
,那么collate_fn
得到的batch
的每个元素是(data_i, target_i)
的元组;如果返回的是(data, target, length)
,那么collate_fn
得到的batch
的每个元素是(data_i, target_i, length_i)
1 2 3 4 5 6 7 8 9 10 11 from torch.nn.utils.rnn import pad_sequencedef collate_fn (batch ): sequences, targets, lengths = zip (*batch) padded_sequences = pad_sequence(sequences, batch_first=True , padding_value=0 ) lengths = torch.tensor(lengths) return padded_sequences, targets, lengths dataloader = DataLoader(dataset, batch_size=64 , collate_fn=collate_fn, shuffle=True )
假设一个批次包含3个序列,长度分别为[5, 3, 4]
:
1 2 3 4 5 sequences = [ tensor([1 , 2 , 3 , 4 , 5 ]), tensor([6 , 7 , 8 ]), tensor([9 , 10 , 11 , 12 ]) ]
经过pad_sequences()
动态填充后:
1 2 3 4 5 padded_sequences = [ tensor([1 , 2 , 3 , 4 , 5 ]), tensor([6 , 7 , 8 , 0 , 0 ]), tensor([9 , 10 , 11 , 12 , 0 ]) ]
排序优化: 为了提高RNN计算效率,返回的小批量数据需要按照序列长度降序排序,下面是优化过后的函数:
1 2 3 4 5 6 7 8 9 10 11 def collate_fn (batch ): sequences, targets, lengths = zip (*batch) sorted_indices = torch.argsort(torch.tensor(lengths), descending=True ) sorted_sequences = [sequences[i] for i in sorted_indices] sorted_targets = [targets[i] for i in sorted_indices] sorted_lengths = [lengths[i] for i in sorted_indices] padded = pad_sequence(sorted_sequences, batch_first=True , padding_value=0 ) return padded, torch.tensor(sorted_targets), torch.tensor(sorted_lengths)
在RNN
、GRU
、LSTM
等模型中,需要将填充的数据压缩,填充的数据不计入时间步,即填充的数据不进入模型进行计算
使用pack_padded_sequences
进行压缩,处理完成后再使用pad_packed_sequences
恢复数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import torch.nn as nnclass GRU (nn.Module): def __init__ (self, vocab_size, embed_size, hidden_size, num_layers=1 ): super ().__init__() self .hidden_size = hidden_size self .num_layers = num_layers self .embedding = nn.Embedding(vocab_size, embed_size) self .gru = nn.GRU(embed_size, hidden_size, num_layers=num_layers, batch_first=True ) self .fc = nn.Linear(hidden_size, vocab_size) def forward (self, x, lengths ): embedded = self .embedding(x) packed = nn.utils.rnn.pack_padded_sequences( embedded, lengths.cpu(), batch_first=True , enforce_sorted=True ) packed_out, h_n = self .gru(packed) output, _ = nn.utils.rnn.pad_packed_sequences( packed_out, batch_first=True , total_length=x.size(1 ) ) output = self .fc(output) return output, h_n
原理:
pack_padded_sequences
将数据存储为紧凑的PackedSequence
,仅包含有效时间步,RNN
等每个时间步仅处理有效样本数。需要注意的是,lengths
是一个包含每个样本实际有效长度的一维张量,比如,假设一个批次(batch)包含三个样本,其有效长度分别为5、3、4,则lengths=torch.tensor([5, 3, 4])
,形状为(3,)
由于lengths
不需要在GPU上进行运算,该参数需要在CPU上
压缩后RNN
等神经网络的计算方式:
比如,有5个样本,lengths=torch.tensor([3, 3, 2, 1, 1])
:
第1个时间步:5个样本有效(所有序列的第1个元素)
第2个时间步:3个样本有效(前3个序列的第2个元素)
第3个时间步:3个样本有效(前2个序列的第3个元素)
RNN会在每个时间步仅处理当前有效的样本,无效的样本会被自动跳过。
由上可知,在DataLoader
中设置序列由长到短排序后输出,能够在模型计算的时候加快速度。
编码器-解码器架构
对于像机器翻译这样的问题,模型的输入和输出都是长度可变的序列。为了处理这种类型的输入和输出,可以使用编码器-解码器架构(encoder-decoder):编码器接收一个长度可变的序列作为输入,并将其转换为具有固定形状的编码状态;解码器将固定形状的编码状态映射到长度可变的序列。
encoder-decoder
我们可以使用RNN、LSTM、GRU等实现编码器-解码器架构:
编码器就是将序列输入RNN得到的最后一层的最后一个隐藏状态作为编码,然后将此编码作为解码器RNN的隐藏状态传入,依次获得不同时间步的输出(第一个输出为<SOS>
,然后将每次的预测输出作为下一个时间步的输入,直到预测到结尾<EOS>
或达到手动设置的解码器最大可预测时间步长度)
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import torchimport torch.nn as nnclass Encoder (nn.Module): def __init__ (self, enc_vocab_size, enc_embed_size, enc_hidden_size, num_layers=1 ): super ().__init__() self .embedding = nn.Embedding(enc_vocab_size, enc_embed_size) self .rnn = nn.GRU(enc_embed_size, enc_hidden_size, num_layers=num_layers, batch_first=True ) def forward (self, x ): embedded = self .embedding(x) output, hidden = self .rnn(embedded) return hidden[-1 ] class Decoder (nn.Module): def __init__ (self, dec_vocab_size, dec_embed_size, dec_hidden_size ): super ().__init__() self .embedding = nn.Embedding(dec_vocab_size, dec_embed_size) self .rnn = nn.GRU(dec_embed_size, dec_hidden_size, batch_first=True ) self .fc = nn.Linear(dec_hidden_size, dec_vocab_size) def forward (self, trg, hidden ): embedded = self .embedding(trg) output, hidden = self .rnn(embedded, hidden) output = self .fc(output) return output
Seq2Seq(序列到序列)模型
Seq2Seq
对于最简易的编码器-解码器架构,编码器将输入序列编码后,作为隐藏单元数据传入给解码器,然后解码器再逐个获取解码内容,每过一个时间步,解码器的隐藏层数据\(h\) 都会改变,无法保留编码器最开始传递的编码。
Seq2Seq在其上面的改进是将编码不仅传递给解码器的隐藏层,而且将其作为输入数据在每个时间步都传递给循环神经元,这样就可以保证在每个时间步都能知道编码器的编码信息,而不会有信息损失。
对于编码器,为了能更加全面地获得输入数据的信息,我们通常使用双向循环神经网络,如果编码器的隐藏层维度与解码器的隐藏层维度不同,可以使用线性层将编码器隐藏层映射到解码器隐藏层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 import torchimport torch.nn as nnfrom torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequenceclass Encoder (nn.Module): def __init__ (self, enc_vocab_size, enc_embed_size, enc_hidden_size, num_layers=1 ): super ().__init__() self .embedding = nn.Embedding(enc_vocab_size, enc_embed_size) self .rnn = nn.GRU(enc_embed_size, enc_hidden_size, num_layers=num_layers, bidirectional=True , batch_first=True ) self .fc = nn.Linear(enc_hidden_size*2 , enc_hidden_size) def forward (self, src, src_len ): embedded = self .embedding(src) packed = pack_padded_sequence(embedded, src_len.cpu(), batch_first=True , enforce_sorted=True ) output, hidden = self .rnn(embedded) output, _ = pad_packed_sequence(output, batch_first=True ) hidden = torch.cat((hidden[-1 ], hidden[-2 ]), dim=2 ) output = hidden hidden = nn.Linear(hidden) return output, hidden class Decoder (nn.Module): def __init__ (self, dec_vocab_size, dec_embed_size, dec_hidden_size ): super ().__init__() self .embedding = nn.Embedding(dec_vocab_size, dec_embed_size) self .rnn = nn.GRU(dec_embed_size + dec_hidden_size*2 , dec_hidden_size) self .fc = nn.Linear(dec_hidden_size, dec_vocab_size) def forward (self, trg, hidden, encoder_output ): """ trg: [batch_size, trg_len] hidden: [batch_size, dec_hidden_size] encoder_output: [batch_size, enc_hidden_size*2] """ embedded = self .embedding(trg) rnn_input = torch.cat((embedded, hidden.unsqueeze(1 )), dim=2 ) output, hidden = self .rnn(rnn_input, hidden) output = self .fc(output) return output class Seq2Seq (nn.Module): def __init__ (self, encoder, decoder ): self .encoder = encoder self .decoder = decoder def forward (self, src, src_len, trg ): encoder_output, hidden = self .encoder(src, src_len) output = self .decoder(trg, hidden, encoder_output) return output