专栏首页Fdu弟中弟文本相似度计算

文本相似度计算

本文介绍文本相似度计算的各种方法,可以广泛应用在基于问答对匹配的问答系统中。

TF-IDF

tfidf_i = tfidf = \frac{词i的数量}{词语总数}log\frac{总文档数}{包含词i的文档数}

其中tf称为词频,idf为逆文档频率。

BM25

BM25(i) = \frac{词i的数量}{总词数}\frac{(k+1)C}{C+k(1-b+b\frac{|d|}{avdl})}log(\frac{总文档数}{包含i的文档数}) \C = tf=\frac{词i的数量}{总词数},k>0,b\in [0,1],d为文档i的长度,avdl是文档平均长度

1-b+b\frac{d}{avdl} 中的b看成0,那么此时中间项的结果为 \frac{(k+1)tf}{k+tf} ,通过设置一个k,就能够保证其最大值为1,达到限制tf过大的目的。

即:

\begin{align}&\frac{(k+1)tf}{k+tf}= \frac{k+1}{1+\frac{k}{tf}} \qquad \qquad \qquad,上下同除tf\end{align}

在一个句子中,某个词重要程度应该是随着词语的数量逐渐衰减的,所以中间项对词频进行了惩罚,随着次数的增加,影响程度的增加会越来越小。通过设置k值,能够保证其最大值为k+1,k往往取值1.2

此外, 1-b+b\frac{d}{avdl} 的作用是用来对文本的长度进行归一化。例如在考虑整个句子的tdidf的时候,如果句子的长度太短,那么计算总的tdidf的值是要比长句子的tdidf的值要低的。所以可以考虑对句子的长度进行归一化处理。

可以看到,当句子的长度越短,1-b+b\frac{|d|}{avdl}的值是越小,作为分母的位置,会让整个第二项越大,从而达到提高短文本句子的BM25的值的效果。当b的值为0,可以禁用归一化, b往往取值0.75 。

fasttext

可以使用fasttext获取词向量,然后对一个句子中的所有词语的词向量进行平均,获取整个句子的向量表示,而且通过参数的控制,能实现N-garm的效果。

假设我们有文本a.txt如下:

我 很 喜欢 她 
今天 天气 不错
我 爱 深度学习

那么我们可以实现获取句子向量的方法如下:

import fasttext.FastText as fasttext

#训练模型,设置n-garm=2
model = fasttext.train_unsupervised(input="a.txt", epoch=20, minCount=1, wordNgrams=2)
#获取句子向量,是对词向量的平均
print(model.get_sentence_vector("我 是 谁"))

默认生成的句向量是100维的。

pysparnn

pysparnn 使用的是一种 cluster pruning(簇修剪) 的技术,开始的时候对数据进行聚类,后续再有限个类别中进行数据的搜索,根据计算的余弦相似度返回结果。

数据预处理过程如下:

  1. 随机选择 \sqrt{N} 个样本作为leader
  2. 选择非leader的数据(follower),使用余弦相似度计算找到最近的leader

当获取到一个问题q的时候,查询过程:

  1. 计算每个leader和q的相似度,找到最相似的leader
  2. 然后计算问题q和leader所在簇的相似度,找到最相似的k个,作为最终的返回结果

代码如下:

import pysparnn.cluster_index as ci
from sklearn.feature_extraction.text import TfidfVectorizer

#1. 原始数据
data = [
    'hello world',
    'oh hello there',
    'Play it',
    'Play it again Sam',
]  

#2. 原始数据向量化
tv = TfidfVectorizer()
tv.fit(data)

features_vec = tv.transform(data)

# 原始数据构造索引
cp = ci.MultiClusterIndex(features_vec, data)

# 待搜索的数据向量化
search_data = [
    'oh there',
    'Play it again Frank'
]

search_features_vec = tv.transform(search_data)

#3. 索引中传入带搜索数据,返回结果
print(cp.search(search_features_vec, k=2, k_clusters=2, return_distance=True))

