PyTorch Hands-On (7): Binary Text Classification with BERT (a Kaggle Getting-Started Competition)
What this article covers
This is a getting-started NLP competition on Kaggle (link); the task is binary text classification. Task description: people post tweets on Twitter, and some of them describe real disasters, e.g. "The White House is on fire, the flames are huge." Others contain disaster-related words but do not describe real disasters, e.g. "That cloud in the sky looks like a burning flame." The goal of this project is to tell these two cases apart.
The dataset can be downloaded from Kaggle (link) or from Baidu Netdisk (link).
Once you have predictions, you can upload them to Kaggle to see your score (link).
The source code for this article is available on GitHub (link), and you can also run it directly in Google Colab (Open In Google Colab).
Environment setup
This project uses the following library versions:
```
python==3.8.5
pandas==1.3.5
torch==1.11.0
transformers==4.21
```
Import all the dependencies used in this article:
```python
import os
import pandas
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
# Tokenizer for the BERT model
from transformers import AutoTokenizer
# The BERT model itself
from transformers import AutoModel
from pathlib import Path
from tqdm.notebook import tqdm
```
Global configuration
```python
batch_size = 16
# Maximum text length
text_max_length = 128
# Total number of training epochs (an arbitrary choice)
epochs = 100
# Fraction of the training data held out as the validation set
validation_ratio = 0.1
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Print the loss every this many steps
log_per_step = 50
# Location of the dataset
dataset_dir = Path("./dataset")
os.makedirs(dataset_dir, exist_ok=True)
# Path where model checkpoints are saved
model_dir = Path("./drive/MyDrive/model/bert_checkpoints")
# Create the model directory if it does not exist
os.makedirs(model_dir, exist_ok=True)
print("Device:", device)
```
```
Device: cuda
```
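One thing the configuration above does not do is fix random seeds. If you want runs (including the train/validation split below) to be repeatable, a minimal optional sketch (the seed value 42 is just an arbitrary choice):

```python
import random
import numpy as np

seed = 42  # hypothetical choice
random.seed(seed)
np.random.seed(seed)       # pandas' sample() uses NumPy's RNG
torch.manual_seed(seed)
```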
Data processing
Loading the dataset
First download the dataset and extract it into the dataset directory; it contains three files: train.csv, test.csv and sample_submission.csv.
Use pandas to load the training data. For training we only need the text and target columns:
```python
pd_data = pandas.read_csv(dataset_dir / 'train.csv')[['text', 'target']]
```
After loading, let's take a look at the contents:
```python
pd_data
```
|   | text | target |
|---|---|---|
| 0 | Our Deeds are the Reason of this #earthquake M... | 1 |
| 1 | Forest fire near La Ronge Sask. Canada | 1 |
| 2 | All residents asked to 'shelter in place' are ... | 1 |
| 3 | 13,000 people receive #wildfires evacuation or... | 1 |
| 4 | Just got sent this photo from Ruby #Alaska as ... | 1 |
| ... | ... | ... |
| 7608 | Two giant cranes holding a bridge collapse int... | 1 |
| 7609 | @aria_ahrary @TheTawniest The out of control w... | 1 |
| 7610 | M1.94 [01:04 UTC]?5km S of Volcano Hawaii. htt... | 1 |
| 7611 | Police investigating after an e-bike collided ... | 1 |
| 7612 | The Latest: More Homes Razed by Northern Calif... | 1 |
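Before splitting off a validation set, it is worth a quick look at how balanced the two classes are and how long the tweets tend to be. A small optional check (not part of the original notebook output):

```python
# Number of tweets per class and basic text-length statistics
print(pd_data['target'].value_counts())
print(pd_data['text'].str.len().describe())
```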
Next, split off part of the training data as a validation set:
```python
pd_validation_data = pd_data.sample(frac=validation_ratio)
pd_train_data = pd_data[~pd_data.index.isin(pd_validation_data.index)]
```
With the data loaded and split, we can build the Dataset. Here the Dataset simply returns a tweet and its target:
```python
class MyDataset(Dataset):
    def __init__(self, mode='train'):
        super(MyDataset, self).__init__()
        self.mode = mode
        # Pick the corresponding data
        if mode == 'train':
            self.dataset = pd_train_data
        elif mode == 'validation':
            self.dataset = pd_validation_data
        elif mode == 'test':
            # In test mode, return the tweet and its id. Using the id as the
            # "target" makes it easy to write out the submission file later.
            self.dataset = pandas.read_csv(dataset_dir / 'test.csv')[['text', 'id']]
        else:
            raise Exception("Unknown mode {}".format(mode))

    def __getitem__(self, index):
        # Take the index-th row
        data = self.dataset.iloc[index]
        # Take the tweet text and do some very light cleaning
        source = data['text'].replace("#", "").replace("@", "")
        # Take the corresponding target
        if self.mode == 'test':
            # For the test set, use the id as the target
            target = data['id']
        else:
            target = data['target']
        # Return the tweet and its target
        return source, target

    def __len__(self):
        return len(self.dataset)
```
```python
train_dataset = MyDataset('train')
validation_dataset = MyDataset('validation')
```
Let's print one item to check:
```python
train_dataset.__getitem__(0)
```
```
('Our Deeds are the Reason of this earthquake May ALLAH Forgive us all', 1)
```
With the Dataset ready, we can build the DataLoader. Before that, we need to define the tokenizer:
```python
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
```
Let's try out the tokenizer:
```python
tokenizer("I'm learning deep learning", return_tensors='pt')
```
```
{'input_ids': tensor([[ 101, 1045, 1005, 1049, 4083, 2784, 4083, 102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1]])}
```
It works as expected. 101 is the start-of-sequence token (`[CLS]`) and 102 is the end-of-sentence token (`[SEP]`).
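To see what those ids correspond to, you can map them back to tokens, which makes the special tokens visible:

```python
# Convert the ids from the example above back into tokens
ids = tokenizer("I'm learning deep learning")['input_ids']
print(tokenizer.convert_ids_to_tokens(ids))
# ['[CLS]', 'i', "'", 'm', 'learning', 'deep', 'learning', '[SEP]']
```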
Next we build our DataLoader. We first need to define a collate_fn, which encodes the sentences, pads them, and assembles the batch:
```python
def collate_fn(batch):
    """
    Turn a batch of text sentences into tensors and assemble the batch.
    :param batch: a batch of (tweet, target) pairs, e.g. [('tweet', target), ('tweet', target), ...]
    :return: the processed result, e.g.:
             src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
             target: [1, 1, 0, ...]
    """
    text, target = zip(*batch)
    text, target = list(text), list(target)
    # src goes straight into BERT, so the tokenizer output can be used as-is.
    # padding='max_length' pads sequences that are too short
    # truncation=True cuts off sequences that are too long
    src = tokenizer(text, padding='max_length', max_length=text_max_length, return_tensors='pt', truncation=True)
    return src, torch.LongTensor(target)
```
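A quick way to sanity-check collate_fn is to call it on a hand-made mini-batch (the second sentence below is made up purely for illustration):

```python
src, tgt = collate_fn([
    ("Forest fire near La Ronge Sask. Canada", 1),
    ("What a lovely sunny day", 0),
])
print(src['input_ids'].shape)  # torch.Size([2, 128]) -> (batch, text_max_length)
print(tgt)                     # tensor([1, 0])
```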
```python
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
```
Let's take a look at one batch from train_loader:
```python
inputs, targets = next(iter(train_loader))
print("inputs:", inputs)
print("targets:", targets)
```
```
inputs: {'input_ids': tensor([[ 101, 4911, 1024, ..., 0, 0, 0],
[ 101, 19387, 11113, ..., 0, 0, 0],
[ 101, 2317, 2111, ..., 0, 0, 0],
...,
[ 101, 25595, 10288, ..., 0, 0, 0],
[ 101, 1037, 14700, ..., 0, 0, 0],
[ 101, 12361, 2042, ..., 0, 0, 0]]), 'token_type_ids': tensor([[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
...,
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0],
[0, 0, 0, ..., 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0],
...,
[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0],
[1, 1, 1, ..., 0, 0, 0]])}
targets: tensor([1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0])
```
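The shapes are worth confirming: every tensor in inputs is (batch_size, text_max_length), and targets holds one label per tweet:

```python
print(inputs['input_ids'].shape)       # torch.Size([16, 128])
print(inputs['attention_mask'].shape)  # torch.Size([16, 128])
print(targets.shape)                   # torch.Size([16])
```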
# Building the model
```python
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        # Load the BERT model
        self.bert = AutoModel.from_pretrained("bert-base-uncased")
        # The final prediction head; 768 is the hidden size of bert-base
        self.predictor = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, src):
        """
        :param src: the tokenized tweet batch
        """
        # Unpack src directly into BERT; this works because the tokenizer and the
        # model come as a matched pair. Use the encoder output at the first
        # position ([CLS]) as the input to the final linear layers.
        outputs = self.bert(**src).last_hidden_state[:, 0, :]
        # Use the linear layers to make the final prediction
        return self.predictor(outputs)
```
```python
model = MyModel()
model = model.to(device)
```
```python
model(inputs.to(device))
```
```
tensor([[0.5121],
[0.5032],
[0.5032],
[0.4913],
[0.4941],
[0.4924],
[0.5204],
[0.4764],
[0.5025],
[0.5145],
[0.4916],
[0.4909],
[0.4891],
[0.5333],
[0.4967],
[0.4951]], device='cuda:0', grad_fn=<SigmoidBackward0>)
```
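If GPU memory or training time is a concern, one common variant is to freeze the BERT encoder and train only the prediction head. This is not what the rest of the article does; it is just an optional sketch:

```python
# Optional: freeze all BERT parameters so only self.predictor is trained.
# Expect lower accuracy than full fine-tuning, but much cheaper training.
for param in model.bert.parameters():
    param.requires_grad = False
```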
# Training the model
Now we can start training. First define the loss function and the optimizer. Since this is a binary classification problem, Binary Cross Entropy is a natural choice:
```python
criteria = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-5)
```
> This learning rate came from experimentation; with `3e-4` the model would not converge at all. The learning rate really does matter.
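As a reminder of what BCELoss actually computes, here is a tiny worked example with made-up numbers:

```python
# BCE(p, y) = -(y*log(p) + (1-y)*log(1-p)), averaged over the batch
p = torch.tensor([0.9, 0.2])  # predicted probabilities (hypothetical)
y = torch.tensor([1.0, 0.0])  # true labels
print(nn.BCELoss()(p, y))     # tensor(0.1643)
```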
```python
# inputs is a dict of tensors, so define a helper that moves each value to the device
def to_device(dict_tensors):
    result_tensors = {}
    for key, value in dict_tensors.items():
        result_tensors[key] = value.to(device)
    return result_tensors
```
Define a validation function that returns the accuracy and loss on the validation set.
```python
def validate():
    model.eval()
    total_loss = 0.
    total_correct = 0
    # No gradients are needed for validation
    with torch.no_grad():
        for inputs, targets in validation_loader:
            inputs, targets = to_device(inputs), targets.to(device)
            outputs = model(inputs)
            loss = criteria(outputs.view(-1), targets.float())
            total_loss += float(loss)
            correct_num = (((outputs >= 0.5).float() * 1).flatten() == targets).sum()
            total_correct += correct_num
    return total_correct / len(validation_dataset), total_loss / len(validation_dataset)
```
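Before training, you can call validate() once to confirm that the untrained prediction head sits near chance level. This is optional but a useful sanity check:

```python
# With randomly initialized prediction layers, accuracy should be close to 0.5
accuracy, val_loss = validate()
print("untrained accuracy: {:.4f}, validation loss: {:.4f}".format(accuracy, val_loss))
```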
Start training:
```python
# First put the model into training mode
model.train()

# Clear the CUDA cache
if torch.cuda.is_available():
    torch.cuda.empty_cache()

# A few variables to help with logging the loss
total_loss = 0.
# Step counter
step = 0
# Best accuracy on the validation set so far
best_accuracy = 0

# Start training
for epoch in range(epochs):
    model.train()
    for i, (inputs, targets) in enumerate(train_loader):
        # Take the training data from the batch
        inputs, targets = to_device(inputs), targets.to(device)
        # Forward pass through the model
        outputs = model(inputs)
        # Compute the loss
        loss = criteria(outputs.view(-1), targets.float())
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        total_loss += float(loss)
        step += 1

        if step % log_per_step == 0:
            print("Epoch {}/{}, Step: {}/{}, total loss:{:.4f}".format(epoch+1, epochs, i, len(train_loader), total_loss))
            total_loss = 0

        del inputs, targets

    # After each epoch, evaluate on the validation set
    accuracy, validation_loss = validate()
    print("Epoch {}, accuracy: {:.4f}, validation loss: {:.4f}".format(epoch+1, accuracy, validation_loss))
    torch.save(model, model_dir / f"model_{epoch}.pt")

    # Keep the best model
    if accuracy > best_accuracy:
        torch.save(model, model_dir / f"model_best.pt")
        best_accuracy = accuracy
```
```
Epoch 1/100, Step: 49/429, total loss:28.4544
Epoch 1/100, Step: 99/429, total loss:22.8545
Epoch 1/100, Step: 149/429, total loss:21.7493
... (omitted)
Epoch 10/100, Step: 288/429, total loss:3.1754
Epoch 10/100, Step: 338/429, total loss:3.3069
Epoch 10/100, Step: 388/429, total loss:1.8836
Epoch 10, accuracy: 0.8292, validation loss: 0.0561
```
# Using the model
Load the best model, then assemble the csv file in the format Kaggle requires.
```python
model = torch.load(model_dir / f"model_best.pt")
model = model.eval()
```
Build a DataLoader for the test set. The test set does not contain targets.
```python
test_dataset = MyDataset('test')
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
```
Feed the test data through the model, collect the predictions, and finally assemble them into the structure Kaggle expects:
```python
results = []
# No gradients are needed for inference
with torch.no_grad():
    for inputs, ids in tqdm(test_loader):
        outputs = model(inputs.to(device))
        outputs = (outputs >= 0.5).int().flatten().tolist()
        ids = ids.tolist()
        results = results + [(id, result) for result, id in zip(outputs, ids)]
```
```python
with open(dataset_dir / 'results.csv', 'w', encoding='utf-8') as f:
    f.write('id,target\n')
    for id, result in results:
        f.write(f"{id},{result}\n")
print("Finished!")
```
```
Finished!
```
Now take results.csv to Kaggle and see what score you can get. I trained for 10 epochs and ended up with a score of 0.83573, which is decent.
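As a side note, the submission file can also be built with pandas from sample_submission.csv (assuming, as Kaggle provides, that it has an id column and a target column):

```python
submission = pandas.read_csv(dataset_dir / 'sample_submission.csv')
# results is the list of (id, prediction) pairs collected above
submission['target'] = submission['id'].map(dict(results))
submission.to_csv(dataset_dir / 'results.csv', index=False)
```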