FM算法全称为因子分解机 (Factorization Machine)。
它是广告和推荐领域非常著名的算法,在线性回归模型上考虑了特征的二阶交互。
适合捕捉大规模稀疏特征(类别特征)当中的特征交互。
FM及其衍生的一些较有名的算法的简要介绍如下:
参考文章:张俊林《FFM及DeepFFM模型在推荐系统的探索》https://zhuanlan.zhihu.com/p/67795161
公众号后台回复关键词:FM,获取本文全部源代码和数据集。
FM模型的表达形式如下:

$$\hat{y}(x) = w_0 + \sum_{i=1}^n w_i x_i + \sum_{i=1}^n\sum_{j=i+1}^n \langle \vec{v}_i, \vec{v}_j \rangle x_i x_j$$

其中前两项与线性回归一致,第三项为特征的二阶交互项。

交互项的简化计算类似于 $(a+b+c)^2 - (a^2+b^2+c^2) = 2(ab+bc+ca)$,即:

$$\sum_{i=1}^n\sum_{j=i+1}^n \langle \vec{v}_i, \vec{v}_j \rangle x_i x_j = \frac{1}{2}\sum_{k=1}^K\left[\left(\sum_{i=1}^n v_{i,k} x_i\right)^2 - \sum_{i=1}^n v_{i,k}^2 x_i^2\right]$$

可以看到交互项的计算复杂度已经从 $O(Kn^2)$ 变成了 $O(Kn)$。

因此 FM 的模型形式也可以改写成:

$$\hat{y}(x) = w_0 + \sum_{i=1}^n w_i x_i + \frac{1}{2}\sum_{k=1}^K\left[\left(\sum_{i=1}^n v_{i,k} x_i\right)^2 - \sum_{i=1}^n v_{i,k}^2 x_i^2\right]$$

注意到隐向量 $\vec{v}_i$ 只与特征 $x_i$ 自身相关,而与它要交互的对象无关,
这是FM面对稀疏特征具有很强泛化性的原因。
考虑一个典型的给用户推荐商品的推荐场景中,用户所在城市特征和商品类目特征的交互。
葫芦岛是一个小城市,渔网是一种小众商品。它们都是稀疏特征,绝大部分样本在这两个 onehot 位上的取值都是 0。
稀疏乘以稀疏更加稀疏,所以在训练样本中可能根本不存在葫芦岛城市的用户购买渔网这样的样本。
但是只要训练样本中存在着葫芦岛的用户购买其它商品这样的样本,也存在其他城市用户购买渔网这样的样本,FM模型就可以给葫芦岛市的用户购买渔网的可能性作出一个估计,这个值可能不小,最后甚至会给葫芦岛的用户推荐渔网。
这就是FM面对稀疏特征具有很强泛化性的一个例子。
下面是FM模型的一个完整pytorch实现。
注意的是,我们代码中的 embedding 向量或者线性层作用结果实际上是 $\vec{v}_i x_i$ 的结果。这是许多读者包括我在学习FM时候感到困惑的一个地方。
import torch
from torch import nn
from torch import nn,Tensor
import torch.nn.functional as F
class NumEmbedding(nn.Module):
    """Encode continuous features with an independent linear map per feature.

    Input shape:  [batch_size, features_num(n), d_in]  (d_in is usually 1)
    Output shape: [batch_size, features_num(n), d_out]
    """

    def __init__(self, n: int, d_in: int, d_out: int, bias: bool = False) -> None:
        super().__init__()
        self.weight = nn.Parameter(Tensor(n, d_in, d_out))
        self.bias = nn.Parameter(Tensor(n, d_out)) if bias else None
        # Initialize each feature's slice exactly as nn.Linear would.
        with torch.no_grad():
            for feat in range(n):
                reference = nn.Linear(d_in, d_out)
                self.weight[feat] = reference.weight.T
                if self.bias is not None:
                    self.bias[feat] = reference.bias

    def forward(self, x_num):
        # x_num: [batch_size, features_num, d_in]
        assert x_num.ndim == 3
        # Broadcast-multiply and reduce the d_in axis; this is equivalent to
        # torch.einsum("bfi,fij->bfj", x_num, self.weight).
        out = (x_num[..., None] * self.weight[None]).sum(-2)
        if self.bias is not None:
            out = out + self.bias[None]
        return out
