GPT(Generative Pre-Training)是OpenAI在2018年发表的一篇论文《Improving Language Understanding by Generative Pre-Training》中提出的框架,本框架通过两个阶段进行任务的实现:第一个阶段是利用语言模型进行预训练(无监督形式),第二阶段通过 Fine-tuning 的模式解决下游任务(监督模式下)。第一个阶段中其实和 Embeddings from Language Models(ELMO)模型 是类似的,区别在于GPT用的特征提取器是Transformer,ELMO用的是RNN;;ELMO使用上下文对单词进行预测,而 GPT 则只采用 Context-before 这个单词的上文来进行预测。第二阶段的下游任务在文献原文中进行了描述如下图:
Improving Language Understanding by Generative Pre-Training https://s3-us-west-2.amazonaws.com/openai-assets/research-covers/language-unsupervised/language_understanding_paper.pdf
我们今天为了可更简单呈现此模型借助 Andrej Karpathy 发布的minGPT进行代码的演示。演示前先需要在Pycharm中配置国内的下载镜像:File》setting》Tools》setting repository
在这里对于minGPT依赖包的安装不进行赘述了,我们需要通过github将minGPT核心代码下载到本地,具体的文件结构如下:
minGPT https://github.com/karpathy/minGPT
其核心代码就是mingpt文件目录下的几个模块,其实在使用的时候只需要将此目录拷贝到自己的项目中即可。接下来我们看下具体的使用:
##载入依赖包
import torch
from torch.utils.data import Dataset
from torch.utils.data.dataloader import DataLoader
from mingpt.utils import set_seed
import pickle
import os
set_seed(3407)
##数据预处理
class SortDataset(Dataset):
"""
Dataset for the Sort problem. E.g. for problem length 6:
Input: 0 0 2 1 0 1 -> Output: 0 0 0 1 1 2
Which will feed into the transformer concatenated as:
input: 0 0 2 1 0 1 0 0 0 1 1
output: I I I I I 0 0 0 1 1 2
where I is "ignore", as the transformer is reading the input sequence
"""
def __init__(self, split, length=6, num_digits=3):
assert split in {'train', 'test'}
self.split = split
self.length = length
self.num_digits = num_digits
def __len__(self):
return 10000 # ...
def get_vocab_size(self):
return self.num_digits
def get_block_size(self):
# the length of the sequence that will feed into transformer,
# containing concatenated input and the output, but -1 because
# the transformer starts making predictions at the last input element
return self.length * 2 - 1
def __getitem__(self, idx):
# use rejection sampling to generate an input example from the desired split
while True:
# generate some random integers
inp = torch.randint(self.num_digits, size=(self.length,), dtype=torch.long)
# half of the time let's try to boost the number of examples that
# have a large number of repeats, as this is what the model seems to struggle
# with later in training, and they are kind of rate
if torch.rand(1).item() < 0.5:
if inp.unique().nelement() > self.length // 2:
# too many unqiue digits, re-sample
continue
# figure out if this generated example is train or test based on its hash
h = hash(pickle.dumps(inp.tolist()))
inp_split = 'test' if h % 4 == 0 else 'train' # designate 25% of examples as test
if inp_split == self.split:
break # ok
# solve the task: i.e. sort
sol = torch.sort(inp)[0]
# concatenate the problem specification and the solution
cat = torch.cat((inp, sol), dim=0)
# the inputs to the transformer will be the offset sequence
x = cat[:-1].clone()
y = cat[1:].clone()
# we only want to predict at output locations, mask out the loss at the input locations
y[:self.length-1] = -1
return x, y
数据预处理函数其实在此包的程序中是现成的。主要的功能就是生成一段长度为6,范围3以内的整数向量,然后通过hsah除于4取余进行数据筛选取1/4的数据作为训练或者测试集。
##创建数据集
train_dataset = SortDataset('train')
test_dataset = SortDataset('test')
##创建模型
# create a GPT instance
from mingpt.model import GPT
model_config = GPT.get_default_config()
model_config.model_type = 'gpt-nano'
model_config.vocab_size = train_dataset.get_vocab_size()
model_config.block_size = train_dataset.get_block_size()
model = GPT(model_config)
##创建训练模型基础参数
from mingpt.trainer import Trainer
train_config = Trainer.get_default_config()
train_config.learning_rate = 5e-4 # the model we're using is so small that we can go a bit faster
train_config.max_iters = 2000
train_config.num_workers = 0
trainer = Trainer(train_config, model, train_dataset)
##训练模型
def batch_end_callback(trainer):
if trainer.iter_num % 100 == 0:
print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
trainer.set_callback('on_batch_end', batch_end_callback)
trainer.run()
在这里模型训练提供了一个很好的训练接口trainer,可以直接通过自定义形式输出训练过程中参数信息。在这里当然需要我们知道在trainer中输出参数的意义:
iter_num:迭代的次数
iter_time:迭代开始时间
iter_dt:迭代一次所用时间
loss.item:损失率
##模型评估
def eval_split(trainer, split, max_batches):
dataset = {'train':train_dataset, 'test':test_dataset}[split]
n = train_dataset.length # naugy direct access shrug
results = []
mistakes_printed_already = 0
loader = DataLoader(dataset, batch_size=100, num_workers=0, drop_last=False)
for b, (x, y) in enumerate(loader):
x = x.to(trainer.device)
y = y.to(trainer.device)
# isolate the input pattern alone
inp = x[:, :n]
sol = y[:, -n:]
# let the model sample the rest of the sequence
cat = model.generate(inp, n, do_sample=False) # using greedy argmax, not sampling
sol_candidate = cat[:, n:] # isolate the filled in sequence
# compare the predicted sequence to the true sequence
correct = (sol == sol_candidate).all(1).cpu() # Software 1.0 vs. Software 2.0 fight RIGHT on this line haha
for i in range(x.size(0)):
results.append(int(correct[i]))
if not correct[i] and mistakes_printed_already < 3: # only print up to 5 mistakes to get a sense
mistakes_printed_already += 1
print("GPT claims that %s sorted is %s but gt is %s" % (inp[i].tolist(), sol_candidate[i].tolist(), sol[i].tolist()))
if max_batches is not None and b+1 >= max_batches:
break
rt = torch.tensor(results, dtype=torch.float)
print("%s final score: %d/%d = %.2f%% correct" % (split, rt.sum(), len(results), 100*rt.mean()))
return rt.sum()
评估细节主要是通过eval_split函数对数据进行评估并最后给出评估打分。
##开始评估
with torch.no_grad():
train_score = eval_split(trainer, 'train', max_batches=50)
test_score = eval_split(trainer, 'test', max_batches=50)
当然上面的两个部分其实在训练中需要进行合并使用的,我们可以进行重构使它实现训练和评估自动进行。
##模型的训练评估
def batch_end_callback(trainer):
if trainer.iter_num % 100 == 0:
print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
if trainer.iter_num % 500 == 0:
# evaluate both the train and test score
model.eval();
with torch.no_grad():
train_score = eval_split(trainer, 'train', max_batches=50)
test_score = eval_split(trainer, 'test', max_batches=50)
trainer.set_callback('on_batch_end', batch_end_callback)
trainer.run()
再让他更加的自动化,我们可以把我们需要的分数高的模型参数保存在指定的文件夹,我们可以为参数输出添加目录:
##模型训练,评估,存储
top_score = 0
def batch_end_callback(trainer):
work_dir = './out'
global top_score
if trainer.iter_num % 100 == 0:
print(f"iter_dt {trainer.iter_dt * 1000:.2f}ms; iter {trainer.iter_num}: train loss {trainer.loss.item():.5f}")
if trainer.iter_num % 500 == 0:
# evaluate both the train and test score
model.eval();
with torch.no_grad():
train_score = eval_split(trainer, 'train', max_batches=50)
test_score = eval_split(trainer, 'test', max_batches=50)
score = train_score + test_score
# save the model if this is the best score we've seen so far
if score > top_score:
top_score = score
print(f"saving model with new top score of {score}")
ckpt_path = os.path.join(work_dir, "model.pt")
torch.save(model.state_dict(), ckpt_path)
trainer.set_callback('on_batch_end', batch_end_callback)
trainer.run()
这里需要先构建out目录并且添加args.txt, config.json,model.pt三个文件,就可以进行模型训练啦。
##模型的测试
# let's run a random given sequence through the model as well
n = train_dataset.length # naugy direct access shrug
inp = torch.tensor([[0, 0, 2, 1, 0, 1]], dtype=torch.long).to(trainer.device)
assert inp[0].nelement() == n
with torch.no_grad():
cat = model.generate(inp, n, do_sample=False)
sol = torch.sort(inp[0])[0]
sol_candidate = cat[:, n:]
print('input sequence :', inp.tolist())
print('predicted sorted:', sol_candidate.tolist())
print('gt sort :', sol.tolist())
print('matches :', bool((sol == sol_candidate).all()))
至此minGPT的使用基本介绍完成,当然下面就是自己实践需要更换的数据集了,目前是图像文本都有在使用的具体效果还需要通过自己的数据进行评估。