此外,还可以设置两个大于0的数字b1和b2

  • b1表示在数据预处理阶段,每个follower选择b1个最相似的leader,而不是选择单独一个leader,这样不同的簇是有数据交叉的
  • b2表示在查询阶段,找到最相似的b2个leader,然后再计算不同的leader中下的topk的结果

通过增加b1和b2的值,我们能够有更大的机会找到更好的结果,但是这样会需要更加大量的计算。

ci.MultiClusterIndex(features, records_data, num_indexes) 中, num_indexes 能够设置b1的值,默认为2。

在搜索的过程中, cp.search(search_vec, k=8, k_clusters=10, return_distance=True,num_indexes)num_Indexes 可以设置b2的值,默认等于b1的值。

孪生网络

孪生神经网络由两个共享权值的网络的组成,通过两个输入,被DNN进行编码,得到向量的表示之后,根据实际的用途来制定损失函数。比如我们需要计算相似度的时候,可以使用余弦相似度,或者使用 exp^{-||h^{left}-h^{right}||} 来确定向量的距离。孪生神经网络被用于有多个输入和一个输出的场景,比如手写字体识别、文本相似度检验、人脸识别等。

模型架构大致如下:

import torch.nn.functional as F
import torch.nn as nn
import config
import torch


class SiameseNetwork(nn.Module):
    def __init__(self):
        super(SiameseNetwork, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=len(config.sort_ws),
                                      embedding_dim=300,
                                      padding_idx=config.sort_ws.PAD)
        self.gru1 = nn.GRU(input_size=300,
                          hidden_size=config.sort_hidden_size,
                          num_layers=config.sort_num_layers,
                          bidirectional=config.bidirectional,
                          batch_first=True)
        self.gru2 = nn.GRU(input_size=config.sort_hidden_size * 8,
                           hidden_size=config.sort_hidden_size,
                           num_layers=1,
                           batch_first=True,
                           bidirectional=False)
        self.fc = nn.Sequential(
            nn.BatchNorm1d(config.sort_hidden_size * 2),

            nn.Linear(config.sort_hidden_size * 2, config.sort_hidden_size),
            nn.ELU(inplace=True),
            nn.BatchNorm1d(config.sort_hidden_size),
            nn.Dropout(config.sort_dropout),

            nn.Linear(config.sort_hidden_size, config.sort_hidden_size),
            nn.ELU(inplace=True),
            nn.BatchNorm1d(config.sort_hidden_size),
            nn.Dropout(config.sort_dropout),

            nn.Linear(config.sort_hidden_size, 2),
        )

    def forward(self, input1, input2):
        mask1, mask2 = input1.eq(config.sort_ws.PAD), input2.eq(config.sort_ws.PAD)
        input1 = self.embedding(input1) # [batch_size, max_len, 300]
        input2 = self.embedding(input2)
        # output [batch_size, max_len, hidden_size*num_layer]
        # hidden [numlayer*2, batch_size, hidden_size]
        output1, _ = self.gru1(input1)
        output2, _ = self.gru1(input2)
        output1_align, output2_align = self.sort_attention_align(output1, output2, mask1, mask2)
        q1_combined = torch.cat([output1, output1_align, self.submul(output1, output1_align)], dim=-1) # [batch_size, max_len, hidden_size*8]
        q2_combined = torch.cat([output2, output2_align, self.submul(output2, output2_align)], dim=-1)

        # batch_size * seq_len * (1 * hidden_size)
        q1_compose, _ = self.gru2(q1_combined)
        q2_compose, _ = self.gru2(q2_combined)

        # 进行Aggregate操作,也就是进行pooling
        # input: batch_size * seq_len * (1 * hidden_size)
        # output: batch_size * (1 * hidden_size)
        q1_rep = self.apply_pooling(q1_compose)
        q2_rep = self.apply_pooling(q2_compose)

        # Concate合并到一起,用来进行计算相似度
        out = torch.cat([q1_rep, q2_rep], dim=-1) # batch_size * (2 * hidden_size)
        out = self.fc(out) # batch_size * 2
        return out

    def submul(self, x1, x2):
        mul = x1 * x2
        sub = x1 - x2
        return torch.cat([sub, mul], dim=-1)

    def apply_pooling(self, output):
        avg_pooled = F.avg_pool1d(output.transpose(1, 2), kernel_size=output.size(1)).squeeze(-1)
        max_pooled = F.max_pool1d(output.transpose(1, 2), kernel_size=output.size(1)).squeeze(-1)
        return avg_pooled + max_pooled

    def sort_attention_align(self, x1, x2, mask1, mask2):
        '''
        x1: batch_size * seq_len_1 * hidden_size
        x2: batch_size * seq_len_2 * hidden_size
        mask1:x1中pad的位置为1,其他为0
        mask2:x2中pad的位置为1,其他为0
        '''
        # attention: batch_size * seq_len_1 * seq_len_2
        attention_weight = torch.matmul(x1, x2.transpose(1, 2))
        # mask1 : batch_size,seq_len1
        mask1 = mask1.float().masked_fill_(mask1, float('-inf'))
        # mask2 : batch_size,seq_len2
        mask2 = mask2.float().masked_fill_(mask2, float('-inf'))

        # weight: batch_size * seq_len_1 * seq_len_2
        weight1 = F.softmax(attention_weight + mask2.unsqueeze(1), dim=-1)
        # batch_size*seq_len_1*hidden_size
        x1_align = torch.matmul(weight1, x2)

        # 同理,需要对attention_weight进行permute操作
        weight2 = F.softmax(attention_weight.transpose(1, 2) + mask1.unsqueeze(1), dim=-1)
        x2_align = torch.matmul(weight2, x1)

        return x1_align, x2_align