class CatEmbedding(nn.Module):
    """Encode categorical features with a single shared Embedding table.

    Each feature's 0-based code is shifted by a per-feature offset so that all
    features index disjoint rows of one table of size sum(categories).

    Input shape:  [batch_size, features_num]  (Long codes)
    Output shape: [batch_size, features_num, d_embed]
    """

    def __init__(self, categories, d_embed):
        super().__init__()
        self.embedding = nn.Embedding(sum(categories), d_embed)
        # The offsets are a fixed constant, not a trainable weight: a buffer
        # stays out of the optimizer's parameter list but still follows
        # .to(device) and is saved in state_dict (original code misused
        # nn.Parameter with requires_grad=False for this).
        self.register_buffer(
            "offsets", torch.tensor([0] + list(categories[:-1])).cumsum(0)
        )
        nn.init.xavier_uniform_(self.embedding.weight.data)

    def forward(self, x_cat):
        """
        :param x_cat: Long tensor of size ``(batch_size, features_num)``
        """
        x = x_cat + self.offsets[None]
        return self.embedding(x)
class CatLinear(nn.Module):
    """Linear layer over one-hot categorical features, implemented via Embedding.

    Equivalent to F.one_hot followed by nn.Linear, but without materializing
    the one-hot matrix.

    Input shape:  [batch_size, features_num]
    Output shape: [batch_size, d_out]   (note: features are summed out; the
    original docstring incorrectly claimed a per-feature output dimension)
    """

    def __init__(self, categories, d_out=1):
        super().__init__()
        self.fc = nn.Embedding(sum(categories), d_out)
        self.bias = nn.Parameter(torch.zeros((d_out,)))
        # Fixed per-feature offsets into the shared table; a buffer (not a
        # Parameter) since it must never be optimized, yet should move with
        # .to(device) and persist in state_dict.
        self.register_buffer(
            "offsets", torch.tensor([0] + list(categories[:-1])).cumsum(0)
        )

    def forward(self, x_cat):
        """
        :param x_cat: Long tensor of size ``(batch_size, features_num)``
        """
        x = x_cat + self.offsets[None]
        return torch.sum(self.fc(x), dim=1) + self.bias
class FMLayer(nn.Module):
    """Second-order interaction term of FM.

    Uses the algebraic identity
        sum_{i<j} <x_i, x_j> = 0.5 * ((sum_i x_i)^2 - sum_i x_i^2)
    so the pairwise interaction costs O(n*k) instead of O(n^2*k).
    """

    def __init__(self, reduce_sum=True):
        super().__init__()
        self.reduce_sum = reduce_sum

    def forward(self, x):
        """
        :param x: Float tensor of size ``(batch_size, num_features, k)``;
                  each row holds v_i * x_i from the FM formula.
        """
        total = x.sum(dim=1)
        interaction = total * total - x.pow(2).sum(dim=1)
        if self.reduce_sum:
            interaction = interaction.sum(dim=1, keepdim=True)
        return 0.5 * interaction
class FM(nn.Module):
    """Complete FM model: first-order linear terms plus pairwise interactions.

    With n_classes == 1 the interaction reduces to a scalar added to the
    linear logit; with n_classes >= 2 a small linear head maps the k-dim
    interaction vector to class logits.
    """

    def __init__(self, d_numerical, categories=None, d_embed=4,
                 n_classes=1):
        super().__init__()
        if d_numerical is None:
            d_numerical = 0
        if categories is None:
            categories = []
        self.categories = categories
        self.n_classes = n_classes

        # First-order (linear) part.
        self.num_linear = nn.Linear(d_numerical, n_classes) if d_numerical else None
        self.cat_linear = CatLinear(categories, n_classes) if categories else None

        # Second-order factors; these layers output v_i * x_i directly.
        self.num_embedding = NumEmbedding(d_numerical, 1, d_embed) if d_numerical else None
        self.cat_embedding = CatEmbedding(categories, d_embed) if categories else None

        if n_classes == 1:
            self.fm = FMLayer(reduce_sum=True)
            self.fm_linear = None
        else:
            assert n_classes >= 2
            self.fm = FMLayer(reduce_sum=False)
            self.fm_linear = nn.Linear(d_embed, n_classes)

    def forward(self, x):
        """
        :param x: tuple (x_num, x_cat) of numerical and categorical features
        """
        x_num, x_cat = x

        # Linear part.
        logits = 0.0
        if self.num_linear is not None:
            logits = logits + self.num_linear(x_num)
        if self.cat_linear is not None:
            logits = logits + self.cat_linear(x_cat)

        # Interaction part: stack every v_i * x_i along the feature axis.
        factors = []
        if self.num_embedding is not None:
            factors.append(self.num_embedding(x_num[..., None]))
        if self.cat_embedding is not None:
            factors.append(self.cat_embedding(x_cat))
        factors = torch.cat(factors, dim=1)

        if self.n_classes == 1:
            logits = logits + self.fm(factors)
            logits = logits.squeeze(-1)
        else:
            logits = logits + self.fm_linear(self.fm(factors))
        return logits
