PyTorch Beginner's Tutorial (4) - Sentiment Analysis of Text Using LSTM
Overview
This article is based on the code from Long Short-Term Memory: From Zero to Hero with PyTorch, with some modifications and added annotations for clarity. The referenced article provides an in-depth introduction to LSTM. For those unfamiliar with LSTM, reading it first might be helpful.
Here, we use an Amazon review dataset to train a classifier that can detect the sentiment of text.
Get Dataset:
```
Link: https://pan.baidu.com/s/1cK-scxLIliTsOPF-6byucQ
Access code: yqbq
```
Data Preprocessing
First, import the necessary libraries:
```python
import bz2                       # For reading bz2 compressed files
from collections import Counter  # For word frequency statistics
import re                        # Regular expressions
import nltk                      # Text preprocessing
import numpy as np
```
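Note that `nltk.word_tokenize`, used later for tokenization, depends on NLTK's punkt tokenizer models. If they are not already installed, a one-time download is needed; a minimal sketch (newer NLTK releases may ask for `punkt_tab` instead):
```python
import nltk

# One-time download of the tokenizer models used by nltk.word_tokenize;
# skip this if the punkt data is already installed.
nltk.download('punkt')
```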
Extract the data samples into a data directory (adjust the paths in the code below to wherever you place the files). It should contain two files: "train.ft.txt.bz2" and "test.ft.txt.bz2".
After extraction, read in the training and testing data:
```python
train_file = bz2.BZ2File('../data/amazon_reviews/train.ft.txt.bz2')
test_file = bz2.BZ2File('../data/amazon_reviews/test.ft.txt.bz2')

train_file = train_file.readlines()
test_file = test_file.readlines()

print(train_file[0])
```
```
b'__label__2 Stuning even for the non-gamer: This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^\n'
```
As shown, each data entry consists of two parts: the label and the review text. Here, `__label__1` represents a negative review, which we will encode as 0, and `__label__2` represents a positive review, which we will encode as 1.
Due to the large dataset size, we'll use only the first 1,000,000 records: 800,000 for training and 200,000 for testing, an 80:20 split.
```python
num_train = 800000
num_test = 200000

train_file = [x.decode('utf-8') for x in train_file[:num_train]]
test_file = [x.decode('utf-8') for x in test_file[:num_test]]
```
Here we call `decode('utf-8')` because the source file stores each line as a byte string, which is indicated by the `b''` prefix.
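As a quick illustration of the difference (the byte string below is an abbreviated copy of the sample output above):
```python
line = b'__label__2 Stuning even for the non-gamer: This sound track was beautiful!\n'
print(type(line))                  # <class 'bytes'>
print(type(line.decode('utf-8')))  # <class 'str'>
```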
In the source file, data and labels are combined, so we need to separate them:
```python
# Encode '__label__1' as 0 (negative review) and '__label__2' as 1 (positive review)
train_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in train_file]
test_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in test_file]

"""
`split(' ', 1)[1]`: Separates label from data and retrieves the data part.
`[:-1]`: Removes the last character (\n).
`lower()`: Converts to lowercase, as case sensitivity doesn't aid sentiment analysis and increases encoding complexity.
"""
train_sentences = [x.split(' ', 1)[1][:-1].lower() for x in train_file]
test_sentences = [x.split(' ', 1)[1][:-1].lower() for x in test_file]
```
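To make the slicing concrete, here is what `split(' ', 1)` produces for an abbreviated copy of the first training line:
```python
line = '__label__2 Stuning even for the non-gamer: This sound track was beautiful!\n'
label, text = line.split(' ', 1)
print(label)              # __label__2
print(text[:-1].lower())  # stuning even for the non-gamer: this sound track was beautiful!
```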
After separating the data, we perform some basic data cleaning:
Since numbers do not contribute significantly to sentiment classification, we replace all numbers with 0:
```python
for i in range(len(train_sentences)):
    train_sentences[i] = re.sub(r'\d', '0', train_sentences[i])

for i in range(len(test_sentences)):
    test_sentences[i] = re.sub(r'\d', '0', test_sentences[i])
```
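For example, using the `re` module imported earlier (a quick check, not part of the pipeline):
```python
print(re.sub(r'\d', '0', 'i rated it 4.5 out of 5'))
# i rated it 0.0 out of 0
```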
The dataset also contains samples with website links, such as `Welcome to our website: www.pohabo.com`. Since URLs can interfere with data processing, we replace them with a placeholder: `Welcome to our website: <url>`.
```python
for i in range(len(train_sentences)):
    if 'www.' in train_sentences[i] or 'http:' in train_sentences[i] or 'https:' in train_sentences[i] or '.com' in train_sentences[i]:
        train_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", train_sentences[i])

for i in range(len(test_sentences)):
    if 'www.' in test_sentences[i] or 'http:' in test_sentences[i] or 'https:' in test_sentences[i] or '.com' in test_sentences[i]:
        test_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", test_sentences[i])
```
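Applied to the example above, the pattern replaces any whitespace-delimited token that ends in a dot followed by three lowercase letters (such as `.com`); a quick check:
```python
s = 'welcome to our website: www.pohabo.com'
print(re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", s))
# welcome to our website: <url>
```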
After completing data cleaning, we need to tokenize the text and discard words that only appear once, as they hold minimal reference value:
```python
words = Counter()  # Tracks the frequency of each word

for i, sentence in enumerate(train_sentences):
    words_list = nltk.word_tokenize(sentence)  # Tokenize the sentence
    words.update(words_list)                   # Update the word frequency counts
    train_sentences[i] = words_list            # Store the tokenized word list
    if i % 200000 == 0:                        # Print progress every 200,000 entries
        print(str((i * 100) / num_train) + "% done")
print("100% done")
```
```
0.0% done
25.0% done
50.0% done
75.0% done
100% done
```
Next, we remove words that appear only once:
```python
words = {k: v for k, v in words.items() if v > 1}
```
We then sort `words` in descending order of frequency and convert it to a list, forming our vocabulary. Later, word encoding will be based on this vocabulary:
```python
words = sorted(words, key=words.get, reverse=True)
print(words[:10])  # Display the 10 most frequent words
```
```
['.', 'the', ',', 'i', 'and', 'a', 'to', 'it', 'of', 'this']
```
To the vocabulary, we add one special token, `_PAD`, which represents padding. Since we will standardize sentence length, overly long sentences will be truncated, and shorter ones will be padded with this token.
```python
words = ['_PAD'] + words
```
Once the vocabulary is prepared, we proceed with encoding the words by mapping each word to a numeric value. Here, we use each word’s position in the list as its encoded value.
```python
word2idx = {o: i for i, o in enumerate(words)}
idx2word = {i: o for i, o in enumerate(words)}
```
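A quick round trip through the two dictionaries (the exact index depends on your data; with the frequency order shown above, `'the'` lands at index 2, right after `_PAD` and `'.'`):
```python
idx = word2idx['the']
print(idx)            # 2 with the vocabulary built above (illustrative)
print(idx2word[idx])  # 'the'
```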
After preparing the mapping dictionaries, we can convert the words in `train_sentences` (and `test_sentences`) into numerical representations:
```python
for i, sentence in enumerate(train_sentences):
    train_sentences[i] = [word2idx[word] if word in word2idx else 0 for word in sentence]

for i, sentence in enumerate(test_sentences):
    test_sentences[i] = [word2idx[word.lower()] if word.lower() in word2idx else 0
                         for word in nltk.word_tokenize(sentence)]
```
In the code above, `else 0` means that if a word is not found in the dictionary, it is assigned the code 0, which corresponds to `_PAD` as noted earlier.
To facilitate model construction, we need to standardize the length of all sentences. Here, we set a fixed length of 200. Sentences shorter than this will be padded with 0 (`_PAD`) at the beginning, while those exceeding this length will have their excess tokens at the end cut off:
```python
def pad_input(sentences, seq_len):
    """
    Standardizes sentence length to `seq_len`: truncates sentences that exceed
    this length, and pads shorter ones with 0 at the beginning.
    """
    features = np.zeros((len(sentences), seq_len), dtype=int)
    for ii, review in enumerate(sentences):
        if len(review) != 0:
            features[ii, -len(review):] = np.array(review)[:seq_len]
    return features

# Standardize the length of sentences in the training and test datasets
train_sentences = pad_input(train_sentences, 200)
test_sentences = pad_input(test_sentences, 200)
```
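As a small sanity check of `pad_input` with toy indices: a three-token sentence padded to length 10 is left-padded with zeros, while a twelve-token one keeps only its first ten tokens:
```python
print(pad_input([[5, 6, 7]], 10))
# [[0 0 0 0 0 0 0 5 6 7]]
print(pad_input([list(range(1, 13))], 10))
# [[ 1  2  3  4  5  6  7  8  9 10]]
```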
In addition to standardizing the length, this function also converts the sequences into numpy arrays. The label datasets need a similar transformation:
```python
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)
```
At this point, data preprocessing is nearly complete, and it’s time to move on to PyTorch.
Model Building
First, let's import the necessary packages for PyTorch:
```python
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
```
Next, we’ll set up DataLoaders for both the training and test datasets. Batch size is set to 200:
```python
batch_size = 200

train_data = TensorDataset(torch.from_numpy(train_sentences), torch.from_numpy(train_labels))
test_data = TensorDataset(torch.from_numpy(test_sentences), torch.from_numpy(test_labels))

train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)
```
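To confirm the shapes coming out of the loader, we can peek at a single batch (a quick check, assuming the preprocessing above):
```python
sample_x, sample_y = next(iter(train_loader))
print(sample_x.shape)  # torch.Size([200, 200]) -> (batch_size, sentence length)
print(sample_y.shape)  # torch.Size([200])
```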
If possible, it's recommended to use a GPU to accelerate computations:
```python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
```
Now, let’s proceed with building the model:
```python
class SentimentNet(nn.Module):
    def __init__(self, vocab_size):
        super(SentimentNet, self).__init__()
        self.n_layers = n_layers = 2        # Number of LSTM layers
        self.hidden_dim = hidden_dim = 512  # Dimension of the hidden states output by the LSTM
        embedding_dim = 400                 # Encode words as 400-dimensional vectors
        drop_prob = 0.5                     # Dropout probability

        # Define the embedding layer to convert integer indices to vectors
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(embedding_dim,     # Dimension of the input vectors
                            hidden_dim,        # Dimension of the hidden states
                            n_layers,          # Number of LSTM layers
                            dropout=drop_prob,
                            batch_first=True   # The first dimension is the batch size
                            )

        # Fully connected layer following the LSTM
        self.fc = nn.Linear(in_features=hidden_dim,  # Output from the LSTM is the input to this layer
                            out_features=1           # Sentiment is binary (0 or 1), hence output dimension 1
                            )
        self.sigmoid = nn.Sigmoid()  # Apply sigmoid to the output of the linear layer

        # Dropout applied before the final fully connected layer
        self.dropout = nn.Dropout(drop_prob)

    def forward(self, x, hidden):
        """
        x: Input batch with size (batch_size, 200), where 200 is the sentence length
        hidden: Hidden and cell states from the previous timestep, in the form (h, c),
                where both h and c have size (n_layers, batch_size, hidden_dim), i.e. (2, 200, 512)
        """
        # The first dimension corresponds to the batch size
        batch_size = x.size(0)

        # Convert x to a LongTensor, as required by the embedding layer
        x = x.long()

        # Encode x, changing its size from (batch_size, 200) to (batch_size, 200, embedding_dim)
        embeds = self.embedding(x)

        # Pass the encoded vectors and hidden states to the LSTM
        # lstm_out size: (batch_size, 200, 512), where 200 is the number of words in a sentence
        # hidden is a tuple (hidden_state, cell_state), each of size (2, batch_size, 512) due to the two LSTM layers
        lstm_out, hidden = self.lstm(embeds, hidden)

        # Flatten lstm_out for the fully connected layer, changing its size to (batch_size * 200, hidden_dim)
        # Since each word's output passes through the fully connected layer, this effectively sets the batch size to 40000
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)

        # Apply Dropout before the fully connected layer
        out = self.dropout(lstm_out)

        # Pass through the fully connected layer; the output size is (40000, 1)
        out = self.fc(out)

        # Apply sigmoid
        out = self.sigmoid(out)

        # Reshape the output to (batch_size, 200) so each word has a corresponding output
        out = out.view(batch_size, -1)

        # Only take the output of the last word; the final output size becomes (batch_size,)
        out = out[:, -1]

        # Return the final output and the hidden states (h, c)
        return out, hidden

    def init_hidden(self, batch_size):
        """
        Initialize hidden states: the first input to the LSTM has no previous hidden states,
        so we initialize them with zeros. This is a tuple because the LSTM requires both
        hidden and cell states.
        """
        hidden = (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
                  torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device)
                  )
        return hidden
```
With the model defined, we can construct the model object:
```python
model = SentimentNet(len(words))
model.to(device)
```
```
SentimentNet(
  (embedding): Embedding(221497, 400)
  (lstm): LSTM(400, 512, num_layers=2, batch_first=True, dropout=0.5)
  (fc): Linear(in_features=512, out_features=1, bias=True)
  (sigmoid): Sigmoid()
  (dropout): Dropout(p=0.5, inplace=False)
)
```
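Before training, an optional shape check with a dummy batch of all-`_PAD` sentences confirms that the forward pass returns one probability per sentence (a quick, throwaway check):
```python
h0 = model.init_hidden(batch_size)
dummy = torch.zeros(batch_size, 200, dtype=torch.long).to(device)  # a batch of all-_PAD sentences

out, hn = model(dummy, h0)
print(out.shape)    # torch.Size([200])         -> one probability per sentence
print(hn[0].shape)  # torch.Size([2, 200, 512]) -> (n_layers, batch_size, hidden_dim)
```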
Next, we define the loss function. Since this is a binary classification problem, we'll use Binary Cross Entropy (BCE):
```python
criterion = nn.BCELoss()
```
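As a sanity check on what BCE measures: with a true label of 1, a confident correct prediction yields a small loss while a confident wrong one is penalized heavily (the values are simply -ln(p)):
```python
print(criterion(torch.tensor([0.9]), torch.tensor([1.0])))  # tensor(0.1054) = -ln(0.9)
print(criterion(torch.tensor([0.1]), torch.tensor([1.0])))  # tensor(2.3026) = -ln(0.1)
```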
We'll use the Adam optimizer:
```python
lr = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
```
Now, let’s define the training code:
```python
epochs = 2          # Number of training epochs
counter = 0         # Counts training iterations
print_every = 1000  # Print status every 1000 iterations

for i in range(epochs):
    h = model.init_hidden(batch_size)  # Initialize the first hidden state

    for inputs, labels in train_loader:  # Retrieve a batch of inputs and labels from train_loader
        counter += 1  # Increment the iteration count

        # Detach the previous hidden state from the graph and repackage it as a tuple
        # Since we use two layers, len(h) == 2
        h = tuple([e.data for e in h])

        # Move data to the GPU
        inputs, labels = inputs.to(device), labels.to(device)

        # Zero the model gradients
        model.zero_grad()

        # Forward pass with the current inputs and hidden state,
        # then receive the output and the new hidden state
        output, h = model(inputs, h)

        # Calculate the loss from the predicted and true labels
        loss = criterion(output, labels.float())

        # Backpropagate
        loss.backward()

        # Clip gradients to prevent gradient explosion
        # For details, refer to: https://blog.csdn.net/zhaohongfei_358/article/details/122820992
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)

        # Update the weights
        optimizer.step()

        # Print the status at intervals
        if counter % print_every == 0:
            print("Epoch: {}/{}...".format(i + 1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()))
```
```
Epoch: 1/2... Step: 1000... Loss: 0.270512...
Epoch: 1/2... Step: 2000... Loss: 0.218537...
...
Epoch: 2/2... Step: 7000... Loss: 0.163251...
Epoch: 2/2... Step: 8000... Loss: 0.203283...
```
If you encounter a `RuntimeError: CUDA out of memory. Tried to allocate ...` error, try reducing the `batch_size` or clearing the GPU cache with `torch.cuda.empty_cache()`.
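For reference, clearing the cache is a one-liner; note that it only releases memory PyTorch's caching allocator is holding but not using, so reducing `batch_size` is usually the more effective fix:
```python
torch.cuda.empty_cache()  # release unused cached GPU memory held by PyTorch's allocator
```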
After training the model for a while, let’s evaluate its performance:
```python
test_losses = []  # Track the losses on the test dataset
num_correct = 0   # Track the number of correct predictions

h = model.init_hidden(batch_size)  # Initialize hidden_state and cell_state

model.eval()  # Set the model to evaluation mode

# Start evaluating the model
for inputs, labels in test_loader:
    h = tuple([each.data for each in h])
    inputs, labels = inputs.to(device), labels.to(device)
    output, h = model(inputs, h)
    test_loss = criterion(output.squeeze(), labels.float())
    test_losses.append(test_loss.item())
    pred = torch.round(output.squeeze())  # Round predictions to 0 or 1
    correct_tensor = pred.eq(labels.float().view_as(pred))  # Identify correctly predicted samples
    correct = np.squeeze(correct_tensor.cpu().numpy())
    num_correct += np.sum(correct)

print("Test loss: {:.3f}".format(np.mean(test_losses)))

test_acc = num_correct / len(test_loader.dataset)
print("Test accuracy: {:.3f}%".format(test_acc * 100))
```
```
Test loss: 0.179
Test accuracy: 93.151%
```
After training, we achieved over 90% accuracy.
Let's test it out by defining a `predict(sentence)` function that takes in a sentence and prints the prediction result:
```python
def predict(sentence):
    # Tokenize the sentence and convert each word to its corresponding index
    sentences = [[word2idx[word.lower()] if word.lower() in word2idx else 0
                  for word in nltk.word_tokenize(sentence)]]

    # Pad the sentence to a fixed length of 200
    sentences = pad_input(sentences, 200)

    # Move the data to the GPU
    sentences = torch.Tensor(sentences).long().to(device)

    # Initialize the hidden state
    h = (torch.Tensor(2, 1, 512).zero_().to(device),
         torch.Tensor(2, 1, 512).zero_().to(device))
    h = tuple([each.data for each in h])

    # Make a prediction
    if model(sentences, h)[0] >= 0.5:
        print("positive")
    else:
        print("negative")
```
```python
predict("The film is so boring")
predict("The actor is too ugly.")
```
```
negative
negative
```
We tried a couple of sentences, and as you can see, both predictions were correct.
References
Long Short-Term Memory: From Zero to Hero with PyTorch: https://blog.floydhub.com/long-short-term-memory-from-zero-to-hero-with-pytorch/