BERT

也可以使用BERT进行文本相似度计算,这里使用 https://github.com/IAdmireu/ChineseSTS/blob/master/simtrain_to05sts.txt 数据集,两个句子的相似度范围从0到5。然后就和上篇文章中的方法一样,转化为文本分类问题就ok了。

import torch
import time 
import torch.nn as nn
import torch.nn.functional as F 
from transformers import BertModel, BertTokenizer
import pandas as pd 
import numpy as np 
from tqdm import tqdm 
from torch.utils.data import *
import os


os.environ["CUDA_VISIBLE_DEVICES"] = "1"
path = "./"
bert_path = "hfl/chinese-roberta-wwm-ext"
tokenizer = BertTokenizer(vocab_file="vocab.txt")  # 初始化分词器
input_ids = []     # input char ids
input_types = []   # segment ids
input_masks = []   # attention mask
label = []         # 标签
pad_size = 64      # 也称为 max_len 
 
with open(path + "train.txt", 'r', encoding='utf-8') as f:
    for i, l in tqdm(enumerate(f)): 
        x1, x2, y = l.strip().split('\t')
        x1 = tokenizer.tokenize(x1)
        x2 = tokenizer.tokenize(x2)
        tokens = ["[CLS]"] + x1 + ["[SEP]"] + x2 + ["[SEP]"]
        
        # 得到input_id, seg_id, att_mask
        ids = tokenizer.convert_tokens_to_ids(tokens)
        types = [0] * len(ids)
        masks = [1] * len(ids)
        # 短则补齐,长则切断
        if len(ids) < pad_size:
            types = types + [1] * (pad_size - len(ids))  # mask部分 segment置为1
            masks = masks + [0] * (pad_size - len(ids))
            ids = ids + [0] * (pad_size - len(ids))
        else:
            types = types[:pad_size]
            masks = masks[:pad_size]
            ids = ids[:pad_size]
        input_ids.append(ids)
        input_types.append(types)
        input_masks.append(masks)
        assert len(ids) == len(masks) == len(types) == pad_size
        label.append([int(float(y))])

# 随机打乱索引
random_order = list(range(len(input_ids)))
np.random.seed(2020)   # 固定种子
np.random.shuffle(random_order)