## Smoke test: NumEmbedding
num_embedding = NumEmbedding(2, 1, 4)
x_num = torch.randn(2, 2)
x_out = num_embedding(x_num.unsqueeze(-1))
print(x_out.shape)

## Smoke test: CatEmbedding
cat_embedding = CatEmbedding(categories=[3, 2, 2], d_embed=4)
x_cat = torch.randint(0, 2, (2, 3))
x_out = cat_embedding(x_cat)
print(x_cat.shape)
print(x_out.shape)

## Smoke test: CatLinear
cat_linear = CatLinear(categories=[3, 2, 2], d_out=1)
x_cat = torch.randint(0, 2, (2, 3))
x_out = cat_linear(x_cat)
print(x_cat.shape)
print(x_out.shape)

## Smoke test: FMLayer
fm_layer = FMLayer(reduce_sum=False)
x = torch.randn(2, 3, 4)
x_out = fm_layer(x)
print(x_out.shape)

## Smoke test: FM with a multi-class head.
# (The leftover debug alias `self = fm` was removed: it shadowed the
# conventional `self` name at module level and served no purpose.)
fm = FM(d_numerical=3, categories=[4, 3, 2],
        d_embed=4, n_classes=2)
x_num = torch.randn(2, 3)
x_cat = torch.randint(0, 2, (2, 3))
fm((x_num, x_cat))
Criteo 数据集是一个经典的广告点击率 CTR 预测数据集。
这个数据集的目标是通过用户特征和广告特征来预测某条广告是否会为用户点击。
数据集有13维数值特征(I1~I13)和26维类别特征(C14~C39), 共39维特征, 特征中包含着许多缺失值。
训练集4000万个样本,测试集600万个样本。数据集大小超过100G.
此处使用的是采样100万个样本后的 criteo_small 数据集。
#!pip install -U torchkeras -i https://pypi.org/simple/
import numpy as np
import pandas as pd
import datetime
from sklearn.model_selection import train_test_split
import torch
from torch import nn
from torch.utils.data import Dataset,DataLoader
import torch.nn.functional as F
import torchkeras
def printlog(info):
    """Print a timestamped separator line, then the message followed by '...'."""
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("\n" + "=" * 80 + stamp)
    print(info + '...\n\n')
from sklearn.preprocessing import LabelEncoder,QuantileTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
# Load the sampled Criteo data: first column is the click label, then 13
# numeric features I1..I13 and 26 categorical features C14..C39.
dfdata = pd.read_csv("../data/criteo_small.zip", sep="\t", header=None)
dfdata.columns = (["label"]
                  + [f"I{i}" for i in range(1, 14)]
                  + [f"C{i}" for i in range(14, 40)])

cat_cols = [c for c in dfdata.columns if c.startswith('C')]
num_cols = [c for c in dfdata.columns if c.startswith('I')]

# Numeric pipeline: impute missing values, then quantile-normalize.
num_pipe = Pipeline(steps=[('impute', SimpleImputer()), ('quantile', QuantileTransformer())])

# Integer-encode every categorical column in place.
for c in cat_cols:
    dfdata[c] = LabelEncoder().fit_transform(dfdata[c])

dfdata[num_cols] = num_pipe.fit_transform(dfdata[num_cols])

# Cardinality of each categorical feature (codes run 0..max).
categories = [dfdata[c].max() + 1 for c in cat_cols]

