PyTorch Hands-On for Beginners (7): Binary Classification of Metaphorical Text with BERT (a Kaggle Getting-Started Problem)
Topics Covered
About This Article
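This is a getting-started NLP problem on Kaggle (link); the task is binary text classification. People post all kinds of content on Twitter. Some posts report a real disaster, for example "The White House is on fire, and the flames are huge." Others use the same vocabulary without describing a disaster, for example "That cloud in the sky looks like a burning flame." The task of this project is to tell these two cases apart.
The dataset can be downloaded from Kaggle (link) or from Baidu Netdisk (link).
In the end you can upload your predictions to Kaggle to see your score (link).
The source code for this article is available on GitHub (link). You can also run the code directly in Google Colab (Open In Google Colab).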
Environment Setup
This project uses the following library versions:
```
python==3.8.5
pandas==1.3.5
torch==1.11.0
transformers==4.21
```
Import all the dependencies used in this article:
```python
import os
import pandas
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader
# Loads the tokenizer that belongs to the BERT model
from transformers import AutoTokenizer
# Loads the BERT model itself
from transformers import AutoModel
from pathlib import Path
from tqdm.notebook import tqdm
```
Global Configuration
```python
batch_size = 16
# Maximum length of the text, in tokens
text_max_length = 128
# Total number of training epochs (an arbitrary number I picked)
epochs = 100
# Fraction of the training set to hold out as the validation set
validation_ratio = 0.1
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Print the loss once every this many steps
log_per_step = 50

# Dataset location; create the directory if it does not exist
dataset_dir = Path("./dataset")
os.makedirs(dataset_dir, exist_ok=True)

# Where model checkpoints are stored; create the directory if it does not exist
model_dir = Path("./drive/MyDrive/model/bert_checkpoints")
os.makedirs(model_dir, exist_ok=True)

print("Device:", device)
```
Device: cuda
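Data Processing
Loading the Dataset
First download the dataset and extract it into the dataset directory. It contains three files: train.csv, test.csv, and sample_submission.csv.
Load the training data with pandas. For training we only need two columns, text and target:
```python
pd_data = pandas.read_csv(dataset_dir / 'train.csv')[['text', 'target']]
```
Once the data is loaded, take a look at its contents:
```python
pd_data
```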
| | text | target |
|---|---|---|
| 0 | Our Deeds are the Reason of this #earthquake M... | 1 |
| 1 | Forest fire near La Ronge Sask. Canada | 1 |
| 2 | All residents asked to 'shelter in place' are ... | 1 |
| 3 | 13,000 people receive #wildfires evacuation or... | 1 |
| 4 | Just got sent this photo from Ruby #Alaska as ... | 1 |
| ... | ... | ... |
| 7608 | Two giant cranes holding a bridge collapse int... | 1 |
| 7609 | @aria_ahrary @TheTawniest The out of control w... | 1 |
| 7610 | M1.94 [01:04 UTC]?5km S of Volcano Hawaii. htt... | 1 |
| 7611 | Police investigating after an e-bike collided ... | 1 |
| 7612 | The Latest: More Homes Razed by Northern Calif... | 1 |
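```python
# Hold out a random fraction of the rows as the validation set; the rest is the training set
pd_validation_data = pd_data.sample(frac=validation_ratio)
pd_train_data = pd_data[~pd_data.index.isin(pd_validation_data.index)]
```
With the data loaded, we can build the Dataset. Here the Dataset simply returns a tweet and its target: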
```python
class MyDataset(Dataset):

    def __init__(self, mode='train'):
        super().__init__()
        self.mode = mode
        # Pick the data that matches the mode
        if mode == 'train':
            self.dataset = pd_train_data
        elif mode == 'validation':
            self.dataset = pd_validation_data
        elif mode == 'test':
            # In test mode, return the tweet and its id.
            # Using the id as the target makes writing the results easier later.
            self.dataset = pandas.read_csv(dataset_dir / 'test.csv')[['text', 'id']]
        else:
            raise ValueError("Unknown mode {}".format(mode))

    def __getitem__(self, index):
        # Take the row at position index
        data = self.dataset.iloc[index]
        # Take the tweet and do some simple cleaning
        source = data['text'].replace("#", "").replace("@", "")
        # Take the matching target
        if self.mode == 'test':
            # In test mode, use the id as the target
            target = data['id']
        else:
            target = data['target']
        # Return the tweet and its target
        return source, target

    def __len__(self):
        return len(self.dataset)
```
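The cleaning step inside `__getitem__` only strips the `#` and `@` symbols; the hashtag and mention words themselves are kept. The sketch below isolates that step in a helper named `clean_tweet` (a hypothetical name used here for illustration, not part of the original code). The first two samples come from the training data shown earlier, and the third is made up.

```python
def clean_tweet(text: str) -> str:
    """Remove the '#' and '@' symbols while keeping the words they mark."""
    return text.replace("#", "").replace("@", "")


samples = [
    "Our Deeds are the Reason of this #earthquake May ALLAH Forgive us all",
    "Forest fire near La Ronge Sask. Canada",
    "@someone stay safe #wildfires",  # made-up example
]

cleaned = [clean_tweet(s) for s in samples]
for before, after in zip(samples, cleaned):
    print(f"{before!r} -> {after!r}")
```

Tweets without either symbol pass through unchanged, so the step is safe to apply to every row.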
```python
train_dataset = MyDataset('train')
validation_dataset = MyDataset('validation')
```
Let's print one item to check:
```python
train_dataset[0]
```
```
('Our Deeds are the Reason of this earthquake May ALLAH Forgive us all', 1)
```
With the Dataset in place, we can build the DataLoader. Before doing so, we need to define the tokenizer:
```python
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
```
Let's try the tokenizer out:
```python
tokenizer("I'm learning deep learning", return_tensors='pt')
```
```
{'input_ids': tensor([[ 101, 1045, 1005, 1049, 4083, 2784, 4083,  102]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1, 1, 1]])}
```
It runs correctly. Here 101 marks the start of the sequence (`[CLS]`) and 102 marks the end of the sentence (`[SEP]`).
Next we build the DataLoader. We need to define a collate_fn that encodes the sentences, pads them, and assembles the batch:
```python
def collate_fn(batch):
    """
    Convert a batch of text sentences into tensors and assemble them into a batch.
    :param batch: one batch of sentences, e.g. [('tweet', target), ('tweet', target), ...]
    :return: the processed result, e.g.
             src: {'input_ids': tensor([[ 101, ..., 102, 0, 0, ...], ...]), 'attention_mask': tensor([[1, ..., 1, 0, ...], ...])}
             target: [1, 1, 0, ...]
    """
    text, target = zip(*batch)
    text, target = list(text), list(target)

    # src goes straight into BERT, so the tokenizer output is used without further processing
    # padding='max_length' pads sentences that are too short
    # truncation=True cuts sentences that are too long
    src = tokenizer(text, padding='max_length', max_length=text_max_length, return_tensors='pt', truncation=True)

    return src, torch.LongTensor(target)
```
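To make the effect of `padding='max_length'` and `truncation=True` concrete, here is a minimal pure-Python sketch of what happens to a single, already-encoded sentence. The helper `pad_or_truncate` is hypothetical and only mimics the behaviour for the single-sentence case; the real work is done inside the tokenizer. The padding id 0 matches the zeros visible in the tokenizer output of this article.

```python
from typing import Dict, List

PAD_ID = 0  # id used for padding positions (the zeros in the tokenizer output)


def pad_or_truncate(ids: List[int], max_length: int) -> Dict[str, List[int]]:
    """Mimic padding='max_length' plus truncation=True for one encoded sentence.

    `ids` is assumed to start with [CLS] (101) and end with [SEP] (102).
    """
    if len(ids) > max_length:
        # Drop tokens from the end of the sentence but keep the closing [SEP]
        ids = ids[: max_length - 1] + [ids[-1]]
    attention_mask = [1] * len(ids)
    n_pad = max_length - len(ids)
    return {
        "input_ids": ids + [PAD_ID] * n_pad,
        "attention_mask": attention_mask + [0] * n_pad,
    }


# Token ids of "I'm learning deep learning", taken from the tokenizer output above
encoded = [101, 1045, 1005, 1049, 4083, 2784, 4083, 102]

padded = pad_or_truncate(encoded, max_length=10)
truncated = pad_or_truncate(encoded, max_length=5)

print(padded["input_ids"])       # [101, 1045, 1005, 1049, 4083, 2784, 4083, 102, 0, 0]
print(padded["attention_mask"])  # [1, 1, 1, 1, 1, 1, 1, 1, 0, 0]
print(truncated["input_ids"])    # [101, 1045, 1005, 1049, 102]
```

The attention mask is what lets BERT ignore the padded positions, which is why `collate_fn` passes the whole tokenizer output to the model rather than only `input_ids`.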
```python
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
```
Let's take a look at one batch from train_loader:
```python
inputs, targets = next(iter(train_loader))
print("inputs:", inputs)
print("targets:", targets)
```
```
inputs: {'input_ids': tensor([[  101,  4911,  1024,  ...,     0,     0,     0],
        [  101, 19387, 11113,  ...,     0,     0,     0],
        [  101,  2317,  2111,  ...,     0,     0,     0],
        ...,
        [  101, 25595, 10288,  ...,     0,     0,     0],
        [  101,  1037, 14700,  ...,     0,     0,     0],
        [  101, 12361,  2042,  ...,     0,     0,     0]]), 'token_type_ids': tensor([[0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        ...,
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0],
        [0, 0, 0,  ..., 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        ...,
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0],
        [1, 1, 1,  ..., 0, 0, 0]])}
targets: tensor([1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0])
```
# Building the Model
```python
class MyModel(nn.Module):

    def __init__(self):
        super().__init__()

        # Load the BERT model
        self.bert = AutoModel.from_pretrained("bert-base-uncased")

        # Final prediction head
        self.predictor = nn.Sequential(
            nn.Linear(768, 256),
            nn.ReLU(),
            nn.Linear(256, 1),
            nn.Sigmoid()
        )

    def forward(self, src):
        """
        :param src: the tokenized tweet data
        """

        # Unpack src directly into BERT; this works because BERT and the tokenizer belong together.
        # Take the encoder output at the leading [CLS] position as the input to the prediction head
        outputs = self.bert(**src).last_hidden_state[:, 0, :]

        # Use the linear layers to make the final prediction
        return self.predictor(outputs)
```
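The slice `last_hidden_state[:, 0, :]` is easier to follow when the tensor shapes are written out. The sketch below uses a random tensor as a stand-in for the BERT output (so no model download is needed) and pushes it through a prediction head with the same layer layout as `MyModel.predictor`. The batch size of 4 is an arbitrary choice for illustration.

```python
import torch
from torch import nn

torch.manual_seed(0)

batch_size, seq_len, hidden_size = 4, 128, 768

# Stand-in for self.bert(**src).last_hidden_state (random values, not real BERT output)
last_hidden_state = torch.randn(batch_size, seq_len, hidden_size)

# Keep only position 0 of every sequence, i.e. the [CLS] token
cls_vector = last_hidden_state[:, 0, :]

# Same layer layout as MyModel.predictor
predictor = nn.Sequential(
    nn.Linear(768, 256),
    nn.ReLU(),
    nn.Linear(256, 1),
    nn.Sigmoid(),
)
probs = predictor(cls_vector)

print(last_hidden_state.shape)  # torch.Size([4, 128, 768])
print(cls_vector.shape)         # torch.Size([4, 768])
print(probs.shape)              # torch.Size([4, 1])
```

Each row of `probs` is a single number between 0 and 1, which the rest of the article interprets as the probability that the tweet describes a real disaster.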
```python
model = MyModel()
model = model.to(device)
```
```python
model(inputs.to(device))
```
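```
tensor([[0.5121],
        [0.5032],
        [0.5032],
        [0.4913],
        [0.4941],
        [0.4924],
        [0.5204],
        [0.4764],
        [0.5025],
        [0.5145],
        [0.4916],
        [0.4909],
        [0.4891],
        [0.5333],
        [0.4967],
        [0.4951]], device='cuda:0', grad_fn=<...>)
```
# Training the Model
Now we can start training the model in earnest. First define the loss function and the optimizer. Since this is a binary classification problem, Binary Cross Entropy is enough:
```python
criteria = nn.BCELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=3e-5)
```
> I arrived at this learning rate by experiment. I used `3e-4` before and the model would not converge no matter what. The learning rate clearly matters a lot.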
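For intuition about the numbers the loss produces, the following sketch computes Binary Cross Entropy by hand in plain Python, averaging over the samples as `nn.BCELoss` does with its default settings. The probabilities are made-up illustration values, and `binary_cross_entropy` is a hypothetical helper name.

```python
import math


def binary_cross_entropy(probs, targets):
    """Mean of -[y*log(p) + (1-y)*log(1-p)] over all samples."""
    losses = [
        -(y * math.log(p) + (1 - y) * math.log(1 - p))
        for p, y in zip(probs, targets)
    ]
    return sum(losses) / len(losses)


targets = [1, 0]
confident_right = binary_cross_entropy([0.9, 0.1], targets)
undecided = binary_cross_entropy([0.5, 0.5], targets)
confident_wrong = binary_cross_entropy([0.1, 0.9], targets)

print(f"{confident_right:.4f}")  # 0.1054
print(f"{undecided:.4f}")        # 0.6931
print(f"{confident_wrong:.4f}")  # 2.3026
```

An untrained head that outputs values near 0.5, like the model output shown earlier, therefore starts with a loss close to ln 2 ≈ 0.693 per batch.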
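```python
# inputs is dictionary-like, so define a helper that moves every tensor in it to the device
def to_device(dict_tensors):
    result_tensors = {}
    for key, value in dict_tensors.items():
        result_tensors[key] = value.to(device)
    return result_tensors
```
Define a validation function that returns the accuracy and the loss on the validation set.
```python
def validate():
    model.eval()
    total_loss = 0.
    total_correct = 0
    # No gradients are needed during validation
    with torch.no_grad():
        for inputs, targets in validation_loader:
            inputs, targets = to_device(inputs), targets.to(device)
            outputs = model(inputs)
            loss = criteria(outputs.view(-1), targets.float())
            total_loss += float(loss)

            # Threshold the probabilities at 0.5 and count the correct predictions
            correct_num = ((outputs >= 0.5).long().flatten() == targets).sum().item()
            total_correct += correct_num

    # Note: total_loss sums per-batch mean losses but is divided by the number of
    # samples, so the reported value is roughly the per-sample loss divided by batch_size
    return total_correct / len(validation_dataset), total_loss / len(validation_dataset)
```
Start training: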
```python
# Put the model in training mode first
model.train()

# Clear the CUDA cache
if torch.cuda.is_available():
    torch.cuda.empty_cache()

# Accumulated loss, used for logging
total_loss = 0.
# Step counter
step = 0

# Best accuracy seen so far on the validation set
best_accuracy = 0

# Start training
for epoch in range(epochs):
    model.train()
    for i, (inputs, targets) in enumerate(train_loader):
        # Move the training batch to the device
        inputs, targets = to_device(inputs), targets.to(device)
        # Forward pass through the model
        outputs = model(inputs)
        # Compute the loss
        loss = criteria(outputs.view(-1), targets.float())
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        total_loss += float(loss)
        step += 1

        if step % log_per_step == 0:
            print("Epoch {}/{}, Step: {}/{}, total loss:{:.4f}".format(epoch+1, epochs, i, len(train_loader), total_loss))
            total_loss = 0

        del inputs, targets

    # After each epoch, evaluate on the validation set
    accuracy, validation_loss = validate()
    print("Epoch {}, accuracy: {:.4f}, validation loss: {:.4f}".format(epoch+1, accuracy, validation_loss))
    torch.save(model, model_dir / f"model_{epoch}.pt")

    # Keep the best model
    if accuracy > best_accuracy:
        torch.save(model, model_dir / "model_best.pt")
        best_accuracy = accuracy
```
```
Epoch 1/100, Step: 49/429, total loss:28.4544
Epoch 1/100, Step: 99/429, total loss:22.8545
Epoch 1/100, Step: 149/429, total loss:21.7493
... (omitted)
Epoch 10/100, Step: 288/429, total loss:3.1754
Epoch 10/100, Step: 338/429, total loss:3.3069
Epoch 10/100, Step: 388/429, total loss:1.8836
Epoch 10, accuracy: 0.8292, validation loss: 0.0561
```
# Using the Model
Load the best model, then assemble the CSV file in the format Kaggle requires.
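The training loop saves the whole model object with `torch.save(model, ...)`, and this section reads it back with `torch.load`, which requires the `MyModel` class to be defined in the session that loads the file. A commonly used alternative, which is not what this article does, is to save only the `state_dict` and load it into a freshly constructed model. The sketch below shows that pattern on a tiny stand-in network; the file name `model_best_state.pt` and the temporary directory are assumptions made for illustration.

```python
import os
import tempfile

import torch
from torch import nn


def build_model() -> nn.Module:
    # Tiny stand-in network; in this article it would be MyModel()
    return nn.Sequential(nn.Linear(4, 2), nn.ReLU(), nn.Linear(2, 1), nn.Sigmoid())


model = build_model()

with tempfile.TemporaryDirectory() as tmp_dir:
    checkpoint_path = os.path.join(tmp_dir, "model_best_state.pt")  # hypothetical file name

    # Save only the parameter tensors, not the Python object
    torch.save(model.state_dict(), checkpoint_path)

    # Rebuild the architecture first, then load the saved weights into it
    restored = build_model()
    restored.load_state_dict(torch.load(checkpoint_path, map_location="cpu"))
    restored.eval()

# Both models should now produce identical outputs for the same input
x = torch.randn(3, 4)
with torch.no_grad():
    same_output = torch.allclose(model(x), restored(x))
print(same_output)  # True
```

Checkpoints written this way contain only tensors, so they are less tied to the exact code layout at save time.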
```python
model = torch.load(model_dir / "model_best.pt")
model = model.eval()
```
Build the DataLoader for the test set. The test set does not contain targets.
```python
test_dataset = MyDataset('test')
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
```
Feed the test data into the model to get the predictions, then assemble them into the structure Kaggle requires:
```python
results = []
# No gradients are needed at inference time
with torch.no_grad():
    for inputs, ids in tqdm(test_loader):
        outputs = model(inputs.to(device))
        # Threshold the probabilities at 0.5 to get 0/1 labels
        outputs = (outputs >= 0.5).int().flatten().tolist()
        ids = ids.tolist()
        results.extend(zip(ids, outputs))
```
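Both `validate()` and the prediction loop above turn the sigmoid outputs into class labels by thresholding at 0.5. The small sketch below shows that conversion on hand-written values, along with the accuracy computation used during validation. The `outputs` and `targets` tensors are hypothetical and chosen so the result is easy to verify by eye.

```python
import torch

# Hypothetical sigmoid outputs with shape (batch_size, 1), like the model returns
outputs = torch.tensor([[0.91], [0.12], [0.50], [0.49]])
# Hypothetical ground-truth labels, only needed for the accuracy check
targets = torch.tensor([1, 0, 0, 0])

# Probabilities at or above 0.5 become 1, everything else becomes 0
predictions = (outputs >= 0.5).int().flatten()

# Fraction of predictions that match the labels
accuracy = (predictions == targets).float().mean().item()

print(predictions.tolist())  # [1, 0, 1, 0]
print(accuracy)              # 0.75
```

A value of exactly 0.5 counts as the positive class because the comparison is `>=`, which is why the third sample is predicted as 1 and counted as an error here.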
```python
with open(dataset_dir / 'results.csv', 'w', encoding='utf-8') as f:
    f.write('id,target\n')
    for tweet_id, result in results:
        f.write(f"{tweet_id},{result}\n")
print("Finished!")
```
```
Finished!
```
Take the results to Kaggle and see what score you get. I trained for 10 epochs and ended up with a score of 0.83573, which is decent.
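The submission file is written above with plain string formatting, which is fine for two integer columns. As a hedged alternative, Python's standard `csv` module produces the same layout and takes care of quoting if a field ever contains a comma. The sketch below writes to an in-memory buffer instead of `results.csv`, and the `results` list is made-up sample data.

```python
import csv
import io

# Made-up (id, prediction) pairs in the same shape as the results list
results = [(0, 1), (2, 0), (3, 1)]

buffer = io.StringIO()
writer = csv.writer(buffer, lineterminator="\n")
writer.writerow(["id", "target"])  # header row
writer.writerows(results)          # one row per prediction

csv_text = buffer.getvalue()
print(csv_text)
```

To write a real file, replace the buffer with `open(path, 'w', encoding='utf-8', newline='')`.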