# 4:1 划分训练集和测试集
input_ids_train = np.array([input_ids[i] for i in random_order[:int(len(input_ids)*0.8)]])
input_types_train = np.array([input_types[i] for i in random_order[:int(len(input_ids)*0.8)]])
input_masks_train = np.array([input_masks[i] for i in random_order[:int(len(input_ids)*0.8)]])
y_train = np.array([label[i] for i in random_order[:int(len(input_ids) * 0.8)]])
print(input_ids_train.shape, input_types_train.shape, input_masks_train.shape, y_train.shape)

input_ids_test = np.array([input_ids[i] for i in random_order[int(len(input_ids)*0.8):]])
input_types_test = np.array([input_types[i] for i in random_order[int(len(input_ids)*0.8):]])
input_masks_test = np.array([input_masks[i] for i in random_order[int(len(input_ids)*0.8):]])
y_test = np.array([label[i] for i in random_order[int(len(input_ids) * 0.8):]])
print(input_ids_test.shape, input_types_test.shape, input_masks_test.shape, y_test.shape)

BATCH_SIZE = 64
train_data = TensorDataset(torch.LongTensor(input_ids_train), 
                           torch.LongTensor(input_types_train), 
                           torch.LongTensor(input_masks_train), 
                           torch.LongTensor(y_train))
train_sampler = RandomSampler(train_data)  
train_loader = DataLoader(train_data, sampler=train_sampler, batch_size=BATCH_SIZE)
test_data = TensorDataset(torch.LongTensor(input_ids_test), 
                          torch.LongTensor(input_types_test), 
                          torch.LongTensor(input_masks_test),
                          torch.LongTensor(y_test))
test_sampler = SequentialSampler(test_data)
test_loader = DataLoader(test_data, sampler=test_sampler, batch_size=BATCH_SIZE)

class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.bert = BertModel.from_pretrained(bert_path)  # /bert_pretrain/
        for param in self.bert.parameters():
            param.requires_grad = True  # 每个参数都要 求梯度
        self.fc = nn.Linear(768, 6)   # 768 -> 6
    def forward(self, x): # (ids, seq_len, mask)
        context = x[0]  # 输入的句子   
        types = x[1]
        mask = x[2]  # 对padding部分进行mask,和句子相同size,padding部分用0表示,如:[1, 1, 1, 1, 0, 0]
        _, pooled = self.bert(context, token_type_ids=types, attention_mask=mask)
        # print(_.shape, pooled.shape) # torch.Size([128, 32, 768]) torch.Size([128, 768])
        # print(_[0,0] == pooled[0]) # False 注意是不一样的 pooled再加了一层dense和activation
        out = self.fc(pooled)   # 得到6分类
        return out

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Model().to(DEVICE)
print(model) 

# param_optimizer = list(model.named_parameters())  # 模型参数名字列表
# no_decay = ['bias', 'LayerNorm.bias', 'LayerNorm.weight']
# optimizer_grouped_parameters = [
#     {'params': [p for n, p in param_optimizer if not any(nd in n for nd in no_decay)], 'weight_decay': 0.01},
#     {'params': [p for n, p in param_optimizer if any(nd in n for nd in no_decay)], 'weight_decay': 0.0}]
optimizer = torch.optim.Adam(model.parameters(), lr=2e-5)
NUM_EPOCHS = 1
def train(model, device, train_loader, optimizer, epoch):   # 训练模型
    model.train()
    best_acc = 0.0 
    for batch_idx, (x1, x2, x3, y) in enumerate(train_loader):
        start_time = time.time()
        x1, x2, x3, y = x1.to(device), x2.to(device), x3.to(device), y.to(device)
        y_pred = model([x1, x2, x3])  # 得到预测结果
        optimizer.zero_grad()             # 梯度清零
        loss = F.cross_entropy(y_pred, y.squeeze())  # 得到loss
        loss.backward()
        optimizer.step()
        if(batch_idx + 1) % 100 == 0:    # 打印loss
            print('Train Epoch: {} [{}/{} ({:.2f}%)]\tLoss: {:.6f}'.format(epoch, (batch_idx+1) * len(x1), 
                                                                           len(train_loader.dataset),
                                                                           100. * batch_idx / len(train_loader), 
                                                                           loss.item()))  # 记得为loss.item()