dfdata  # notebook display of the processed frame
import torch
from torch.utils.data import Dataset,DataLoader
#DataFrame转换成torch数据集Dataset, 特征分割成X_num,X_cat方式
class DfDataset(Dataset):
    """Wrap a DataFrame as a torch Dataset with features split into (X_num, X_cat).

    In training mode __getitem__ yields ((x_num, x_cat), y); otherwise only
    the feature tuple (x_num, x_cat).
    """

    def __init__(self, df,
                 label_col,
                 num_features,
                 cat_features,
                 categories,
                 is_training=True):
        self.X_num = torch.tensor(df[num_features].values).float() if num_features else None
        self.X_cat = torch.tensor(df[cat_features].values).long() if cat_features else None
        self.Y = torch.tensor(df[label_col].values).float()
        self.categories = categories
        self.is_training = is_training

    def __len__(self):
        return len(self.Y)

    def __getitem__(self, index):
        sample = (self.X_num[index], self.X_cat[index])
        return (sample, self.Y[index]) if self.is_training else sample

    def get_categories(self):
        return self.categories
# 64% train / 16% val / 20% test split.
dftrain_val, dftest = train_test_split(dfdata, test_size=0.2)
dftrain, dfval = train_test_split(dftrain_val, test_size=0.2)

common = dict(label_col="label", num_features=num_cols, cat_features=cat_cols,
              categories=categories, is_training=True)
ds_train = DfDataset(dftrain, **common)
ds_val = DfDataset(dfval, **common)
ds_test = DfDataset(dftest, **common)

dl_train = DataLoader(ds_train, batch_size=2048, shuffle=True)
dl_val = DataLoader(ds_val, batch_size=2048, shuffle=False)
dl_test = DataLoader(ds_test, batch_size=2048, shuffle=False)

# Grab one batch for shape inspection / the model summary below.
features, labels = next(iter(dl_train))
def create_net():
    """Build an FM binary-classification net sized to the training dataset."""
    return FM(
        d_numerical=ds_train.X_num.shape[1],
        categories=ds_train.get_categories(),
        d_embed=8,
        n_classes=1,
    )
from torchkeras import summary

# Build the model and print a keras-style layer summary for one sample batch.
net = create_net()
summary(net,input_data=features);
此处使用的训练代码是一套通用代码,仿照keras风格构建pytorch模型的训练循环,来自我写的torchkeras库中的KerasModel类的源码。总共不到200行代码,包括进度条、EarlyStopping、GPU支持等功能,非常简洁实用,有兴趣的同学可以去github上搜索torchkeras参考使用。
import os,sys,time
import numpy as np
import pandas as pd
import datetime
from tqdm import tqdm
import torch
from torch import nn
from accelerate import Accelerator
from copy import deepcopy
def printlog(info):
    """Print a timestamped separator line, then the message."""
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print("\n" + "=" * 80 + stamp)
    print(str(info) + "\n")
class StepRunner:
    """Run one batch: forward, loss, (optionally) backward + update, metrics.

    :param net: model to run
    :param loss_fn: callable taking (preds, labels)
    :param stage: "train" enables the optimizer step; anything else is eval-only
    :param metrics_dict: mapping name -> torchmetrics-style callable, or None
    :param optimizer: optimizer; only used when stage == "train"
    :param lr_scheduler: optional per-step LR scheduler
    :param accelerator: optional HuggingFace Accelerator used for backward()
    """

    def __init__(self, net, loss_fn, stage="train", metrics_dict=None,
                 optimizer=None, lr_scheduler=None,
                 accelerator=None):
        self.net, self.loss_fn, self.stage = net, loss_fn, stage
        # Bug fix: the default metrics_dict=None previously crashed __call__
        # on .items(); normalize to an empty dict.
        self.metrics_dict = metrics_dict if metrics_dict is not None else {}
        self.optimizer, self.lr_scheduler = optimizer, lr_scheduler
        self.accelerator = accelerator

    def __call__(self, features, labels):
        # Forward pass and loss.
        preds = self.net(features)
        loss = self.loss_fn(preds, labels)

        # Backward pass and parameter update (training stage only).
        if self.optimizer is not None and self.stage == "train":
            if self.accelerator is None:
                loss.backward()
            else:
                self.accelerator.backward(loss)
            self.optimizer.step()
            if self.lr_scheduler is not None:
                self.lr_scheduler.step()
            self.optimizer.zero_grad()

        # Per-batch metric values.
        step_metrics = {self.stage + "_" + name: metric_fn(preds, labels).item()
                        for name, metric_fn in self.metrics_dict.items()}
        return loss.item(), step_metrics
