Sequence-to-Sequence (Seq2Seq) models, also referred to as encoder-decoder models, are a family of models that typically train two recurrent neural networks. The first RNN, the encoder, is trained to receive input text and encode it sequentially. The second RNN, the decoder, receives that encoded sequence and maps it to the output text. This method of training two RNNs together was introduced by Cho et al. in https://arxiv.org/pdf/1406.1078v3.pdf and quickly gained popularity in NLP tasks where the input and output are explicit text pairs, such as translation and summarization.
In the following tutorial, we will explore how to create and train Seq2Seq models in PyTorch for English-German translation.
Overview:
- Imports and Data Loading
- Tokenization
- Creating Encoder RNN
- Creating Decoder RNN
- Setup and Training
- Evaluation
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import matplotlib.pyplot as plt
We use the Multi30k dataset, a popular dataset for translation to and from many languages. For our purposes, we use the English-to-German translation subset:
train_path_en = "train.lc.norm.tok.en.txt"
train_path_de = "train.lc.norm.tok.de.txt"
test_path_en = "test_2017_flickr.lc.norm.tok.en.txt"
test_path_de = "test_2017_flickr.lc.norm.tok.de.txt"
Before working with PyTorch, make sure to set the device. This line of code selects a GPU if available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
The data is in .txt files, so we use the standard Python open() function.
with open(train_path_en) as en_raw_train:
    en_parsed_train = en_raw_train.readlines()
with open(train_path_de) as de_raw_train:
    de_parsed_train = de_raw_train.readlines()
with open(test_path_en) as en_raw_test:
    en_parsed_test = en_raw_test.readlines()
with open(test_path_de) as de_raw_test:
    de_parsed_test = de_raw_test.readlines()
The number of instances in our training and test sets matches those mentioned in the GitHub repository of the dataset.
print(len(en_parsed_train))
print(len(de_parsed_train))
print(len(en_parsed_test))
print(len(de_parsed_test))
We show 5 English-German pairs below. The data is already preprocessed and tokenized, so splitting on spaces is sufficient.
for i in range(5):
    print("English: {} \n German: {} \n".format(en_parsed_train[i].strip(), de_parsed_train[i].strip()))
Creating tokenized versions of all sets by splitting each sentence on spaces:
en_train = [sent.strip().split(" ") for sent in en_parsed_train]
en_test = [sent.strip().split(" ") for sent in en_parsed_test]
de_train = [sent.strip().split(" ") for sent in de_parsed_train]
de_test = [sent.strip().split(" ") for sent in de_parsed_test]
Since we are working with two languages, we will create two separate vocabularies. Each index2word list maps an integer index to a token and starts with the special tokens <PAD> (padding), <SOS> (start of sentence), and <EOS> (end of sentence):
en_index2word = ["<PAD>", "<SOS>", "<EOS>"]
de_index2word = ["<PAD>", "<SOS>", "<EOS>"]
for ds in [en_train, en_test]:
    for sent in ds:
        for token in sent:
            if token not in en_index2word:
                en_index2word.append(token)

for ds in [de_train, de_test]:
    for sent in ds:
        for token in sent:
            if token not in de_index2word:
                de_index2word.append(token)
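Note that checking membership in a plain Python list is linear in the vocabulary size, so building the vocabularies this way can be slow for larger corpora. A faster but equivalent construction keeps a set alongside the list purely for membership checks. This is an optional sketch; build_vocab is our own helper name and is not used in the rest of the tutorial:

def build_vocab(datasets):
    # Same result as the loops above, but with O(1) membership checks
    index2word = ["<PAD>", "<SOS>", "<EOS>"]
    seen = set(index2word)
    for ds in datasets:
        for sent in ds:
            for token in sent:
                if token not in seen:
                    seen.add(token)
                    index2word.append(token)
    return index2word

# en_index2word = build_vocab([en_train, en_test])
# de_index2word = build_vocab([de_train, de_test])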
Using the index2word lists to create the reverse mappings (word2index):
en_word2index = {token: idx for idx, token in enumerate(en_index2word)}
de_word2index = {token: idx for idx, token in enumerate(de_index2word)}
Making sure the mappings are done correctly for both vocabs:
en_index2word[20]
en_word2index["a"]
de_index2word[20]
de_word2index["ein"]
Unlike when working with tweets, we cannot simply assume a specific maximum sequence length. To get a good estimate, we calculate the average sentence length of both languages in the training sets.
en_lengths = sum([len(sent) for sent in en_train])/len(en_train)
de_lengths = sum([len(sent) for sent in de_train])/len(de_train)
en_lengths
de_lengths
The average length for English instances is ~13 words and for German instances is ~12 words. We can assume that most instances are fewer than 20 words long and use that as an upper bound for padding and truncation.
seq_length = 20
def encode_and_pad(vocab, sent, max_length):
    sos = [vocab["<SOS>"]]
    eos = [vocab["<EOS>"]]
    pad = [vocab["<PAD>"]]

    if len(sent) < max_length - 2:  # -2 for SOS and EOS
        n_pads = max_length - 2 - len(sent)
        encoded = [vocab[w] for w in sent]
        return sos + encoded + eos + pad * n_pads
    else:  # sent is longer than max_length; truncating
        encoded = [vocab[w] for w in sent]
        truncated = encoded[:max_length - 2]
        return sos + truncated + eos
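As a quick usage example, encoding the first training sentence should return exactly seq_length (20) indices, starting with the <SOS> index (1) and, if the sentence is shorter than 18 tokens, ending with the <EOS> index (2) followed by <PAD> indices (0):

example = encode_and_pad(en_word2index, en_train[0], seq_length)
print(len(example))   # 20
print(example)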
Creating fixed-size encoded versions of all sets:
en_train_encoded = [encode_and_pad(en_word2index, sent, seq_length) for sent in en_train]
en_test_encoded = [encode_and_pad(en_word2index, sent, seq_length) for sent in en_test]
de_train_encoded = [encode_and_pad(de_word2index, sent, seq_length) for sent in de_train]
de_test_encoded = [encode_and_pad(de_word2index, sent, seq_length) for sent in de_test]
Finally, to complete the data preparation, we create the required PyTorch Datasets and DataLoaders:
batch_size = 50
train_x = np.array(en_train_encoded)
train_y = np.array(de_train_encoded)
test_x = np.array(en_test_encoded)
test_y = np.array(de_test_encoded)
train_ds = TensorDataset(torch.from_numpy(train_x), torch.from_numpy(train_y))
test_ds = TensorDataset(torch.from_numpy(test_x), torch.from_numpy(test_y))
train_dl = DataLoader(train_ds, shuffle=True, batch_size=batch_size, drop_last=True)
test_dl = DataLoader(test_ds, shuffle=True, batch_size=batch_size, drop_last=True)
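To verify the DataLoaders, we can pull a single batch and check its shape; with batch_size = 50 and seq_length = 20, both tensors should have shape (50, 20):

batch_x, batch_y = next(iter(train_dl))
print(batch_x.shape, batch_y.shape)  # torch.Size([50, 20]) torch.Size([50, 20])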
The Gated Recurrent Unit (GRU) is a recurrent unit that is computationally cheaper than the LSTM (it has fewer gates and no separate cell state) while achieving very similar performance in practice. We use the GRU as the base model for both the encoder and the decoder.
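Since input and output shapes are the main source of confusion with recurrent layers, here is a short, standalone sketch of nn.GRU shapes, using illustrative sizes. With batch_first=True (used in the encoder below) the input is (batch, seq, features); without it (used in the decoder below) the input is (seq, batch, features). The hidden state is (num_layers, batch, hidden) in both cases:

gru = nn.GRU(input_size=128, hidden_size=128, batch_first=True)
x = torch.zeros(50, 20, 128)   # (batch, seq, features)
h0 = torch.zeros(1, 50, 128)   # (num_layers, batch, hidden)
out, h = gru(x, h0)
print(out.shape, h.shape)      # (50, 20, 128), (1, 50, 128)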
class EncoderRNN(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        # Embedding layer
        self.embedding = nn.Embedding(input_size, hidden_size, padding_idx=0)

        # GRU layer. The input and output are both of the same size
        # since embedding size = hidden size in this example
        self.gru = nn.GRU(hidden_size, hidden_size, batch_first=True)

    def forward(self, input, hidden):
        # The inputs are first transformed into embeddings
        embedded = self.embedding(input)
        output = embedded

        # As in any RNN, the new input and the previous hidden states are fed
        # into the model at each time step
        output, hidden = self.gru(output, hidden)
        return output, hidden

    def initHidden(self):
        # This method is used to create the initial hidden states for the encoder
        return torch.zeros(1, batch_size, self.hidden_size)
class DecoderRNN(nn.Module):
    def __init__(self, hidden_size, output_size):
        super(DecoderRNN, self).__init__()
        self.hidden_size = hidden_size

        # Embedding layer
        self.embedding = nn.Embedding(output_size, hidden_size, padding_idx=0)

        # The GRU layer. batch_first is not set here, so the decoder
        # expects inputs of shape (seq_len, batch, hidden)
        self.gru = nn.GRU(hidden_size, hidden_size)

        # Fully-connected layer for scores
        self.out = nn.Linear(hidden_size, output_size)

        # Applying LogSoftmax to the scores
        self.softmax = nn.LogSoftmax(dim=1)

    def forward(self, input, hidden):
        # Feeding input through embedding layer
        output = self.embedding(input)

        # Applying an activation function (ReLU)
        output = F.relu(output)

        # Feeding input and previous hidden state
        output, hidden = self.gru(output, hidden)

        # Outputting log-probabilities from the final time-step
        output = self.softmax(self.out(output[0]))
        return output, hidden

    # We do not need an .initHidden() method for the decoder since the
    # encoder's final hidden state acts as the decoder's initial hidden state
hidden_size = 128
Initializing the encoder and decoder and sending both to the device.
encoder = EncoderRNN(len(en_index2word), hidden_size).to(device)
decoder = DecoderRNN(hidden_size, len(de_index2word)).to(device)
encoder
decoder
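Before training, it can help to run a single batch through both networks to confirm the shapes line up. This is just a sanity check under the settings above (batch_size = 50, hidden_size = 128) and can be skipped:

batch_x, batch_y = next(iter(train_dl))

enc_hidden = encoder.initHidden().to(device)
enc_out, enc_hidden = encoder(batch_x.to(device), enc_hidden)
print(enc_out.shape, enc_hidden.shape)   # (50, 20, 128), (1, 50, 128)

# One decoder step, fed with the <SOS> column of the target batch
dec_in = batch_y[:, 0].unsqueeze(dim=0).to(device)
dec_out, dec_hidden = decoder(dec_in, enc_hidden)
print(dec_out.shape, dec_hidden.shape)   # (50, vocab_size), (1, 50, 128)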
When training Seq2Seq models, we need two optimizers, one for the encoder and one for the decoder; both are updated with every batch.
# The decoder already outputs log-probabilities (LogSoftmax),
# so we use NLLLoss rather than CrossEntropyLoss
criterion = nn.NLLLoss()
enc_optimizer = torch.optim.Adam(encoder.parameters(), lr = 3e-3)
dec_optimizer = torch.optim.Adam(decoder.parameters(), lr = 3e-3)
losses = []
input_length = target_length = seq_length
SOS = en_word2index["<SOS>"]
EOS = en_word2index["<EOS>"]
epochs = 15
for epoch in range(epochs):
    for idx, batch in enumerate(train_dl):
        # Creating initial hidden states for the encoder
        encoder_hidden = encoder.initHidden()

        # Sending to device
        encoder_hidden = encoder_hidden.to(device)

        # Assigning the input and sending to device
        input_tensor = batch[0].to(device)

        # Assigning the output and sending to device
        target_tensor = batch[1].to(device)

        # Clearing gradients
        enc_optimizer.zero_grad()
        dec_optimizer.zero_grad()

        # Enabling gradient calculation
        with torch.set_grad_enabled(True):

            # Feeding batch into encoder
            encoder_output, encoder_hidden = encoder(input_tensor, encoder_hidden)

            # This is a placeholder tensor for decoder outputs. We send it to device as well
            dec_result = torch.zeros(target_length, batch_size, len(de_index2word)).to(device)

            # Creating a batch of <SOS> tokens which will all be fed to the decoder
            decoder_input = target_tensor[:, 0].unsqueeze(dim=0).to(device)

            # Creating initial hidden states of the decoder by copying encoder hidden states
            decoder_hidden = encoder_hidden

            # For each time-step in decoding:
            for i in range(1, target_length):

                # Feed input and previous hidden states
                decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)

                # Finding the best scoring word
                best = decoder_output.argmax(1)

                # Assigning next input as current best word
                decoder_input = best.unsqueeze(dim=0)

                # Creating an entry in the placeholder output tensor
                dec_result[i] = decoder_output

            # Creating scores and targets for loss calculation.
            # dec_result is time-major (seq, batch, vocab); we drop time-step 0
            # (the <SOS> position, which is never predicted) and make the
            # targets time-major as well so both are aligned
            scores = dec_result[1:].reshape(-1, dec_result.shape[2])
            targets = target_tensor[:, 1:].transpose(0, 1).reshape(-1)

            # Calculating loss
            loss = criterion(scores, targets)

            # Performing backprop and clipping excess gradients
            loss.backward()
            torch.nn.utils.clip_grad_norm_(encoder.parameters(), max_norm=1)
            torch.nn.utils.clip_grad_norm_(decoder.parameters(), max_norm=1)
            enc_optimizer.step()
            dec_optimizer.step()

            # Keeping track of loss
            losses.append(loss.item())

        if idx % 100 == 0:
            print(idx, sum(losses)/len(losses))
We can see that the loss is decreasing steadily as training proceeds, which means the model is learning the task correctly.
plt.plot(losses)
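Note that the training loop above always feeds the decoder its own previous prediction. A common alternative is teacher forcing, where the ground-truth target token is fed as the next decoder input with some probability, which often speeds up and stabilizes training. The sketch below shows how the inner decoding loop inside the training loop could be modified; teacher_forcing_ratio is a hypothetical hyperparameter, not used elsewhere in this tutorial:

import random

teacher_forcing_ratio = 0.5  # hypothetical value; tune as needed

for i in range(1, target_length):
    decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)
    dec_result[i] = decoder_output

    if random.random() < teacher_forcing_ratio:
        # Feed the ground-truth token at position i as the next input
        decoder_input = target_tensor[:, i].unsqueeze(dim=0)
    else:
        # Feed the model's own best guess as the next input
        decoder_input = decoder_output.argmax(1).unsqueeze(dim=0)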
Testing with an example sentence:
test_sentence = "the men are walking in the streets ."
# Tokenizing, encoding, and transforming to a tensor with a batch dimension of 1
test_sentence = torch.tensor(encode_and_pad(en_word2index, test_sentence.split(), seq_length)).unsqueeze(dim=0)

encoder_hidden = torch.zeros(1, 1, hidden_size)
encoder_hidden = encoder_hidden.to(device)

input_tensor = test_sentence.to(device)

result = []

with torch.set_grad_enabled(False):

    # Feeding the sentence into the encoder
    encoder_output, encoder_hidden = encoder(input_tensor, encoder_hidden)

    # Starting decoding with the <SOS> token
    decoder_input = torch.tensor([SOS]).unsqueeze(dim=0).to(device)
    decoder_hidden = encoder_hidden

    for di in range(1, target_length):
        decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden)

        # Picking the highest-scoring word and adding it to the result
        best = decoder_output.argmax(1)
        result.append(de_index2word[best.to('cpu').item()])

        # Stopping once the decoder produces <EOS>
        if best.item() == EOS:
            break

        # Feeding the prediction back in as the next input
        decoder_input = best.unsqueeze(dim=0)
" ".join(result)
You can use Google Translate to verify the translation if you do not know German. Also, experiment with different example sentences to check model behavior in different situations.
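To make experimenting with different sentences easier, the inference code above can be wrapped in a small helper function. This is a minimal sketch; translate is a hypothetical convenience wrapper, and it assumes every input word appears in the training vocabulary:

def translate(sentence):
    # Hypothetical wrapper around the inference steps above.
    # Note: words not in en_word2index will raise a KeyError.
    encoded = torch.tensor(encode_and_pad(en_word2index, sentence.split(), seq_length)).unsqueeze(dim=0)
    hidden = torch.zeros(1, 1, hidden_size).to(device)
    tokens = []
    with torch.no_grad():
        _, hidden = encoder(encoded.to(device), hidden)
        dec_input = torch.tensor([SOS]).unsqueeze(dim=0).to(device)
        for _ in range(1, target_length):
            dec_output, hidden = decoder(dec_input, hidden)
            best = dec_output.argmax(1)
            if best.item() == EOS:
                break
            tokens.append(de_index2word[best.item()])
            dec_input = best.unsqueeze(dim=0)
    return " ".join(tokens)

# translate("a man is riding a bike .")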