PyTorch Beginner's Tutorial (4) - Sentiment Analysis of Text Using LSTM
Overview
This article is based on the code from Long Short-Term Memory: From Zero to Hero with PyTorch, with some modifications and added annotations for clarity. The referenced article provides an in-depth introduction to LSTM. For those unfamiliar with LSTM, reading it first might be helpful.
Here, we use an Amazon review dataset to train a classifier that can detect the sentiment of text.
Get Dataset:
```
Link: https://pan.baidu.com/s/1cK-scxLIliTsOPF-6byucQ
Access code: yqbq
```
Data Preprocessing
First, import the necessary libraries:
```python
import bz2                       # For reading bz2 compressed files
from collections import Counter  # For word frequency statistics
import re                        # Regular expressions
import nltk                      # Text preprocessing
import numpy as np
```
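`nltk.word_tokenize`, which we use later for tokenization, relies on NLTK's punkt models. If you have never downloaded them, a one-time download is needed (the exact resource name can vary slightly between NLTK versions):

```python
# One-time download of the punkt tokenizer models used by nltk.word_tokenize
nltk.download('punkt')
```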
Extract the downloaded archives into a "data" directory. It should contain two files: "train.ft.txt.bz2" and "test.ft.txt.bz2" (adjust the paths in the code below to match where you placed them).
After extraction, read in the training and testing data:
```python
train_file = bz2.BZ2File('../data/amazon_reviews/train.ft.txt.bz2')
test_file = bz2.BZ2File('../data/amazon_reviews/test.ft.txt.bz2')
train_file = train_file.readlines()
test_file = test_file.readlines()
print(train_file[0])
```
``` b'__label__2 Stuning even for the non-gamer: This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^\n' ```
As shown, each data entry consists of two parts: Label and Data. Here:
- `__label__1` represents a negative review, which we will encode as 0.
- `__label__2` represents a positive review, which we will encode as 1.
Due to the large dataset size, we'll use only 1 million records: 800,000 for training and 200,000 for testing, an 80:20 split.
```python
num_train = 800000
num_test = 200000
train_file = [x.decode('utf-8') for x in train_file[:num_train]]
test_file = [x.decode('utf-8') for x in test_file[:num_test]]
```
In this example, we use `decode('utf-8')` because the source file is stored as binary, which is indicated by the `b''` format.
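As a tiny illustration, decoding turns a bytes object (like the record printed above) into a regular string; the snippet below uses a shortened copy of that record:

```python
# The raw line is a bytes object; decode('utf-8') converts it into a str
line = b'__label__2 Stuning even for the non-gamer: ...'
print(type(line))                  # <class 'bytes'>
print(type(line.decode('utf-8')))  # <class 'str'>
```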
In the source file, data and labels are combined, so we need to separate them:
```python
# Encode '__label__1' as 0 (negative review) and '__label__2' as 1 (positive review)
train_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in train_file]
test_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in test_file]
"""
`split(' ', 1)[1]`: Separates label from data and retrieves the data part.
`[:-1]`: Removes the last character (\n).
`lower()`: Converts to lowercase, as case sensitivity doesn’t aid sentiment analysis and increases encoding complexity.
"""
train_sentences = [x.split(' ', 1)[1][:-1].lower() for x in train_file]
test_sentences = [x.split(' ', 1)[1][:-1].lower() for x in test_file]
```
After separating the data, we perform some basic data cleaning:
Since numbers do not contribute significantly to sentiment classification, we replace all numbers with 0:
```python
for i in range(len(train_sentences)):
    train_sentences[i] = re.sub(r'\d', '0', train_sentences[i])

for i in range(len(test_sentences)):
    test_sentences[i] = re.sub(r'\d', '0', test_sentences[i])
```
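As a quick check with a made-up sentence, every digit collapses to 0 while the rest of the text is untouched:

```python
# Made-up example: all digits are replaced with '0'
print(re.sub(r'\d', '0', 'i bought 2 of these in 2019'))
# -> i bought 0 of these in 0000
```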
The dataset also contains samples with website links, such as: Welcome to our website: www.pohabo.com. Since URLs can interfere with data processing, we replace them with a placeholder: Welcome to our website: <url>.
```python
# Replace any whitespace-delimited token that ends in a dot followed by
# three lowercase letters (e.g. "www.pohabo.com") with the placeholder <url>
for i in range(len(train_sentences)):
    if 'www.' in train_sentences[i] or 'http:' in train_sentences[i] or 'https:' in train_sentences[i] or '.com' in train_sentences[i]:
        train_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", train_sentences[i])

for i in range(len(test_sentences)):
    if 'www.' in test_sentences[i] or 'http:' in test_sentences[i] or 'https:' in test_sentences[i] or '.com' in test_sentences[i]:
        test_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", test_sentences[i])
```
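To see what the pattern does, here is a quick check on a lowercased version of the example sentence from above; the token ending in ".com" is replaced and the surrounding text is left alone:

```python
# Quick check of the URL pattern on the example sentence (already lowercased)
print(re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", "welcome to our website: www.pohabo.com"))
# -> welcome to our website: <url>
```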
After completing data cleaning, we need to tokenize the text and discard words that only appear once, as they hold minimal reference value:
```python
words = Counter() # Tracks the frequency of each word
for i, sentence in enumerate(train_sentences):
words_list = nltk.word_tokenize(sentence) # Tokenize the sentence
words.update(words_list) # Update the word frequency list
train_sentences[i] = words_list # Store the tokenized words list
if i%200000 == 0: # Print progress every 200,000 entries
print(str((i*100)/num_train) + "% done")
print("100% done")
```
```
0.0% done
25.0% done
50.0% done
75.0% done
100% done
```
Next, we remove words that appear only once:
```python
words = {k:v for k,v in words.items() if v>1}
```
We then sort words in descending order of frequency and convert it to a list, forming our vocabulary. Later, word encoding will be based on this vocabulary:
```python
words = sorted(words, key=words.get, reverse=True)
print(words[:10])  # Display the 10 most frequent words
```
``` ['.', 'the', ',', 'i', 'and', 'a', 'to', 'it', 'of', 'this'] ```
To the vocabulary, we add a special token:
- `_PAD`: This token represents padding, as we'll standardize sentence length. Overly long sentences will be truncated, and shorter ones will be padded with this token.
```python
words = ['_PAD'] + words
```
Once the vocabulary is prepared, we proceed with encoding the words by mapping each word to a numeric value. Here, we use each word’s position in the list as its encoded value.
```python
word2idx = {o:i for i,o in enumerate(words)}
idx2word = {i:o for i,o in enumerate(words)}
```
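Since `_PAD` was prepended to the vocabulary, it receives index 0, and the most frequent word (the period, per the output above) receives index 1:

```python
# '_PAD' sits at position 0 of the vocabulary, so it encodes to 0
print(word2idx['_PAD'])  # 0
print(idx2word[1])       # '.', the most frequent token
```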
After preparing the mapping dictionary, we can convert the words in train_sentences into numerical representations:
```python
for i, sentence in enumerate(train_sentences):
train_sentences[i] = [word2idx[word] if word in word2idx else 0 for word in sentence]
for i, sentence in enumerate(test_sentences):
test_sentences[i] = [word2idx[word.lower()] if word.lower() in word2idx else 0 for word in nltk.word_tokenize(sentence)]
```
In the code above, `else 0` indicates that if a word is not found in the dictionary, it's assigned a code of 0, corresponding to `_PAD` as noted earlier.
To facilitate model construction, we need to standardize the length of all sentences. Here, we set a fixed length of 200. Sentences shorter than this will be padded with 0 (_PAD) at the beginning, while those exceeding this length will be truncated from the end:
```python
def pad_input(sentences, seq_len):
"""
Standardizes sentence length to `seq_len`: truncates sentences that exceed this length from the end, and pads shorter ones with 0 at the beginning.
"""
features = np.zeros((len(sentences), seq_len),dtype=int)
for ii, review in enumerate(sentences):
if len(review) != 0:
features[ii, -len(review):] = np.array(review)[:seq_len]
return features
# Standardize the length of sentences in the training and test datasets
train_sentences = pad_input(train_sentences, 200)
test_sentences = pad_input(test_sentences, 200)
```
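As a quick sanity check, here is `pad_input` applied to a single toy sequence of three made-up token ids with a target length of 6; the values are left-padded with 0:

```python
# Toy example: a 3-token "sentence" padded on the left to length 6
print(pad_input([[5, 8, 3]], 6))
# -> [[0 0 0 5 8 3]]
```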
In addition to standardizing the length, this function also converts the sequences into numpy arrays. The label datasets need a similar transformation:
```python
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)
```
At this point, data preprocessing is nearly complete, and it’s time to move on to PyTorch.
Model Building
First, let's import the necessary PyTorch packages:
```python
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
```
Next, we’ll set up DataLoaders for both the training and test datasets. Batch size is set to 200:
```python
batch_size = 200

train_data = TensorDataset(torch.from_numpy(train_sentences), torch.from_numpy(train_labels))
test_data = TensorDataset(torch.from_numpy(test_sentences), torch.from_numpy(test_labels))

train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)
```
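To confirm that each batch has the expected shape, we can peek at a single batch from the loader:

```python
# Fetch one batch and inspect its shape
sample_x, sample_y = next(iter(train_loader))
print(sample_x.shape)  # torch.Size([200, 200]) -> (batch_size, sentence length)
print(sample_y.shape)  # torch.Size([200])
```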
If possible, it's recommended to use a GPU to accelerate computations:
```python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device("cpu")
```
Now, let’s proceed with building the model:
```python
class SentimentNet(nn.Module):
def __init__(self, vocab_size):
super(SentimentNet, self).__init__()
self.n_layers = n_layers = 2 # Number of LSTM layers
# Dimension of hidden states, LSTM outputs hidden states of 512 dimensions
self.hidden_dim = hidden_dim = 512
embedding_dim = 400 # Encode words as 400-dimensional vectors
drop_prob=0.5 # Dropout probability
# Define embedding layer to convert integers to vectors
self.embedding = nn.Embedding(vocab_size, embedding_dim)
self.lstm = nn.LSTM(embedding_dim, # Dimension of input vectors
                            hidden_dim,  # Dimension of hidden states
n_layers, # Number of LSTM layers
dropout=drop_prob,
batch_first=True # Set first dimension to batch size
)
# Fully connected layer following LSTM
self.fc = nn.Linear(in_features=hidden_dim, # Output from LSTM as input to this layer
out_features=1 # For sentiment analysis, output is binary (0 or 1), hence dimension is 1
)
self.sigmoid = nn.Sigmoid() # Apply sigmoid to the output of the linear layer
# Add Dropout to the final fully connected layer
self.dropout = nn.Dropout(drop_prob)
def forward(self, x, hidden):
"""
x: Input batch with size (batch_size, 200), where 200 is sentence length
hidden: Hidden and cell states from the previous timestep, in the form (h, c)
where both h and c have size (n_layers, batch_size, hidden_dim), i.e., (2, 200, 512)
"""
# The first dimension corresponds to batch size
batch_size = x.size(0)
# Convert x to LongTensor type as required by embedding layer
x = x.long()
# Encode x, changing size from (batch_size, 200) to (batch_size, 200, embedding_dim)
embeds = self.embedding(x)
# Pass encoded vectors and hidden states to LSTM
# lstm_out size: (batch_size, 200, 512), where 200 corresponds to the number of words in a sentence
# hidden is a tuple (hidden_state, cell_state) of size (2, batch_size, 512) due to the two LSTM layers
lstm_out, hidden = self.lstm(embeds, hidden)
# Flatten lstm_out for the fully connected layer, changing size to (batch_size * 200, hidden_dim)
# Since each word's output passes through the fully connected layer, this effectively sets the batch size to 40000
lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)
# Apply Dropout to the fully connected layer
out = self.dropout(lstm_out)
# Pass through the fully connected layer
# The output size is (40000, 1)
out = self.fc(out)
# Apply sigmoid
out = self.sigmoid(out)
# Reshape output to (batch_size, 200) so each word has a corresponding output
out = out.view(batch_size, -1)
# Only take the output of the last word
# Final output size becomes (batch_size, 1)
out = out[:,-1]
# Return final output and hidden states (h, c)
return out, hidden
def init_hidden(self, batch_size):
"""
Initialize hidden states: the first input to LSTM has no previous hidden states,
so we initialize with zeros. This is a tuple as LSTM requires both hidden and cell states.
"""
hidden = (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)
)
return hidden
```
With the model defined, we can now construct the model object:
```python
model = SentimentNet(len(words))
model.to(device)
```
```
SentimentNet(
  (embedding): Embedding(221497, 400)
  (lstm): LSTM(400, 512, num_layers=2, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (sigmoid): Sigmoid()
  (dropout): Dropout(p=0.5, inplace=False)
)
```
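Before training, it can be reassuring to push a dummy batch through the model and check the output shapes; a minimal sketch, using the `model`, `device`, and `batch_size` defined above:

```python
# Shape check with a dummy batch of all-zero (i.e. all-_PAD) sentences
dummy = torch.zeros(batch_size, 200, dtype=torch.long).to(device)
h0 = model.init_hidden(batch_size)
out, h = model(dummy, h0)
print(out.shape)   # torch.Size([200])         -> one prediction per sentence
print(h[0].shape)  # torch.Size([2, 200, 512]) -> (n_layers, batch_size, hidden_dim)
```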
Next, we define the loss function. Since this is a binary classification problem, we'll use Binary Cross Entropy (BCE):
```python
criterion = nn.BCELoss()
```
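As a quick illustration with toy numbers (not from the dataset): predictions close to the true labels give a small BCE loss, and the criterion averages the per-sample losses:

```python
# BCE on toy values: predictions 0.9 and 0.2 for true labels 1 and 0
toy_pred = torch.tensor([0.9, 0.2])
toy_true = torch.tensor([1.0, 0.0])
print(criterion(toy_pred, toy_true))  # tensor(0.1643)
```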
We'll use the Adam optimizer:
```python
lr = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
```
Now, let’s define the training code:
```python
epochs = 2 # Number of training epochs
counter = 0 # Counts training iterations
print_every = 1000 # Print status every 1000 iterations
for i in range(epochs):
h = model.init_hidden(batch_size) # Initialize the first hidden state
for inputs, labels in train_loader: # Retrieve a batch of inputs and labels from train_loader
counter += 1 # Increment training count
# Convert the previous hidden state to a tuple format
# Since we use two layers, len(h) == 2
h = tuple([e.data for e in h])
# Move data to GPU
inputs, labels = inputs.to(device), labels.to(device)
# Zero the model gradients
model.zero_grad()
# Forward pass with the current inputs and hidden state,
# then receive the output and the new hidden state
output, h = model(inputs, h)
# Calculate loss with the predicted and true labels
loss = criterion(output, labels.float())
# Backpropagate
loss.backward()
# Clip gradients to prevent gradient explosion
# For details, refer to: https://blog.csdn.net/zhaohongfei_358/article/details/122820992
nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)
# Update weights
optimizer.step()
# Print the status at intervals
if counter % print_every == 0:
print("Epoch: {}/{}...".format(i+1, epochs),
"Step: {}...".format(counter),
"Loss: {:.6f}...".format(loss.item()))
```
```
Epoch: 1/2... Step: 1000... Loss: 0.270512...
Epoch: 1/2... Step: 2000... Loss: 0.218537...
...
Epoch: 2/2... Step: 7000... Loss: 0.163251...
Epoch: 2/2... Step: 8000... Loss: 0.203283...
```
If you encounter a `RuntimeError: CUDA out of memory. Tried to allocate ...` error, try reducing the `batch_size` or clearing the GPU cache with `torch.cuda.empty_cache()`.
After training the model for a while, let’s evaluate its performance:
```python
test_losses = [] # Track the losses on the test dataset
num_correct = 0 # Track the number of correct predictions
h = model.init_hidden(batch_size) # Initialize hidden_state and cell_state
model.eval() # Set the model to evaluation mode
# Start evaluating the model
for inputs, labels in test_loader:
h = tuple([each.data for each in h])
inputs, labels = inputs.to(device), labels.to(device)
output, h = model(inputs, h)
test_loss = criterion(output.squeeze(), labels.float())
test_losses.append(test_loss.item())
pred = torch.round(output.squeeze()) # Round predictions to 0 or 1
correct_tensor = pred.eq(labels.float().view_as(pred)) # Calculate correctly predicted data
correct = np.squeeze(correct_tensor.cpu().numpy())
num_correct += np.sum(correct)
print("Test loss: {:.3f}".format(np.mean(test_losses)))
test_acc = num_correct / len(test_loader.dataset)
print("Test accuracy: {:.3f}%".format(test_acc * 100))
```
```
Test loss: 0.179
Test accuracy: 93.151%
```
After training, we achieved over 90% accuracy.
Let’s test it out by defining a predict(sentence) function that takes in a sentence and outputs the prediction result:
```python
def predict(sentence):
# Tokenize the sentence and convert each word to its corresponding index
sentences = [[word2idx[word.lower()] if word.lower() in word2idx else 0 for word in nltk.word_tokenize(sentence)]]
# Pad the sentence to a fixed length of 200
sentences = pad_input(sentences, 200)
# Move data to GPU
sentences = torch.Tensor(sentences).long().to(device)
# Initialize the hidden state
h = (torch.Tensor(2, 1, 512).zero_().to(device),
torch.Tensor(2, 1, 512).zero_().to(device))
h = tuple([each.data for each in h])
# Make a prediction
if model(sentences, h)[0] >= 0.5:
print("positive")
else:
print("negative")
```
```python
predict("The film is so boring")
predict("The actor is too ugly.")
```
```
negative
negative
```
We tried a couple of sentences, and as you can see, both predictions were correct.
References
Long Short-Term Memory: From Zero to Hero with PyTorch: https://blog.floydhub.com/long-short-term-memory-from-zero-to-hero-with-pytorch/