class EpochRunner:
    """Drive a StepRunner over a full dataloader, aggregating loss and metrics."""

    def __init__(self, steprunner):
        self.steprunner = steprunner
        self.stage = steprunner.stage
        # Put the wrapped net into the mode matching this stage.
        if self.stage == "train":
            self.steprunner.net.train()
        else:
            self.steprunner.net.eval()

    def __call__(self, dataloader):
        total_loss = 0.0
        step = 0
        loop = tqdm(enumerate(dataloader), total=len(dataloader))
        for i, (features, labels) in loop:
            loss, step_metrics = self.steprunner(features, labels)
            step_log = dict({self.stage + "_loss": loss}, **step_metrics)
            total_loss += loss
            step += 1
            if i != len(dataloader) - 1:
                # Intermediate batches: show running batch-level stats.
                loop.set_postfix(**step_log)
            else:
                # Last batch: report epoch-level loss and compute()d metrics,
                # then reset the metric accumulators for the next epoch.
                epoch_loss = total_loss / step
                epoch_metrics = {self.stage + "_" + name: metric_fn.compute().item()
                                 for name, metric_fn in self.steprunner.metrics_dict.items()}
                epoch_log = dict({self.stage + "_loss": epoch_loss}, **epoch_metrics)
                loop.set_postfix(**epoch_log)
                for metric_fn in self.steprunner.metrics_dict.values():
                    metric_fn.reset()
        return epoch_log
class KerasModel(torch.nn.Module):
    """Keras-style trainer: fit / evaluate / predict with checkpointing and
    early stopping, device-managed by a HuggingFace Accelerator."""

    def __init__(self,net,loss_fn,metrics_dict=None,optimizer=None,lr_scheduler = None):
        super().__init__()
        # Accelerator handles device placement (CPU/GPU) transparently.
        self.accelerator = Accelerator()
        self.history = {}
        self.net = net
        self.loss_fn = loss_fn
        self.metrics_dict = nn.ModuleDict(metrics_dict)
        # Fall back to Adam over all parameters when no optimizer is given.
        self.optimizer = optimizer if optimizer is not None else torch.optim.Adam(
            self.parameters(), lr=1e-2)
        self.lr_scheduler = lr_scheduler
        self.net,self.loss_fn,self.metrics_dict,self.optimizer = self.accelerator.prepare(
            self.net,self.loss_fn,self.metrics_dict,self.optimizer)

    def forward(self, x):
        # Delegate to the wrapped net.
        if self.net:
            return self.net.forward(x)
        else:
            raise NotImplementedError

    def fit(self, train_data, val_data=None, epochs=10, ckpt_path='checkpoint.pt',
            patience=5, monitor="val_loss", mode="min"):
        """Train up to `epochs` epochs, checkpointing the best `monitor` score
        to `ckpt_path` and early-stopping after `patience` epochs without
        improvement. Returns the history as a DataFrame."""
        train_data = self.accelerator.prepare(train_data)
        val_data = self.accelerator.prepare(val_data) if val_data else []
        for epoch in range(1, epochs+1):
            printlog("Epoch {0} / {1}".format(epoch, epochs))
            # 1,train -------------------------------------------------
            train_step_runner = StepRunner(net = self.net,stage="train",
                    loss_fn = self.loss_fn,metrics_dict=deepcopy(self.metrics_dict),
                    optimizer = self.optimizer, lr_scheduler = self.lr_scheduler,
                    accelerator = self.accelerator)
            train_epoch_runner = EpochRunner(train_step_runner)
            train_metrics = train_epoch_runner(train_data)
            for name, metric in train_metrics.items():
                self.history[name] = self.history.get(name, []) + [metric]
            # 2,validate -------------------------------------------------
            if val_data:
                val_step_runner = StepRunner(net = self.net,stage="val",
                    loss_fn = self.loss_fn,metrics_dict=deepcopy(self.metrics_dict),
                    accelerator = self.accelerator)
                val_epoch_runner = EpochRunner(val_step_runner)
                with torch.no_grad():
                    val_metrics = val_epoch_runner(val_data)
                val_metrics["epoch"] = epoch
                for name, metric in val_metrics.items():
                    self.history[name] = self.history.get(name, []) + [metric]
            # 3,early-stopping -------------------------------------------------
            arr_scores = self.history[monitor]
            best_score_idx = np.argmax(arr_scores) if mode=="max" else np.argmin(arr_scores)
            # Save a checkpoint whenever the latest epoch is the best so far.
            if best_score_idx==len(arr_scores)-1:
                torch.save(self.net.state_dict(),ckpt_path)
                print("<<<<<< reach best {0} : {1} >>>>>>".format(monitor,
                    arr_scores[best_score_idx]),file=sys.stderr)
            # Stop and restore the best weights after `patience` stale epochs.
            if len(arr_scores)-best_score_idx>patience:
                print("<<<<<< {} without improvement in {} epoch, early stopping >>>>>>".format(
                    monitor,patience),file=sys.stderr)
                self.net.load_state_dict(torch.load(ckpt_path))
                break
        return pd.DataFrame(self.history)

    @torch.no_grad()
    def evaluate(self, val_data):
        """Run one validation pass; returns the epoch-level metrics dict
        (keys are always prefixed "val_", whatever loader is passed)."""
        val_data = self.accelerator.prepare(val_data)
        val_step_runner = StepRunner(net = self.net,stage="val",
                    loss_fn = self.loss_fn,metrics_dict=deepcopy(self.metrics_dict),
                    accelerator = self.accelerator)
        val_epoch_runner = EpochRunner(val_step_runner)
        val_metrics = val_epoch_runner(val_data)
        return val_metrics

    @torch.no_grad()
    def predict(self, dataloader):
        """Return the concatenated raw model outputs (logits) over `dataloader`.
        Assumes each batch is (features, ...) — only t[0] is fed to the net."""
        dataloader = self.accelerator.prepare(dataloader)
        result = torch.cat([self.forward(t[0]) for t in dataloader])
        return result.data
