PyTorch Beginner's Tutorial (4) - Sentiment Analysis of Text Using LSTM
This article is based on the code from Long Short-Term Memory: From Zero to Hero with PyTorch, with some modifications and added annotations for clarity. The referenced article provides an in-depth introduction to LSTM. For those unfamiliar with LSTM, reading it first might be helpful.
Here, we use an Amazon review dataset to train a classifier that can detect the sentiment of text.
Get Dataset:
Data Preprocessing
First, import the necessary libraries:
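```python
import bz2                       # For reading bz2 compressed files
from collections import Counter  # For word frequency statistics
import re                        # Regular expressions
import nltk                      # Text preprocessing
import numpy as np

# nltk.word_tokenize needs NLTK's Punkt tokenizer data. If it is missing,
# download it once with nltk.download('punkt')
# (newer NLTK releases may ask for 'punkt_tab' instead).
```
Place the downloaded files in a data directory. It should contain two files: "train.ft.txt.bz2" and "test.ft.txt.bz2". The code below reads them from "../data/amazon_reviews/", so adjust the paths to match where you put the files.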
After extraction, read in the training and testing data:
```python
train_file = bz2.BZ2File('../data/amazon_reviews/train.ft.txt.bz2')
test_file = bz2.BZ2File('../data/amazon_reviews/test.ft.txt.bz2')
train_file = train_file.readlines()
test_file = test_file.readlines()
print(train_file[0])
```
```
b'__label__2 Stuning even for the non-gamer: This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^\n'
```
As shown, each data entry consists of two parts: a label followed by the review text. Here:
`__label__1` represents a negative review, which we will encode as 0.
`__label__2` represents a positive review, which we will encode as 1.
Because the dataset is large, we use only 1 million records in total, split 80:20: 800,000 records for training and 200,000 for testing.
```python
num_train = 800000
num_test = 200000

train_file = [x.decode('utf-8') for x in train_file[:num_train]]
test_file = [x.decode('utf-8') for x in test_file[:num_test]]
```
In this example, we use `decode('utf-8')` because the source file is read as binary, which is indicated by the `b''` prefix in the output above.
In the source file, data and labels are combined, so we need to separate them:
```python
# Encode '__label__1' as 0 (negative review) and '__label__2' as 1 (positive review)
train_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in train_file]
test_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in test_file]

# `split(' ', 1)[1]`: Separates label from data and retrieves the data part.
# `[:-1]`: Removes the last character (\n).
# `lower()`: Converts to lowercase, as case sensitivity doesn't aid sentiment
# analysis and increases encoding complexity.
train_sentences = [x.split(' ', 1)[1][:-1].lower() for x in train_file]
test_sentences = [x.split(' ', 1)[1][:-1].lower() for x in test_file]
```
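As a quick check of the parsing steps above, the sketch below applies them to a single record. The sample bytes and the helper name `parse_record` are made up for illustration and are not part of the dataset or the original code.

```python
def parse_record(raw: bytes):
    """Split one raw record into (label, text)."""
    line = raw.decode('utf-8')                # bytes -> str
    tag, text = line.split(' ', 1)            # split only at the first space
    label = 0 if tag == '__label__1' else 1   # 0 = negative, 1 = positive
    # rstrip('\n') has the same effect as [:-1] when the line ends in a newline
    return label, text.rstrip('\n').lower()


# Made-up sample in the same format as the dataset
sample = b'__label__1 Not Worth It: Broke after two days.\n'
print(parse_record(sample))  # (0, 'not worth it: broke after two days.')
```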
After separating the data, we perform some basic data cleaning:
Since numbers do not contribute significantly to sentiment classification, we replace all numbers with 0:
```python
for i in range(len(train_sentences)):
    train_sentences[i] = re.sub(r'\d', '0', train_sentences[i])

for i in range(len(test_sentences)):
    test_sentences[i] = re.sub(r'\d', '0', test_sentences[i])
```
The dataset also contains samples with website links. Since URLs can interfere with data processing, we replace each one with a placeholder, so a sentence that ends in a web address becomes: Welcome to our website: <url>
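To see what the digit substitution does to a review, here is a small sketch on a made-up sentence. The helper name `mask_digits` is hypothetical.

```python
import re


def mask_digits(text: str) -> str:
    """Replace every digit with '0' so numbers with the same shape share one token."""
    return re.sub(r'\d', '0', text)


print(mask_digits("bought 2 copies in 2019 for $15.99"))
# bought 0 copies in 0000 for $00.00
```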
```python
for i in range(len(train_sentences)):
    if ('www.' in train_sentences[i] or 'http:' in train_sentences[i]
            or 'https:' in train_sentences[i] or '.com' in train_sentences[i]):
        train_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", train_sentences[i])

for i in range(len(test_sentences)):
    if ('www.' in test_sentences[i] or 'http:' in test_sentences[i]
            or 'https:' in test_sentences[i] or '.com' in test_sentences[i]):
        test_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", test_sentences[i])
```
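The following sketch wraps the same pattern and the same substring checks in a hypothetical helper, `mask_urls`, and runs it on made-up sentences. It suggests why the `if` check matters: the pattern matches any space-free token ending in a dot plus three lowercase letters, so it can also catch ordinary text where a space is missing after a period.

```python
import re

URL_PATTERN = re.compile(r"([^ ]+(?<=\.[a-z]{3}))")
URL_HINTS = ('www.', 'http:', 'https:', '.com')


def mask_urls(text: str) -> str:
    """Apply the substitution only when the sentence looks like it contains a link."""
    if any(hint in text for hint in URL_HINTS):
        return URL_PATTERN.sub('<url>', text)
    return text


print(mask_urls('welcome to our website: www.example.com today'))
# welcome to our website: <url> today
print(mask_urls('loved the movie.not the book'))
# loved the movie.not the book   (no link hint, so nothing is replaced)
print(mask_urls('see www.example.com and the movie.not the book'))
# see <url> and the <url> the book   (the hint is present, so 'movie.not' is masked too)
```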
After completing data cleaning, we need to tokenize the text and discard words that only appear once, as they hold minimal reference value:
```python
words = Counter()  # Tracks the frequency of each word

for i, sentence in enumerate(train_sentences):
    words_list = nltk.word_tokenize(sentence)  # Tokenize the sentence
    words.update(words_list)                   # Update the word frequency counts
    train_sentences[i] = words_list            # Store the tokenized words list

    if i % 200000 == 0:  # Print progress every 200,000 entries
        print(str((i*100)/num_train) + "% done")
print("100% done")
```
```
0.0% done
25.0% done
50.0% done
75.0% done
100% done
```
Next, we remove words that appear only once:
```python
words = {k: v for k, v in words.items() if v > 1}
```
We then sort `words` in descending order of frequency and convert it to a list, forming our vocabulary. Later, word encoding will be based on this vocabulary:
```python
words = sorted(words, key=words.get, reverse=True)
print(words[:10])  # Display the 10 most frequent words
```
```
['.', 'the', ',', 'i', 'and', 'a', 'to', 'it', 'of', 'this']
```
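The counting, filtering and sorting steps can be tried end to end on a tiny corpus. In this sketch the sentences are made up and plain whitespace splitting stands in for `nltk.word_tokenize`, so it runs without NLTK.

```python
from collections import Counter

# Tiny made-up corpus; str.split() is a stand-in for nltk.word_tokenize
toy_sentences = [
    "the movie was great",
    "the plot was thin",
    "the acting was great",
    "the end",
]

counts = Counter()
for sentence in toy_sentences:
    counts.update(sentence.split())

# Drop words seen only once, then sort by descending frequency
kept = {word: n for word, n in counts.items() if n > 1}
vocab = sorted(kept, key=kept.get, reverse=True)
print(vocab)  # ['the', 'was', 'great']
```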
To the vocabulary, we add a special token:
`_PAD`: This token represents padding, as we'll standardize sentence length. Overly long sentences will be truncated, and shorter ones will be padded with this token.
```python
words = ['_PAD'] + words
```
Once the vocabulary is prepared, we proceed with encoding the words by mapping each word to a numeric value. Here, we use each word’s position in the list as its encoded value.
```python
word2idx = {o: i for i, o in enumerate(words)}
idx2word = {i: o for i, o in enumerate(words)}
```
After preparing the mapping dictionaries, we can convert the words in `train_sentences` and `test_sentences` into numerical representations:
```python
for i, sentence in enumerate(train_sentences):
    train_sentences[i] = [word2idx[word] if word in word2idx else 0 for word in sentence]

for i, sentence in enumerate(test_sentences):
    test_sentences[i] = [word2idx[word.lower()] if word.lower() in word2idx else 0
                         for word in nltk.word_tokenize(sentence)]
```
In the code above, `else 0` means that a word not found in the dictionary is assigned the code 0, which corresponds to the `_PAD` token added earlier.
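A small sketch of this fallback, using a made-up four-entry vocabulary and a hypothetical `encode` helper, shows how an out-of-vocabulary word ends up sharing index 0 with padding:

```python
# Toy vocabulary; index 0 is reserved for the padding token
vocab = ['_PAD', 'the', 'was', 'great']
word2idx = {word: i for i, word in enumerate(vocab)}
idx2word = {i: word for i, word in enumerate(vocab)}


def encode(tokens):
    """Map tokens to indices; anything outside the vocabulary falls back to 0."""
    return [word2idx.get(token, 0) for token in tokens]


encoded = encode(['the', 'soundtrack', 'was', 'great'])
print(encoded)                         # [1, 0, 2, 3]
print([idx2word[i] for i in encoded])  # ['the', '_PAD', 'was', 'great']
```

Since unknown words and padding share one index here, the model cannot tell them apart. A separate unknown-word token is a common alternative, though this tutorial does not use one.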
To facilitate model construction, we need to standardize the length of all sentences. Here, we set a fixed length of 200. Sentences shorter than this will be padded with 0 (`_PAD`) at the beginning, while those exceeding this length will be truncated from the end:
```python
def pad_input(sentences, seq_len):
    """
    Standardizes sentence length to `seq_len`: truncates sentences that exceed
    this length from the end, and pads shorter ones with 0 at the beginning.
    """
    features = np.zeros((len(sentences), seq_len), dtype=int)
    for ii, review in enumerate(sentences):
        if len(review) != 0:
            features[ii, -len(review):] = np.array(review)[:seq_len]
    return features

# Standardize the length of sentences in the training and test datasets
train_sentences = pad_input(train_sentences, 200)
test_sentences = pad_input(test_sentences, 200)
```
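To make the padding and truncation rule concrete, here is a list-based sketch of the same behavior for a single sequence. The helper `pad_left` is hypothetical and uses a length of 5 instead of 200 so the results are easy to read.

```python
def pad_left(sequence, seq_len, pad_value=0):
    """Return exactly seq_len items: truncate the tail, or left-pad with pad_value."""
    kept = sequence[:seq_len]  # keep the first seq_len tokens
    return [pad_value] * (seq_len - len(kept)) + kept


print(pad_left([5, 8, 2], 5))              # [0, 0, 5, 8, 2]
print(pad_left([5, 8, 2, 9, 4, 7, 1], 5))  # [5, 8, 2, 9, 4]
print(pad_left([], 5))                     # [0, 0, 0, 0, 0]
```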
In addition to standardizing the length, this function also converts the sequences into numpy arrays. The label datasets need a similar transformation:
```python
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)
```
At this point, data preprocessing is nearly complete, and it’s time to move on to PyTorch.
Model Building
First, let's import the necessary packages for PyTorch:
```python
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
```
Next, we’ll set up DataLoaders for both the training and test datasets. Batch size is set to 200:
```python
batch_size = 200

train_data = TensorDataset(torch.from_numpy(train_sentences), torch.from_numpy(train_labels))
test_data = TensorDataset(torch.from_numpy(test_sentences), torch.from_numpy(test_labels))

train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)
```
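With these sizes, the number of batches per epoch is easy to work out. The sketch below uses a hypothetical helper, `batches_per_epoch`, and assumes the loader keeps a final partial batch, which is the `DataLoader` default.

```python
import math


def batches_per_epoch(num_samples: int, batch_size: int) -> int:
    """Number of batches yielded per epoch when a final partial batch is kept."""
    return math.ceil(num_samples / batch_size)


print(batches_per_epoch(800_000, 200))  # 4000 training batches
print(batches_per_epoch(200_000, 200))  # 1000 test batches
print(800_000 % 200, 200_000 % 200)     # 0 0, so every batch holds exactly 200 samples
```

Both dataset sizes divide evenly by 200, so every batch is full. That matters later, because the hidden state is created for exactly `batch_size` sequences and a smaller final batch would not match its shape.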
If possible, it's recommended to use a GPU to accelerate computations:
```python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device("cpu")
```
Now, let’s proceed with building the model:
```python
class SentimentNet(nn.Module):
    def __init__(self, vocab_size):
        super(SentimentNet, self).__init__()
        self.n_layers = n_layers = 2  # Number of LSTM layers
        # Dimension of hidden states: the LSTM outputs hidden states of 512 dimensions
        self.hidden_dim = hidden_dim = 512
        embedding_dim = 400  # Encode words as 400-dimensional vectors
        drop_prob = 0.5      # Dropout probability

        # Define embedding layer to convert integers to vectors
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(embedding_dim,     # Dimension of input vectors
                            hidden_dim,        # Dimension of hidden states
                            n_layers,          # Number of LSTM layers
                            dropout=drop_prob,
                            batch_first=True   # Set first dimension to batch size
                            )

        # Fully connected layer following LSTM
        self.fc = nn.Linear(in_features=hidden_dim,  # Output from LSTM as input to this layer
                            out_features=1           # Binary sentiment (0 or 1), hence dimension is 1
                            )
        self.sigmoid = nn.Sigmoid()  # Apply sigmoid to the output of the linear layer
        # Dropout applied to the input of the final fully connected layer
        self.dropout = nn.Dropout(drop_prob)

    def forward(self, x, hidden):
        """
        x: Input batch with size (batch_size, 200), where 200 is sentence length
        hidden: Hidden and cell states from the previous timestep, in the form (h, c)
                where both h and c have size (n_layers, batch_size, hidden_dim),
                i.e., (2, 200, 512)
        """
        # The first dimension corresponds to batch size
        batch_size = x.size(0)
        # Convert x to LongTensor type as required by embedding layer
        x = x.long()
        # Encode x, changing size from (batch_size, 200) to (batch_size, 200, embedding_dim)
        embeds = self.embedding(x)
        # Pass encoded vectors and hidden states to LSTM
        # lstm_out size: (batch_size, 200, 512), where 200 is the number of words in a sentence
        # hidden is a tuple (hidden_state, cell_state), each of size (2, batch_size, 512)
        # due to the two LSTM layers
        lstm_out, hidden = self.lstm(embeds, hidden)
        # Flatten lstm_out for the fully connected layer, changing size to
        # (batch_size * 200, hidden_dim). Since each word's output passes through the
        # fully connected layer, this effectively sets the batch size to 40000
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
        # Apply Dropout before the fully connected layer
        out = self.dropout(lstm_out)
        # Pass through the fully connected layer; the output size is (40000, 1)
        out = self.fc(out)
        # Apply sigmoid
        out = self.sigmoid(out)
        # Reshape output to (batch_size, 200) so each word has a corresponding output
        out = out.view(batch_size, -1)
        # Only take the output of the last word; final output size becomes (batch_size,)
        out = out[:, -1]
        # Return final output and hidden states (h, c)
        return out, hidden

    def init_hidden(self, batch_size):
        """
        Initialize hidden states: the first input to LSTM has no previous hidden states,
        so we initialize with zeros. This is a tuple as LSTM requires both hidden and
        cell states.
        """
        hidden = (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
                  torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device))
        return hidden
```
With the model defined, we construct the model object and move it to the selected device:
```python
model = SentimentNet(len(words))
model.to(device)  # Keep the model on the same device as the inputs and hidden states
print(model)
```
```
SentimentNet(
  (embedding): Embedding(221497, 400)
  (lstm): LSTM(400, 512, num_layers=2, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (sigmoid): Sigmoid()
  (dropout): Dropout(p=0.5, inplace=False)
)
```
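The layer sizes printed above are enough to estimate how many trainable parameters the model has. The sketch below does the arithmetic by hand, assuming PyTorch's usual LSTM layout of four gates per layer, each with input weights, recurrent weights and two bias vectors. The helper name `lstm_layer_params` is hypothetical.

```python
def lstm_layer_params(input_dim: int, hidden_dim: int) -> int:
    """Parameters in one LSTM layer: 4 gates x (input weights + recurrent weights + 2 biases)."""
    return 4 * (input_dim * hidden_dim + hidden_dim * hidden_dim + 2 * hidden_dim)


vocab_size, embedding_dim, hidden_dim = 221497, 400, 512

embedding = vocab_size * embedding_dim
lstm = lstm_layer_params(embedding_dim, hidden_dim) + lstm_layer_params(hidden_dim, hidden_dim)
fc = hidden_dim * 1 + 1  # weights plus one bias
total = embedding + lstm + fc

print(embedding, lstm, fc, total)  # 88598800 3973120 513 92572433
```

Under these assumptions the embedding table accounts for the large majority of the roughly 92.6 million parameters, which is one reason trimming rare words from the vocabulary helps.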
Next, we define the loss function. Since this is a binary classification problem, we'll use Binary Cross Entropy (BCE):
```python
criterion = nn.BCELoss()
```
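For intuition about what this loss measures, here is a plain-Python sketch of mean binary cross entropy on made-up probabilities. The function name `bce` is hypothetical; the sketch mirrors the averaged form of the loss and is not PyTorch's implementation.

```python
import math


def bce(probabilities, targets):
    """Mean binary cross entropy for predicted probabilities strictly between 0 and 1."""
    losses = [-(y * math.log(p) + (1 - y) * math.log(1 - p))
              for p, y in zip(probabilities, targets)]
    return sum(losses) / len(losses)


confident_right = bce([0.9, 0.1], [1, 0])
confident_wrong = bce([0.1, 0.9], [1, 0])
print(round(confident_right, 4), round(confident_wrong, 4))  # 0.1054 2.3026
```

Confident correct predictions cost little, while confident wrong ones are penalized heavily.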
We'll use the Adam optimizer:
```python
lr = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
```
Now, let’s define the training code:
```python
epochs = 2          # Number of training epochs
counter = 0         # Counts training iterations
print_every = 1000  # Print status every 1000 iterations

for i in range(epochs):
    h = model.init_hidden(batch_size)  # Initialize the first hidden state

    for inputs, labels in train_loader:  # Retrieve a batch of inputs and labels from train_loader
        counter += 1  # Increment training count

        # Detach the previous hidden and cell states from the computation graph so that
        # gradients do not flow back through earlier batches.
        # h is the tuple (hidden_state, cell_state), so len(h) == 2
        h = tuple([e.data for e in h])
        # Move data to the device (GPU if available)
        inputs, labels = inputs.to(device), labels.to(device)
        # Zero the model gradients
        model.zero_grad()
        # Forward pass with the current inputs and hidden state,
        # then receive the output and the new hidden state
        output, h = model(inputs, h)
        # Calculate loss with the predicted and true labels
        loss = criterion(output, labels.float())
        # Backpropagate
        loss.backward()
        # Clip gradients to prevent gradient explosion
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
        # Update weights
        optimizer.step()

        # Print the status at intervals
        if counter % print_every == 0:
            print("Epoch: {}/{}...".format(i+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()))
```
```
Epoch: 1/2... Step: 1000... Loss: 0.270512...
Epoch: 1/2... Step: 2000... Loss: 0.218537...
...
Epoch: 2/2... Step: 7000... Loss: 0.163251...
Epoch: 2/2... Step: 8000... Loss: 0.203283...
```
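The gradient clipping step in the training loop rescales all gradients together when their combined norm exceeds `max_norm`. The following plain-Python sketch shows that idea on a flat list of made-up gradient values. The helper `clip_by_norm` is hypothetical and only approximates what `clip_grad_norm_` does across real parameter tensors.

```python
import math


def clip_by_norm(grads, max_norm, eps=1e-6):
    """Rescale gradient values so their L2 norm is at most max_norm."""
    total_norm = math.sqrt(sum(g * g for g in grads))
    scale = min(1.0, max_norm / (total_norm + eps))  # never scale gradients up
    return [g * scale for g in grads], total_norm


clipped, norm_before = clip_by_norm([6.0, 8.0], max_norm=5)
norm_after = math.sqrt(sum(g * g for g in clipped))
print(norm_before, round(norm_after, 3))  # 10.0 5.0

# Gradients already within the limit are left unchanged
print(clip_by_norm([0.3, 0.4], max_norm=5)[0])  # [0.3, 0.4]
```

The direction of the gradient is preserved; only its length is capped.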
If you encounter a `RuntimeError: CUDA out of memory. Tried to allocate ...` error, try reducing the `batch_size` or clearing the GPU cache with `torch.cuda.empty_cache()`.
After training the model for a while, let’s evaluate its performance:
```python
test_losses = []  # Track the losses on the test dataset
num_correct = 0   # Track the number of correct predictions

h = model.init_hidden(batch_size)  # Initialize hidden_state and cell_state

model.eval()  # Set the model to evaluation mode
# Start evaluating the model
for inputs, labels in test_loader:
    h = tuple([each.data for each in h])  # Detach states from the previous batch
    inputs, labels = inputs.to(device), labels.to(device)
    output, h = model(inputs, h)
    test_loss = criterion(output.squeeze(), labels.float())
    test_losses.append(test_loss.item())
    pred = torch.round(output.squeeze())  # Round predictions to 0 or 1
    correct_tensor = pred.eq(labels.float().view_as(pred))  # Mark correctly predicted samples
    correct = np.squeeze(correct_tensor.cpu().numpy())
    num_correct += np.sum(correct)

print("Test loss: {:.3f}".format(np.mean(test_losses)))
test_acc = num_correct / len(test_loader.dataset)
print("Test accuracy: {:.3f}%".format(test_acc * 100))
```
```
Test loss: 0.179
Test accuracy: 93.151%
```
After two epochs of training, the model reaches about 93% accuracy on the test set.
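The accuracy figure comes from turning each predicted probability into a 0 or 1 and counting matches with the labels. A minimal sketch of that calculation, with made-up probabilities and a hypothetical `accuracy` helper, looks like this:

```python
def accuracy(probabilities, labels, threshold=0.5):
    """Fraction of predictions that match the labels after thresholding."""
    predictions = [1 if p >= threshold else 0 for p in probabilities]
    correct = sum(int(pred == y) for pred, y in zip(predictions, labels))
    return correct / len(labels)


probs = [0.92, 0.15, 0.61, 0.40]
labels = [1, 0, 0, 1]
print(accuracy(probs, labels))  # 0.5 (the first two predictions are right, the last two are wrong)
```

The evaluation code uses `torch.round` rather than an explicit threshold. The two should agree except for a probability of exactly 0.5.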
Let's test it out by defining a `predict(sentence)` function that takes in a sentence and outputs the prediction result:
```python
def predict(sentence):
    # Tokenize the sentence and convert each word to its corresponding index
    sentences = [[word2idx[word.lower()] if word.lower() in word2idx else 0
                  for word in nltk.word_tokenize(sentence)]]
    # Pad the sentence to a fixed length of 200
    sentences = pad_input(sentences, 200)
    # Convert to a tensor and move it to the device (GPU if available)
    sentences = torch.Tensor(sentences).long().to(device)
    # Initialize the hidden and cell states: 2 layers, batch size 1, 512 dimensions
    h = (torch.Tensor(2, 1, 512).zero_().to(device),
         torch.Tensor(2, 1, 512).zero_().to(device))
    h = tuple([each.data for each in h])
    # Make a prediction
    if model(sentences, h)[0] >= 0.5:
        print("positive")
    else:
        print("negative")
```
```python
predict("The film is so boring")
predict("The actor is too ugly.")
```
```
negative
negative
```
We tried a couple of sentences, and as you can see, both predictions were correct.
