Transformer 对IMDB进行文本情感分类 (基于Pytorch的保姆级教程,无预训练模型,从头搭建transformer)
保姆级的基于pytorch的transformer实现,包括数据处理
·
Transformer 对IMDB进行文本情感分类 (基于Pytorch的保姆级教程,无预训练模型,从头搭建transformer)
编写的起因来自于网上大部分的blog要么只介绍了transformer的架构,但是缺乏数据处理的部分;要么实现的库过于陈旧以至于经常报错no module called***;再者就是基于Hungging face 提供的预训练模型,缺乏对tranformer内部架构的展现;还有的就是输出展示都没有不知道能不能跑通。
在查询了一些代码以及论文之后决定编写这篇blog帮助大家入门transformer文本分类任务,手把手保姆级,基于jupyter notebook,每个代码块可以在jupyter notebook中跑通。
其中transformer模型的编写参考地址来自pytorch搭建transformer文本分类
ok, 让我们开始吧
数据下载
首先下载目标IMDB数据集IMDB数据集地址
解压打开是一个目录结构长这样的文件
定义配置
首先定义训练中需要配置的参数
import numpy as np
import torch
from torch import nn, optim
import torch.nn.functional as F
from torchtext import data
import math
import time
from torch.autograd import Variable
import copy
import random
from torch import device
class Config(object):
"""配置参数"""
def __init__(self):
self.model_name = 'Transformer'
self.embedding_pretrained = None # 预训练词向量
self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') # 设备
self.dropout = 0.5 # 随机失活
self.num_classes = 2 # 类别数
self.num_epochs = 200 # epoch数
self.batch_size = 20 # mini-batch大小
self.pad_size = 500 # 每句话处理成的长度(短填长切)
self.n_vocab = None#这里需要读取数据的部分进行赋值
self.learning_rate = 5e-4 # 学习率
self.embed = 300 # 词向量维度
self.dim_model = 300
self.hidden = 1024
self.last_hidden = 512
self.num_head = 5
self.num_encoder = 2
self.checkpoint_path = './model.ckpt'
编写dataLoader
import collections
import torchtext
import os
import random
import torch
from torchtext.vocab import vocab, GloVe
from tqdm import tqdm
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, TensorDataset
from torch import device
torch.manual_seed(1234)
class ImdbDataset(Dataset):
def __init__(
self, folder_path="./aclImdb", is_train=True, is_small=False
) -> None:
super().__init__()
self.data, self.labels = self.read_dataset(folder_path, is_train, is_small)
# 读取数据
def read_dataset(
self,
folder_path,
is_train,
small
):
data, labels = [], []
for label in ("pos", "neg"):
folder_name = os.path.join(
folder_path, "train" if is_train else "test", label
)
for file in tqdm(os.listdir(folder_name)):
with open(os.path.join(folder_name, file), "rb") as f:
text = f.read().decode("utf-8").replace("\n", "").lower()
data.append(text)
labels.append(1 if label == "pos" else 0)
# random.shuffle(data)
# random.shuffle(labels)
# 小样本训练,仅用于本机验证
return data, labels
def __len__(self):
return len(self.data)
def __getitem__(self, index):
return self.data[index], int(self.labels[index])
def get_data(self):
return self.data
def get_labels(self):
return self.labels
def get_tokenized(data):
"""获取数据集的词元列表"""
def tokenizer(text):
return [tok.lower() for tok in text.split(" ")]
return [tokenizer(review) for review in data]
def get_vocab(data):
"""获取数据集的词汇表"""
tokenized_data = get_tokenized(data)
counter = collections.Counter([tk for st in tokenized_data for tk in st])
# 将min_freq设置为5,确保仅包括至少出现5次的单词
vocab_freq = {"<UNK>": 0, "<PAD>": 1}
# 添加满足词频条件的单词到词汇表,并分配索引
for word, freq in counter.items():
if freq >= 5:
vocab_freq[word] = len(vocab_freq)
# 构建词汇表对象并返回
return vocab(vocab_freq)
def preprocess_imdb(train_data, vocab,config):
"""数据预处理,将数据转换成神经网络的输入形式"""
max_l = config.pad_size # 将每条评论通过截断或者补0,使得长度变成500
def pad(x):
return x[:max_l] if len(x) > max_l else x + [1] * (max_l - len(x))
labels = train_data.get_labels()
tokenized_data = get_tokenized(train_data.get_data())
vocab_dict = vocab.get_stoi()
features = torch.tensor(
[pad([vocab_dict.get(word, 0) for word in words]) for words in tokenized_data]
)
labels = torch.tensor([label for label in labels])
return features, labels
def load_data(config):
"""加载数据集"""
train_data = ImdbDataset(folder_path="./aclImdb", is_train=True)
test_data = ImdbDataset(folder_path="./aclImdb", is_train=False)
print("输出第一句话:")
print(train_data.__getitem__(1))
vocab = get_vocab(train_data.get_data())
train_set = TensorDataset(*preprocess_imdb(train_data, vocab,config))
print("输出第一句话字典编码表示以及序列长度:")
print(train_set.__getitem__(1),train_set.__getitem__(1)[0].shape)
# 20%作为验证集
# train_set, valid_set = torch.utils.data.random_split(
# train_set, [int(len(train_set) * 0.8), int(len(train_set) * 0.2)]
# )
test_set = TensorDataset(*preprocess_imdb(test_data, vocab,config))
print(f"训练集大小{train_set.__len__()}")
print(f"测试集大小{test_set.__len__()}")
print(f"词表中单词个数:{len(vocab)}")
train_iter = DataLoader(
train_set, batch_size=config.batch_size, shuffle=True, num_workers=0
)
# valid_iter = DataLoader(valid_set, batch_size)
test_iter = DataLoader(test_set, config.batch_size)
return train_iter, test_iter, vocab
# train_data = ImdbDataset(is_train=True )
# test_data = ImdbDataset(is_train=False)
# vocab = get_vocab(train_data.get_data())
# print(f"词表中单词个数:{len(vocab)}")
# len_vocab=len(vocab)
# train_set = TensorDataset(*preprocess_imdb(train_data, vocab))
# test_set = TensorDataset(*preprocess_imdb(test_data, vocab))
# train_dataloader = DataLoader(
# train_set, batch_size=BATCH_SIZE, shuffle=True, num_workers=0
# )
# test_dataloader = DataLoader(test_set, batch_size=BATCH_SIZE)
# load_data(config=Config())
transformer模型代码编写
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import copy
'''Attention Is All You Need'''
class Model(nn.Module):
def __init__(self, config):
super(Model, self).__init__()
if config.embedding_pretrained is not None:
self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)
else:
self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)
self.postion_embedding = Positional_Encoding(config.embed, config.pad_size, config.dropout, config.device)
self.encoder = Encoder(config.dim_model, config.num_head, config.hidden, config.dropout)
self.encoders = nn.ModuleList([
copy.deepcopy(self.encoder)
# Encoder(config.dim_model, config.num_head, config.hidden, config.dropout)
for _ in range(config.num_encoder)])
self.fc1 = nn.Linear(config.pad_size * config.dim_model, config.num_classes)
# self.fc2 = nn.Linear(config.last_hidden, config.num_classes)
# self.fc1 = nn.Linear(config.dim_model, config.num_classes)
def forward(self, x):
out = self.embedding(x)
#return out
out = self.postion_embedding(out)
for encoder in self.encoders:
out = encoder(out)
out = out.view(out.size(0), -1)
# out = torch.mean(out, 1)
out = self.fc1(out)
return out
class Encoder(nn.Module):
def __init__(self, dim_model, num_head, hidden, dropout):
super(Encoder, self).__init__()
self.attention = Multi_Head_Attention(dim_model, num_head, dropout)
self.feed_forward = Position_wise_Feed_Forward(dim_model, hidden, dropout)
def forward(self, x):
out = self.attention(x)
out = self.feed_forward(out)
return out
class Positional_Encoding(nn.Module):
def __init__(self, embed, pad_size, dropout, device):
super(Positional_Encoding, self).__init__()
self.device = device
self.pe = torch.tensor([[pos / (10000.0 ** (i // 2 * 2.0 / embed)) for i in range(embed)] for pos in range(pad_size)])
self.pe[:, 0::2] = np.sin(self.pe[:, 0::2])
self.pe[:, 1::2] = np.cos(self.pe[:, 1::2])
self.dropout = nn.Dropout(dropout)
def forward(self, x):
out = x + nn.Parameter(self.pe, requires_grad=False).to(self.device)
out = self.dropout(out)
return out
class Scaled_Dot_Product_Attention(nn.Module):
'''Scaled Dot-Product Attention '''
def __init__(self):
super(Scaled_Dot_Product_Attention, self).__init__()
def forward(self, Q, K, V, scale=None):
'''
Args:
Q: [batch_size, len_Q, dim_Q]
K: [batch_size, len_K, dim_K]
V: [batch_size, len_V, dim_V]
scale: 缩放因子 论文为根号dim_K
Return:
self-attention后的张量,以及attention张量
'''
attention = torch.matmul(Q, K.permute(0, 2, 1))
if scale:
attention = attention * scale
# if mask: # TODO change this
# attention = attention.masked_fill_(mask == 0, -1e9)
attention = F.softmax(attention, dim=-1)
context = torch.matmul(attention, V)
return context
class Multi_Head_Attention(nn.Module):
def __init__(self, dim_model, num_head, dropout=0.0):
super(Multi_Head_Attention, self).__init__()
self.num_head = num_head
assert dim_model % num_head == 0
self.dim_head = dim_model // self.num_head
self.fc_Q = nn.Linear(dim_model, num_head * self.dim_head)
self.fc_K = nn.Linear(dim_model, num_head * self.dim_head)
self.fc_V = nn.Linear(dim_model, num_head * self.dim_head)
self.attention = Scaled_Dot_Product_Attention()
self.fc = nn.Linear(num_head * self.dim_head, dim_model)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
batch_size = x.size(0)
Q = self.fc_Q(x)
K = self.fc_K(x)
V = self.fc_V(x)
Q = Q.view(batch_size * self.num_head, -1, self.dim_head)
K = K.view(batch_size * self.num_head, -1, self.dim_head)
V = V.view(batch_size * self.num_head, -1, self.dim_head)
# if mask: # TODO
# mask = mask.repeat(self.num_head, 1, 1) # TODO change this
scale = K.size(-1) ** -0.5 # 缩放因子
context = self.attention(Q, K, V, scale)
context = context.view(batch_size, -1, self.dim_head * self.num_head)
out = self.fc(context)
out = self.dropout(out)
out = out + x # 残差连接
out = self.layer_norm(out)
return out
class Position_wise_Feed_Forward(nn.Module):
def __init__(self, dim_model, hidden, dropout=0.0):
super(Position_wise_Feed_Forward, self).__init__()
self.fc1 = nn.Linear(dim_model, hidden)
self.fc2 = nn.Linear(hidden, dim_model)
self.dropout = nn.Dropout(dropout)
self.layer_norm = nn.LayerNorm(dim_model)
def forward(self, x):
out = self.fc1(x)
out = F.relu(out)
out = self.fc2(out)
out = self.dropout(out)
out = out + x # 残差连接
out = self.layer_norm(out)
return out
训练
#---------------------------------------------------
import pandas as pd
from collections import Counter
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import torch
from torch import autograd
import os
from tqdm import tqdm
# 预先定义配置
config = Config()
train_data,test_data,vocabs_size = load_data(config)#加载数据
config.n_vocab = len(vocabs_size) + 1#补充词表大小,词表一定要多留出来一个
model = Model(config)#调用transformer的编码器
model.cuda()
optimizer = torch.optim.Adam(model.parameters(),lr=config.learning_rate)
criterion = nn.CrossEntropyLoss()#多分类的任务
batch_size=config.batch_size
# 记录训练过程的数据
epoch_loss_values = []
metric_values = []
best_acc = 0.0
for epoch in range(config.num_epochs):
train_acc = 0.0
train_loss = 0.0
val_acc = 0.0
val_loss = 0.0
# training
model.train()
for i,train_idx in enumerate(tqdm(train_data)):
features, labels = train_idx
features = features.cuda()
labels = labels.cuda()
optimizer.zero_grad()
outputs = model(features)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
_, train_pred = torch.max(outputs, 1) # get the index of the class with the highest probability
train_acc += (train_pred.detach() == labels.detach()).sum().item()
train_loss += loss.item()
model.eval() # set the model to evaluation mode
with torch.no_grad():
for i, batch in enumerate(tqdm(test_data)):
features, labels = batch
features = features.cuda()
labels = labels.cuda()
outputs = model(features)
loss = criterion(outputs, labels)
_, val_pred = torch.max(outputs, 1)
val_acc += (val_pred.cpu() == labels.cpu()).sum().item() # get the index of the class with the highest probability
val_loss += loss.item()
print(f'训练信息:[{epoch+1:03d}/{config.num_epochs:03d}] Train Acc: {train_acc/25000:3.5f} Loss: {train_loss/len(train_data):3.5f} | Val Acc: {val_acc/25000:3.5f} loss: {val_loss/len(test_data):3.5f}')
epoch_loss_values.append(train_loss/len(train_data))
metric_values.append(val_acc/25000)
if val_acc > best_acc:
best_acc = val_acc
torch.save(model.state_dict(), config.checkpoint_path)
print(f'saving model with acc {best_acc/25000:.5f}')
结果展示
数据读取和处理结果
训练过程
输出训练过程的信息
观察输出训练中的数据发现:在训练到大约第50轮的时候产生过拟合现象,故第100个epoch的时候停止训练,并保存了在验证集上表现最好的模型参数
# 画出训练过程中的损失曲线以及准确率曲线
import matplotlib.pyplot as plt
plt.figure("train", (12, 6))
plt.subplot(1, 2, 1)
plt.title("Iteration Average Loss")
x = [ (i + 1) for i in range(len(epoch_loss_values))]
y = epoch_loss_values
plt.xlabel("Iteration")
plt.plot(x, y)
plt.subplot(1, 2, 2)
plt.title("Val Mean Dice")
x = [(i + 1) for i in range(len(metric_values))]
y = metric_values
plt.xlabel("Iteration")
plt.plot(x, y)
plt.show()
更多推荐
所有评论(0)