from torchkeras.metrics import AUC

# Binary CTR objective on raw logits, AUC as the monitored metric.
loss_fn = nn.BCEWithLogitsLoss()
metrics_dict = {"auc":AUC()}
optimizer = torch.optim.Adam(net.parameters(), lr=0.005, weight_decay=0.001)
model = KerasModel(net,
       loss_fn = loss_fn,
       metrics_dict= metrics_dict,
       optimizer = optimizer
         )
# Train with early stopping on validation AUC (higher is better).
dfhistory = model.fit(train_data=dl_train,val_data=dl_val,epochs=30, patience=5,
    monitor = "val_auc",mode="max")
可以看到,未经过调参,我们的FM模型在验证集的AUC指标为0.772,not bad!
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
import matplotlib.pyplot as plt
def plot_metric(dfhistory, metric):
    """Plot the train vs. validation curves of `metric` from the fit history."""
    train_series = dfhistory["train_" + metric]
    val_series = dfhistory['val_' + metric]
    xs = range(1, len(train_series) + 1)
    plt.plot(xs, train_series, 'bo--')
    plt.plot(xs, val_series, 'ro-')
    plt.title('Training and validation ' + metric)
    plt.xlabel("Epochs")
    plt.ylabel(metric)
    plt.legend(["train_" + metric, 'val_' + metric])
    plt.show()
plot_metric(dfhistory,"loss")
plot_metric(dfhistory,"auc")
# NOTE: evaluate() labels its outputs "val_*" regardless of which loader is passed.
model.evaluate(dl_train)
{'val_loss': 0.4663983137843708, 'val_auc': 0.7798623442649841}
model.evaluate(dl_val)
{'val_loss': 0.4729234987421881, 'val_auc': 0.7715966105461121}
from sklearn.metrics import roc_auc_score

# Cross-check the torchkeras AUC with sklearn's implementation.
# torch.sigmoid replaces the deprecated torch.nn.functional.sigmoid.
preds = torch.sigmoid(model.predict(dl_val))
labels = torch.cat([x[-1] for x in dl_val])
val_auc = roc_auc_score(labels.numpy(), preds.numpy())
print(val_auc)
0.7715966341541106
模型最佳权重已经保存在 model.fit(ckpt_path) 传入的参数中了。
# Rebuild the net and reload the best checkpoint saved by model.fit().
net_clone = create_net()
net_clone.load_state_dict(torch.load("checkpoint.pt"))

from sklearn.metrics import roc_auc_score

# Verify the reloaded weights reproduce the validation AUC. Inference runs
# under no_grad (avoids building graphs); torch.sigmoid replaces the
# deprecated torch.nn.functional.sigmoid.
with torch.no_grad():
    preds = torch.cat([torch.sigmoid(net_clone(x[0])) for x in dl_val])
labels = torch.cat([x[-1] for x in dl_val])
val_auc = roc_auc_score(labels.numpy(), preds.numpy())
print(val_auc)
0.7715966341541106