PyTorch Hands-On Tutorial (4): Sentiment Analysis of Text with an LSTM
Background covered in this article
A detailed explanation of the usage and inner workings of torch.nn.utils.clip_grad_norm_
What this article covers
This article is based on the code from Long Short-Term Memory: From Zero to Hero with PyTorch, with some modifications and added comments. That article explains LSTMs in detail; if you are not yet familiar with LSTMs, it is worth reading first.
We use an Amazon reviews dataset to train a classifier that can judge the sentiment of a piece of text.
The dataset can be downloaded here:
```
Link: https://pan.baidu.com/s/1cK-scxLIliTsOPF-6byucQ
Extraction code: yqbq
```
------ Let's get started ------
Data Preprocessing
First, import the packages we will use:
```python
import bz2                       # for reading bz2-compressed files
from collections import Counter  # for counting word frequencies
import re                        # regular expressions
import nltk                      # text preprocessing
import numpy as np
```
Extract the data into the data directory; it contains two files: train.ft.txt.bz2 and test.ft.txt.bz2.
After extracting, read the training and test data:
```python
train_file = bz2.BZ2File('../data/amazon_reviews/train.ft.txt.bz2')
test_file = bz2.BZ2File('../data/amazon_reviews/test.ft.txt.bz2')

train_file = train_file.readlines()
test_file = test_file.readlines()

print(train_file[0])
```
b'__label__2 Stuning even for the non-gamer: This sound track was beautiful! It paints the senery in your mind so well I would recomend it even to people who hate vid. game music! I have played the game Chrono Cross but out of all of the games I have ever played it has the best music! It backs away from crude keyboarding and takes a fresher step with grate guitars and soulful orchestras. It would impress anyone who cares to listen! ^_^\n'
From the sample printed above, each record consists of two parts, a label and the review text, where:
- `__label__1` denotes a negative review, which will later be encoded as 0
- `__label__2` denotes a positive review, which will later be encoded as 1
Since the full dataset is very large, we only use 1,000,000 records here, split 8:2 into training and test sets:
```python
num_train = 800000
num_test = 200000

train_file = [x.decode('utf-8') for x in train_file[:num_train]]
test_file = [x.decode('utf-8') for x in test_file[:num_test]]
```
We call decode('utf-8') because the source file stores each line as bytes, as the b'' prefix in the output above shows.
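As a quick illustration (a standalone sketch, not part of the pipeline), decoding simply turns a bytes object into a regular string:

```python
raw = b'__label__2 Stuning even for the non-gamer: ...\n'
text = raw.decode('utf-8')    # bytes -> str
print(type(raw), type(text))  # <class 'bytes'> <class 'str'>
```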
In the raw data, the label and the review text are on the same line, so we need to split them apart:
```python
# Encode __label__1 as 0 (negative) and __label__2 as 1 (positive)
train_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in train_file]
test_labels = [0 if x.split(' ')[0] == '__label__1' else 1 for x in test_file]

"""
`split(' ', 1)[1]`: after separating the label from the text, keep the text part
`[:-1]`: drop the last character (the trailing \n)
`lower()`: convert to lowercase, since case carries little sentiment information and would only enlarge the vocabulary
"""
train_sentences = [x.split(' ', 1)[1][:-1].lower() for x in train_file]
test_sentences = [x.split(' ', 1)[1][:-1].lower() for x in test_file]
```
After splitting the data, we do some simple cleaning.
Since digits contribute little to sentiment classification, we replace every digit with 0:
```python
for i in range(len(train_sentences)):
    train_sentences[i] = re.sub(r'\d', '0', train_sentences[i])

for i in range(len(test_sentences)):
    test_sentences[i] = re.sub(r'\d', '0', test_sentences[i])
```
The dataset also contains reviews with website addresses in them, for example: Welcome to our website: www.pohabo.com. The URL itself only interferes with processing, so we replace it with a placeholder, turning the example into: Welcome to our website: <url>
```python
for i in range(len(train_sentences)):
    if 'www.' in train_sentences[i] or 'http:' in train_sentences[i] or 'https:' in train_sentences[i] or '.com' in train_sentences[i]:
        train_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", train_sentences[i])

for i in range(len(test_sentences)):
    if 'www.' in test_sentences[i] or 'http:' in test_sentences[i] or 'https:' in test_sentences[i] or '.com' in test_sentences[i]:
        test_sentences[i] = re.sub(r"([^ ]+(?<=\.[a-z]{3}))", "<url>", test_sentences[i])
```
Once the cleaning is done, we tokenize the text and count word frequencies; words that appear only once will be dropped, since they carry little information:
```python
words = Counter()  # counts how many times each word appears

for i, sentence in enumerate(train_sentences):
    words_list = nltk.word_tokenize(sentence)  # tokenize the sentence
    words.update(words_list)                   # update the word-frequency counter
    train_sentences[i] = words_list            # store the token list back into the sentence list
    if i % 200000 == 0:                        # print progress every 200,000 sentences
        print(str((i*100)/num_train) + "% done")
print("100% done")
```
0.0% done
25.0% done
50.0% done
75.0% done
100% done
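A side note: `nltk.word_tokenize` depends on NLTK's punkt tokenizer data. If the tokenization step above raises a `LookupError`, downloading the data once beforehand should fix it:

```python
import nltk
nltk.download('punkt')  # one-time download of the tokenizer models used by nltk.word_tokenize
```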
Remove the words that appear only once:
```python
words = {k: v for k, v in words.items() if v > 1}
```
Sort words by frequency in descending order and convert the result to a list, which will serve as our vocabulary; the word encoding below is based on this vocabulary:
```python
words = sorted(words, key=words.get, reverse=True)
print(words[:10])  # print the 10 most frequent words
```
['.', 'the', ',', 'i', 'and', 'a', 'to', 'it', 'of', 'this']
Add one more token to the vocabulary, `_PAD`, which represents padding. Later we will fix the length of every sentence: sentences that are too long are truncated, and sentences that are too short are padded with this token.
```python
words = ['_PAD'] + words
```
With the vocabulary in order, we encode the words, i.e. map each word to a number; here we simply use a word's index in the vocabulary list as its code:
```python
word2idx = {o: i for i, o in enumerate(words)}
idx2word = {i: o for i, o in enumerate(words)}
```
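As a quick sanity check (illustrative only), the two dictionaries are inverses of each other, and `_PAD` occupies index 0:

```python
print(word2idx['_PAD'])                    # 0, since '_PAD' was prepended to the vocabulary
print(idx2word[1])                         # the most frequent token, '.' in this dataset
print(idx2word[word2idx['the']] == 'the')  # True
```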
With the mapping dictionaries ready, we can convert the words stored in train_sentences (and test_sentences) into numbers:
```python
for i, sentence in enumerate(train_sentences):
    train_sentences[i] = [word2idx[word] if word in word2idx else 0 for word in sentence]

for i, sentence in enumerate(test_sentences):
    test_sentences[i] = [word2idx[word.lower()] if word.lower() in word2idx else 0
                         for word in nltk.word_tokenize(sentence)]
```
The `else 0` above means that any word not found in the dictionary is encoded as 0, which corresponds to `_PAD`.
To make model construction easier, we fix the length of every sentence at 200: sentences that are too short are padded at the front with 0 (`_PAD`), and sentences that are too long are truncated from the end, keeping only the first 200 tokens:
```python
def pad_input(sentences, seq_len):
    """
    Fix the length of every sentence to `seq_len`: sentences that are too long
    are truncated (keeping the first `seq_len` tokens), and sentences that are
    too short are left-padded with 0.
    """
    features = np.zeros((len(sentences), seq_len), dtype=int)
    for ii, review in enumerate(sentences):
        if len(review) != 0:
            features[ii, -len(review):] = np.array(review)[:seq_len]
    return features

# Fix the sentence length of the training and test sets
train_sentences = pad_input(train_sentences, 200)
test_sentences = pad_input(test_sentences, 200)
```
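A toy example (illustrative only) of what `pad_input` does: short sequences are left-padded with 0, and long ones keep only their first `seq_len` tokens:

```python
demo = [[5, 8, 9], [1, 2, 3, 4, 5, 6, 7]]
print(pad_input(demo, 5))
# [[0 0 5 8 9]
#  [1 2 3 4 5]]
```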
Besides fixing the length, this function also converts the data into a NumPy array. The label lists need to be converted as well:
```python
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)
```
At this point the data preprocessing is essentially done; now it is PyTorch's turn.
Model Building
First, import the PyTorch packages we need:
```python
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.nn as nn
```
Build DataLoaders for the training and test sets, using a batch size of 200:
```python
batch_size = 200

train_data = TensorDataset(torch.from_numpy(train_sentences), torch.from_numpy(train_labels))
test_data = TensorDataset(torch.from_numpy(test_sentences), torch.from_numpy(test_labels))

train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size)
```
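As a quick check (not part of the original code), you can pull one batch from the loader and inspect its shape; each batch should contain 200 sentences of 200 token ids, plus 200 labels:

```python
sample_x, sample_y = next(iter(train_loader))
print(sample_x.shape)  # torch.Size([200, 200]) -- (batch_size, sentence length)
print(sample_y.shape)  # torch.Size([200])
```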
If you can, use a GPU to speed up the computation:
```python
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
```
Now let's build the model:
```python
class SentimentNet(nn.Module):
    def __init__(self, vocab_size):
        super(SentimentNet, self).__init__()
        self.n_layers = n_layers = 2        # number of LSTM layers
        self.hidden_dim = hidden_dim = 512  # dimension of the hidden state produced by the LSTM
        embedding_dim = 400                 # each word is embedded as a 400-dimensional vector
        drop_prob = 0.5                     # dropout probability

        # The embedding layer maps word indices to vectors.
        # For details see: https://blog.csdn.net/zhaohongfei_358/article/details/122809709
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

        self.lstm = nn.LSTM(embedding_dim,    # input dimension
                            hidden_dim,       # dimension of the hidden state output by the LSTM
                            n_layers,         # number of LSTM layers
                            dropout=drop_prob,
                            batch_first=True  # the first dimension is the batch size
                            )

        # Fully connected layer after the LSTM
        self.fc = nn.Linear(in_features=hidden_dim,  # takes the LSTM output as input
                            out_features=1           # sentiment is 0 or 1, so the output dimension is 1
                            )
        self.sigmoid = nn.Sigmoid()  # the linear output is passed through a sigmoid

        # Dropout before the final fully connected layer
        self.dropout = nn.Dropout(drop_prob)

    def forward(self, x, hidden):
        """
        x: the current input, of size (batch_size, 200), where 200 is the sentence length
        hidden: the previous hidden state and cell state, as a tuple (h, c), where h and c
                each have size (n_layers, batch_size, hidden_dim), i.e. (2, 200, 512)
        """
        # The first dimension is the batch size, since we feed in one batch at a time
        batch_size = x.size(0)

        # nn.Embedding only accepts LongTensor input, so convert x
        x = x.long()

        # Embed x, turning its size from (batch_size, 200) into (batch_size, 200, embedding_dim)
        embeds = self.embedding(x)

        # Feed the embedded vectors and the previous hidden state to the LSTM, and get the output
        # plus the new (hidden_state, cell_state).
        # lstm_out has size (batch_size, 200, 512): 200 is the number of words, and the LSTM
        # produces one output per word.
        # hidden is a tuple (hidden_state, cell_state); both have size (2, batch_size, 512),
        # where 2 is the number of LSTM layers. The hidden state is shared across all the words
        # of a sentence, so the 200 does not appear here.
        lstm_out, hidden = self.lstm(embeds, hidden)

        # Reshape to (batch_size * 200, hidden_dim) for the fully connected layer.
        # It is batch_size * 200 = 40000 because every word's output goes through the linear
        # layer; in other words, the linear layer sees a batch of 40000.
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)

        # Dropout before the fully connected layer
        out = self.dropout(lstm_out)

        # The fully connected layer outputs size (40000, 1)
        out = self.fc(out)

        # Apply the sigmoid
        out = self.sigmoid(out)

        # Reshape back to (batch_size, 200): one output per word
        out = out.view(batch_size, -1)

        # Keep only the output of the last word, so out has size (batch_size,)
        out = out[:, -1]

        # Return the output and the new (h, c)
        return out, hidden

    def init_hidden(self, batch_size):
        """
        Initialize the hidden state: the first time we call the LSTM there is no previous
        hidden state, so we create one, initialized to all zeros.
        It is a tuple because the LSTM needs both a hidden state and a cell state.
        """
        hidden = (torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device),
                  torch.zeros(self.n_layers, batch_size, self.hidden_dim).to(device))
        return hidden
```
With the model defined, create the model object:
```python
model = SentimentNet(len(words))
model.to(device)
```
SentimentNet(
(embedding): Embedding(221497, 400)
(lstm): LSTM(400, 512, num_layers=2, batch_first=True, dropout=0.5)
(fc): Linear(in_features=512, out_features=1, bias=True)
(sigmoid): Sigmoid()
(dropout): Dropout(p=0.5, inplace=False)
)
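Before training, it can be useful to verify the tensor shapes with a dummy batch (a small sketch, assuming the model has been moved to `device` as above):

```python
dummy = torch.zeros(4, 200, dtype=torch.long).to(device)  # a fake batch of 4 sentences of length 200
h0 = model.init_hidden(4)
out, hn = model(dummy, h0)
print(out.shape)                 # torch.Size([4]) -- one probability per sentence
print(hn[0].shape, hn[1].shape)  # torch.Size([2, 4, 512]) each -- (n_layers, batch_size, hidden_dim)
```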
Next, define the loss function. Since this is a binary classification problem, we use binary cross entropy (BCE):
```python
criterion = nn.BCELoss()
```
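A small illustration (not from the original article) of what `BCELoss` expects: predictions must be probabilities in [0, 1] and targets must be floats of the same shape, which is why the model ends with a sigmoid and the labels are converted with `labels.float()` below:

```python
demo_pred = torch.tensor([0.9, 0.2, 0.7])    # sigmoid outputs
demo_target = torch.tensor([1.0, 0.0, 1.0])  # float labels
print(nn.BCELoss()(demo_pred, demo_target))  # tensor(0.2284), roughly
```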
For the optimizer, we use Adam:
```python
lr = 0.005
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
```
Next, define the training code:
```python
epochs = 2          # train for two epochs
counter = 0         # counts training steps
print_every = 1000  # print the current status every 1000 steps

for i in range(epochs):
    h = model.init_hidden(batch_size)  # initialize the first hidden state

    for inputs, labels in train_loader:  # fetch a batch of inputs and labels
        counter += 1  # one more training step

        # Detach the hidden state produced by the previous batch from its graph.
        # len(h) == 2 because it holds both the hidden state and the cell state.
        h = tuple([e.data for e in h])

        # Move the data to the GPU
        inputs, labels = inputs.to(device), labels.to(device)

        # Clear the model's gradients
        model.zero_grad()

        # Run the forward pass with the current inputs and hidden state,
        # getting the output and the new hidden state
        output, h = model(inputs, h)

        # Compute the loss between predictions and ground truth
        loss = criterion(output, labels.float())

        # Backpropagate
        loss.backward()

        # Clip the gradients to prevent them from exploding.
        # For details see: https://blog.csdn.net/zhaohongfei_358/article/details/122820992
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=5)

        # Update the weights
        optimizer.step()

        # Print the current status every so often
        if counter % print_every == 0:
            print("Epoch: {}/{}...".format(i+1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()))
```
Epoch: 1/2... Step: 1000... Loss: 0.270512...
Epoch: 1/2... Step: 2000... Loss: 0.218537...
Epoch: 1/2... Step: 3000... Loss: 0.152510...
Epoch: 1/2... Step: 4000... Loss: 0.172654...
Epoch: 2/2... Step: 5000... Loss: 0.164501...
Epoch: 2/2... Step: 6000... Loss: 0.213740...
Epoch: 2/2... Step: 7000... Loss: 0.163251...
Epoch: 2/2... Step: 8000... Loss: 0.203283...
If this throws a RuntimeError: CUDA out of memory. Tried to allocate ... exception, reduce batch_size or clear the GPU cache with torch.cuda.empty_cache().
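For reference, a minimal sketch of clearing the cache (only meaningful when running on a GPU):

```python
import torch

if torch.cuda.is_available():
    torch.cuda.empty_cache()  # releases cached, unused GPU memory held by PyTorch
```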
After training for a while, let's evaluate the model's performance:
```python
test_losses = []  # records losses on the test set
num_correct = 0   # counts correct predictions

h = model.init_hidden(batch_size)  # initialize the hidden state and cell state

model.eval()  # put the model in evaluation mode

# Evaluate the model
for inputs, labels in test_loader:
    h = tuple([each.data for each in h])
    inputs, labels = inputs.to(device), labels.to(device)
    output, h = model(inputs, h)
    test_loss = criterion(output.squeeze(), labels.float())
    test_losses.append(test_loss.item())
    pred = torch.round(output.squeeze())  # round the predictions to 0 or 1
    correct_tensor = pred.eq(labels.float().view_as(pred))  # which predictions are correct
    correct = np.squeeze(correct_tensor.cpu().numpy())
    num_correct += np.sum(correct)

print("Test loss: {:.3f}".format(np.mean(test_losses)))

test_acc = num_correct / len(test_loader.dataset)
print("Test accuracy: {:.3f}%".format(test_acc*100))
```
Test loss: 0.179
Test accuracy: 93.151%
In the end, the trained model reaches an accuracy above 90%.
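One optional refinement (not in the original code): wrapping the evaluation loop in `torch.no_grad()` skips building the autograd graph and saves memory. A sketch with the same loop structure as above:

```python
model.eval()
with torch.no_grad():                  # no gradients are needed during evaluation
    h = model.init_hidden(batch_size)
    for inputs, labels in test_loader:
        h = tuple(each.data for each in h)
        inputs, labels = inputs.to(device), labels.to(device)
        output, h = model(inputs, h)   # same forward pass as in the evaluation code above
```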
Let's try it in practice: define a predict(sentence) function that takes a sentence as input and prints the prediction:
```python
def predict(sentence):
    # Tokenize the sentence and convert the words to numbers
    sentences = [[word2idx[word.lower()] if word.lower() in word2idx else 0
                  for word in nltk.word_tokenize(sentence)]]

    # Fix the sentence length to 200
    sentences = pad_input(sentences, 200)

    # Move the data to the GPU
    sentences = torch.Tensor(sentences).long().to(device)

    # Initialize the hidden state (the batch size is 1 here)
    h = (torch.Tensor(2, 1, 512).zero_().to(device),
         torch.Tensor(2, 1, 512).zero_().to(device))
    h = tuple([each.data for each in h])

    # Predict
    if model(sentences, h)[0] >= 0.5:
        print("positive")
    else:
        print("negative")
```
```python
predict("The film is so boring")
predict("The actor is too ugly.")
```
negative
negative
We tried two arbitrary sentences, and as you can see, both were predicted correctly.
References
Long Short-Term Memory: From Zero to Hero with PyTorch: https://blog.floydhub.com/long-short-term-memory-from-zero-to-hero-with-pytorch/