Text preprocessing

Downloading the dataset

I am running this in a Kaggle environment.
```python
!pip install d2l

import collections
import re
from d2l import torch as d2l

d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine():
    """Load the time machine dataset into a list of text lines."""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    # Keep only letters, collapse everything else to spaces, and lowercase
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# total lines of text: {len(lines)}')
print(lines[0])
print(lines[10])
```
Tokenization

```python
def tokenize(lines, token='word'):
    """Split text lines into word or character tokens."""
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print('Error: unknown token type: ' + token)

tokens = tokenize(lines)
for i in range(11):
    print(tokens[i])
```
Vocabulary

A vocabulary maps each token to a numerical index. Indices are assigned according to token frequency: the more frequent a token, the smaller its index.
```python
class Vocab:
    """Vocabulary for text."""
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # Sort tokens by frequency, highest first
        counter = count_corpus(tokens)
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                   reverse=True)
        # The unknown token '<unk>' always gets index 0
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}
        for token, freq in self._token_freqs:
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]

    @property
    def unk(self):  # Index of the unknown token
        return 0

    @property
    def token_freqs(self):
        return self._token_freqs

def count_corpus(tokens):
    """Count token frequencies."""
    # Flatten a 2D list of tokens (one list per line) into a 1D list
    if len(tokens) == 0 or isinstance(tokens[0], list):
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)
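As a quick illustration (not part of the original notes), we can build a word-level vocabulary from `tokens` and map tokens to indices and back:

```python
vocab = Vocab(tokens)
# The most frequent tokens receive the smallest indices (after '<unk>')
print(vocab.token_freqs[:5])
print(vocab[['the', 'time', 'machine']])
print(vocab.to_tokens([1, 2, 3]))
```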
```python
def load_corpus_time_machine(max_tokens=-1):
    """Return the token index list and the vocabulary of the time machine dataset."""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    # Flatten all lines into a single list of character indices
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

corpus, vocab = load_corpus_time_machine()
len(corpus), len(vocab)
```
Reading long sequences

Two strategies are used: random sampling and sequential partitioning.
Random sampling
```python
import random
import torch

def seq_data_iter_random(corpus, batch_size, num_steps):
    """Generate minibatches of subsequences using random sampling."""
    # Start from a random offset (up to num_steps - 1) to partition the sequence
    corpus = corpus[random.randint(0, num_steps - 1):]
    # Subtract 1 to leave room for the labels
    num_subseqs = (len(corpus) - 1) // num_steps
    # Starting indices of subsequences of length num_steps
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # With random sampling, subsequences from adjacent minibatches
    # are not necessarily adjacent in the original sequence
    random.shuffle(initial_indices)

    def data(pos):
        # Return a subsequence of length num_steps starting at pos
        return corpus[pos: pos + num_steps]

    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)
```
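As a quick check (an illustration of my own), iterating over a toy sequence of the integers 0 to 34 shows that each label subsequence `Y` is its input `X` shifted by one position:

```python
my_seq = list(range(35))
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
    print('X:', X, '\nY:', Y)
```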
Sequential partitioning
```python
def seq_data_iter_sequential(corpus, batch_size, num_steps):
    """Generate minibatches of subsequences using sequential partitioning."""
    # Start with a random offset to partition the sequence
    offset = random.randint(0, num_steps)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    num_batches = Xs.shape[1] // num_steps
    for i in range(0, num_steps * num_batches, num_steps):
        # Subsequences in adjacent minibatches are adjacent in the original sequence
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y
```
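Running the same toy sequence through sequential partitioning shows that, unlike random sampling, subsequences in adjacent minibatches stay adjacent in the original sequence:

```python
for X, Y in seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5):
    print('X:', X, '\nY:', Y)
```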
Recurrent neural networks

Structure
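For reference, the per-time-step updates of a vanilla RNN, which the code below implements, are

$\mathbf{H}_t = \tanh(\mathbf{X}_t \mathbf{W}_{xh} + \mathbf{H}_{t-1} \mathbf{W}_{hh} + \mathbf{b}_h), \quad \mathbf{O}_t = \mathbf{H}_t \mathbf{W}_{hq} + \mathbf{b}_q$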
Code

```python
def get_params(vocab_size, num_hiddens, device):
    num_inputs = num_outputs = vocab_size

    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    # Hidden layer parameters
    W_xh = normal((num_inputs, num_hiddens))
    W_hh = normal((num_hiddens, num_hiddens))
    b_h = torch.zeros(num_hiddens, device=device)
    # Output layer parameters
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # Attach gradients
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params
```
```python
def rnn(inputs, state, params):
    # inputs shape: (num_steps, batch_size, vocab_size)
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    # X shape: (batch_size, vocab_size)
    for X in inputs:
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,)
```
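The `rnn` function expects an initial state and one-hot inputs. A minimal sketch of how it can be driven (the helper name `init_rnn_state` follows the d2l convention; the toy batch shape and `num_hiddens=512` are my own choices):

```python
import torch.nn.functional as F

def init_rnn_state(batch_size, num_hiddens, device):
    # The state is a tuple holding a single zero-initialized hidden tensor
    return (torch.zeros((batch_size, num_hiddens), device=device),)

# Toy forward pass: a batch of 2 sequences with 5 time steps each
device = d2l.try_gpu()
params = get_params(len(vocab), 512, device)
state = init_rnn_state(2, 512, device)
X = torch.arange(10, device=device).reshape(2, 5)
inputs = F.one_hot(X.T, len(vocab)).type(torch.float32)  # (num_steps, batch, vocab)
outputs, new_state = rnn(inputs, state, params)
print(outputs.shape, new_state[0].shape)
```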
Gradient clipping

```python
from torch import nn

def grad_clipping(net, theta):
    """Clip the gradient."""
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm
```
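This implements the standard clipping rule, which rescales the gradient $\mathbf{g}$ whenever its norm exceeds the threshold $\theta$:

$\mathbf{g} \leftarrow \min\left(1, \frac{\theta}{\|\mathbf{g}\|}\right) \mathbf{g}$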
Gated recurrent units (GRU)
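Before the candidate hidden state, the GRU computes a reset gate $\mathbf{R}_t$ and an update gate $\mathbf{Z}_t$ (these are the standard definitions, matching the code below):

$\mathbf{R}_t = \sigma(\mathbf{X}_t \mathbf{W}_{xr} + \mathbf{H}_{t-1} \mathbf{W}_{hr} + \mathbf{b}_r), \quad \mathbf{Z}_t = \sigma(\mathbf{X}_t \mathbf{W}_{xz} + \mathbf{H}_{t-1} \mathbf{W}_{hz} + \mathbf{b}_z)$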
Candidate hidden state

$\tilde{\mathbf{H}}_t = \tanh(\mathbf{X}_t \mathbf{W}_{xh} + (\mathbf{R}_t \odot \mathbf{H}_{t-1}) \mathbf{W}_{hh} + \mathbf{b}_h)$
The candidate hidden state gives the model a potential update direction, but it is not applied outright; the gating mechanism decides how much of it to use. It helps capture short-term dependencies.
Hidden state

$\mathbf{H}_t = \mathbf{Z}_t \odot \mathbf{H}_{t-1} + (1 - \mathbf{Z}_t) \odot \tilde{\mathbf{H}}_t$
The update gate $\mathbf{Z}_t$ determines to what extent the new hidden state comes from the old state $\mathbf{H}_{t-1}$ versus the new candidate state $\tilde{\mathbf{H}}_t$. This helps capture long-term dependencies.
Code

```python
def gru(inputs, state, params):
    W_xz, W_hz, b_z, W_xr, W_hr, b_r, W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    for X in inputs:
        Z = torch.sigmoid((X @ W_xz) + (H @ W_hz) + b_z)  # update gate
        R = torch.sigmoid((X @ W_xr) + (H @ W_hr) + b_r)  # reset gate
        H_tilda = torch.tanh((X @ W_xh) + ((R * H) @ W_hh) + b_h)  # candidate state
        H = Z * H + (1 - Z) * H_tilda
        Y = H @ W_hq + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H,)
```
Long short-term memory (LSTM)

Input, output, and forget gates
$\begin{aligned}
\mathbf{I}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xi} + \mathbf{H}_{t-1} \mathbf{W}_{hi} + \mathbf{b}_i),\\
\mathbf{F}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xf} + \mathbf{H}_{t-1} \mathbf{W}_{hf} + \mathbf{b}_f),\\
\mathbf{O}_t &= \sigma(\mathbf{X}_t \mathbf{W}_{xo} + \mathbf{H}_{t-1} \mathbf{W}_{ho} + \mathbf{b}_o)
\end{aligned}$
Candidate memory cell
$\tilde{\mathbf{C}}_t = \tanh(\mathbf{X}_t \mathbf{W}_{xc} + \mathbf{H}_{t-1} \mathbf{W}_{hc} + \mathbf{b}_c)$

Memory cell

$\mathbf{C}_t = \mathbf{F}_t \odot \mathbf{C}_{t-1} + \mathbf{I}_t \odot \tilde{\mathbf{C}}_t$
Hidden state
$ \mathbf{H}_t = \mathbf{O}_t \odot \tanh(\mathbf{C}_t).$
```python
def lstm(inputs, state, params):
    [W_xi, W_hi, b_i, W_xf, W_hf, b_f, W_xo, W_ho, b_o,
     W_xc, W_hc, b_c, W_hq, b_q] = params
    (H, C) = state
    outputs = []
    for X in inputs:
        I = torch.sigmoid((X @ W_xi) + (H @ W_hi) + b_i)  # input gate
        F = torch.sigmoid((X @ W_xf) + (H @ W_hf) + b_f)  # forget gate
        O = torch.sigmoid((X @ W_xo) + (H @ W_ho) + b_o)  # output gate
        C_tilda = torch.tanh((X @ W_xc) + (H @ W_hc) + b_c)  # candidate memory cell
        C = F * C + I * C_tilda
        H = O * torch.tanh(C)
        Y = (H @ W_hq) + b_q
        outputs.append(Y)
    return torch.cat(outputs, dim=0), (H, C)
```
Encoder-decoder architecture
Technically speaking, the encoder transforms a variable-length input sequence into a fixed-shape context variable $c$, encoding the information of the input sequence in that context variable.
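The training and prediction code below calls `net.encoder`, `net.decoder.init_state`, and `net(enc_X, dec_X, ...)` on the model. A minimal sketch of that encoder-decoder interface, in the style of d2l (the exact class bodies here are an assumption):

```python
from torch import nn

class Encoder(nn.Module):
    """Base encoder interface: maps an input sequence to encoder outputs."""
    def forward(self, X, *args):
        raise NotImplementedError

class Decoder(nn.Module):
    """Base decoder interface: turns encoder outputs into a decoding state."""
    def init_state(self, enc_outputs, *args):
        raise NotImplementedError

    def forward(self, X, state):
        raise NotImplementedError

class EncoderDecoder(nn.Module):
    """Glue the encoder and decoder together for training."""
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, enc_X, dec_X, *args):
        enc_outputs = self.encoder(enc_X, *args)
        dec_state = self.decoder.init_state(enc_outputs, *args)
        return self.decoder(dec_X, dec_state)
```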
Sequence to sequence (seq2seq)

A separate RNN decoder predicts the next token based on the encoded information of the input sequence and on the tokens of the output sequence that have already been seen or generated.
Prediction
In the design of (Sutskever et al., 2014), the encoded information of the input sequence is fed into the decoder to generate the output sequence. In some other designs (Cho et al., 2014), as shown in Fig. 9.7.1, the final hidden state of the encoder is also part of the decoder's input at every time step.
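The training function below uses `MaskedSoftmaxCELoss`, which these notes do not define. A sketch along the lines of the d2l implementation (assuming `d2l.sequence_mask` is available to zero out positions beyond each sequence's valid length):

```python
class MaskedSoftmaxCELoss(nn.CrossEntropyLoss):
    """Softmax cross-entropy loss that ignores padded positions."""
    # pred: (batch_size, num_steps, vocab_size)
    # label: (batch_size, num_steps)
    # valid_len: (batch_size,)
    def forward(self, pred, label, valid_len):
        weights = torch.ones_like(label)
        weights = d2l.sequence_mask(weights, valid_len)
        self.reduction = 'none'
        unweighted_loss = super().forward(pred.permute(0, 2, 1), label)
        weighted_loss = (unweighted_loss * weights).mean(dim=1)
        return weighted_loss
```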
```python
def train_seq2seq(net, data_iter, lr, num_epochs, tgt_vocab, device):
    """Train a model for sequence to sequence."""
    def xavier_init_weights(m):
        if type(m) == nn.Linear:
            nn.init.xavier_uniform_(m.weight)
        if type(m) == nn.GRU:
            for param in m._flat_weights_names:
                if "weight" in param:
                    nn.init.xavier_uniform_(m._parameters[param])

    net.apply(xavier_init_weights)
    net.to(device)
    optimizer = torch.optim.Adam(net.parameters(), lr=lr)
    loss = MaskedSoftmaxCELoss()
    net.train()
    animator = d2l.Animator(xlabel='epoch', ylabel='loss',
                            xlim=[10, num_epochs])
    for epoch in range(num_epochs):
        timer = d2l.Timer()
        metric = d2l.Accumulator(2)  # Sum of training loss, number of tokens
        for batch in data_iter:
            optimizer.zero_grad()
            X, X_valid_len, Y, Y_valid_len = [x.to(device) for x in batch]
            bos = torch.tensor([tgt_vocab['<bos>']] * Y.shape[0],
                               device=device).reshape(-1, 1)
            dec_input = torch.cat([bos, Y[:, :-1]], 1)  # Teacher forcing
            Y_hat, _ = net(X, dec_input, X_valid_len)
            l = loss(Y_hat, Y, Y_valid_len)
            l.sum().backward()  # Make the loss scalar for backward
            d2l.grad_clipping(net, 1)
            num_tokens = Y_valid_len.sum()
            optimizer.step()
            with torch.no_grad():
                metric.add(l.sum(), num_tokens)
        if (epoch + 1) % 10 == 0:
            animator.add(epoch + 1, (metric[0] / metric[1],))
    print(f'loss {metric[0] / metric[1]:.3f}, {metric[1] / timer.stop():.1f} '
          f'tokens/sec on {str(device)}')
```
```python
def predict_seq2seq(net, src_sentence, src_vocab, tgt_vocab, num_steps,
                    device, save_attention_weights=False):
    """Predict for sequence to sequence."""
    # Set net to eval mode for inference
    net.eval()
    src_tokens = src_vocab[src_sentence.lower().split(' ')] + [
        src_vocab['<eos>']]
    enc_valid_len = torch.tensor([len(src_tokens)], device=device)
    src_tokens = d2l.truncate_pad(src_tokens, num_steps, src_vocab['<pad>'])
    # Add the batch axis
    enc_X = torch.unsqueeze(
        torch.tensor(src_tokens, dtype=torch.long, device=device), dim=0)
    enc_outputs = net.encoder(enc_X, enc_valid_len)
    dec_state = net.decoder.init_state(enc_outputs, enc_valid_len)
    # Add the batch axis
    dec_X = torch.unsqueeze(torch.tensor(
        [tgt_vocab['<bos>']], dtype=torch.long, device=device), dim=0)
    output_seq, attention_weight_seq = [], []
    for _ in range(num_steps):
        Y, dec_state = net.decoder(dec_X, dec_state)
        # Use the token with the highest prediction probability as the next decoder input
        dec_X = Y.argmax(dim=2)
        pred = dec_X.squeeze(dim=0).type(torch.int32).item()
        # Optionally save attention weights
        if save_attention_weights:
            attention_weight_seq.append(net.decoder.attention_weights)
        # Stop once the end-of-sequence token is predicted
        if pred == tgt_vocab['<eos>']:
            break
        output_seq.append(pred)
    return ' '.join(tgt_vocab.to_tokens(output_seq)), attention_weight_seq
```
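To tie things together, a usage sketch. It assumes the d2l machine-translation helpers (`d2l.load_data_nmt`, `d2l.Seq2SeqEncoder`, `d2l.Seq2SeqDecoder`, `d2l.EncoderDecoder`) are available; the hyperparameters are illustrative only:

```python
embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 300, d2l.try_gpu()

train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
encoder = d2l.Seq2SeqEncoder(len(src_vocab), embed_size, num_hiddens,
                             num_layers, dropout)
decoder = d2l.Seq2SeqDecoder(len(tgt_vocab), embed_size, num_hiddens,
                             num_layers, dropout)
net = d2l.EncoderDecoder(encoder, decoder)

train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device)
translation, _ = predict_seq2seq(net, 'go .', src_vocab, tgt_vocab,
                                 num_steps, device)
print(translation)
```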