def test(model, device, test_loader):    # 测试模型, 得到测试集评估结果
    model.eval()
    test_loss = 0.0 
    acc = 0 
    for batch_idx, (x1, x2, x3, y) in enumerate(test_loader):
        x1, x2, x3, y = x1.to(device), x2.to(device), x3.to(device), y.to(device)
        with torch.no_grad():
            y_ = model([x1,x2,x3])
        test_loss += F.cross_entropy(y_, y.squeeze())
        pred = y_.max(-1, keepdim=True)[1]   # .max(): 2输出,分别为最大值和最大值的index
        acc += pred.eq(y.view_as(pred)).sum().item()    # 记得加item()
    test_loss /= len(test_loader)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)'.format(
          test_loss, acc, len(test_loader.dataset),
          100. * acc / len(test_loader.dataset)))
    return acc / len(test_loader.dataset)

best_acc = 0.0 
PATH = 'roberta_model.pth'  # 定义模型保存路径
for epoch in range(1, NUM_EPOCHS+1):  # 1个epoch
    train(model, DEVICE, train_loader, optimizer, epoch)
    acc = test(model, DEVICE, test_loader)
    if best_acc < acc: 
        best_acc = acc 
        torch.save(model.state_dict(), PATH)  # 保存最优模型
    print("acc is: {:.4f}, best acc is {:.4f}\n".format(acc, best_acc))

model.load_state_dict(torch.load(PATH))
acc = test(model, DEVICE, test_loader)

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • python文本相似度计算

    步骤 分词、去停用词 词袋模型向量化文本 TF-IDF模型向量化文本 LSI模型向量化文本 计算相似度 理论知识...

    机器学习AI算法工程
  • python文本相似度计算

    步骤 分词、去停用词 词袋模型向量化文本 TF-IDF模型向量化文本 LSI模型向量化文本 计算相似度 理论知识 ...

    小莹莹
  • python文本相似度计算

    两篇中文文本,如何计算相似度?相似度是数学上的概念,自然语言肯定无法完成,所有要把文本转化为向量。两个向量计算相似度就很简单了,欧式距离、余弦相似度等等各种方法...

    周小董
  • 中文文本相似度计算工具集

    作者 | fendouai 编辑 | 磐石 出品 | 磐创AI技术团队 ---- 【磐创AI导读】:前两篇文章中我们介绍了一些机器学习不错的项目合集和深度学习入...

    磐创AI
  • python专业方向 | 文本相似度计算

    步骤 1、分词、去停用词 2、词袋模型向量化文本 3、TF-IDF模型向量化文本 4、LSI模型向量化文本 5、计算相似度 理论知识 两篇中文文本,如何计算相似...

    用户1332428
  • Kaggle知识点:文本相似度计算方法

    文本相似度是指衡量两个文本的相似程度,相似程度的评价有很多角度:单纯的字面相似度(例如:我和他 v.s. 我和她),语义的相似度(例如:爸爸 v.s. 父亲)和...

    Coggle数据科学
  • 最准的中文文本相似度计算工具

    text2vec, chinese text to vetor.(文本向量化表示工具,包括词向量化、句子向量化)

    机器学习AI算法工程
  • 【NLP实战】基于ALBERT的文本相似度计算

    实战是学习一门技术最好的方式,也是深入了解一门技术唯一的方式。因此,NLP专栏推出了实战专栏,让有兴趣的同学在看文章之余也可以自己动手试一试。

    用户1508658
  • BERT中文实战:文本相似度计算与文本分类

    谷歌提供了以下几个版本的BERT模型,每个模型的参数都做了简单的说明,中文的预训练模型在11月3日的时候提供了,这里我们只需要用到中文的版本

    大数据技术与机器学习

扫码关注云+社区

领取腾讯云代金券