首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用Elasticsearch、Spark构建推荐系统 #2:深入分析

使用Elasticsearch、Spark构建推荐系统 #2:深入分析

原创
作者头像
flavorfan
发布2022-04-08 15:24:41
3.5K0
发布2022-04-08 15:24:41
举报
文章被收录于专栏:范传康的专栏范传康的专栏

Elasticsearch-spark-based recommender系统方案的两个关键步骤:

  1. ALS算法将user-item的交互历史建模构建相关共享隐变量空间(user matrix 和item matirx);
  2. 基于Elasticsearch将推荐问题转换为搜索问题。

1. 训练ALS模型

1) 数据预处理

ratings_from_es = spark.read.format("es").load("ratings")
ratings_from_es.show(5)

数据从es中读取,实际可以从其他源处理(clickhouse,csv等),另外可以分割为train、valid、test数据集

2)训练ALS模型

from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", regParam=0.02, rank=VECTOR_DIM, seed=54)
model = als.fit(ratings_from_es)
model.userFactors.show(5)
model.itemFactors.show(5)

3)将ALS模型的user和itemfactor vector存储到Elasticsearch

from pyspark.sql.functions import lit, current_timestamp, unix_timestamp
ver = model.uid
ts = unix_timestamp(current_timestamp())
movie_vectors = model.itemFactors.select("id",\
                                         col("features").alias("model_factor"),\
                                         lit(ver).alias("model_version"),\
                                         ts.alias("model_timestamp"))
movie_vectors.show(5)
user_vectors = model.userFactors.select("id",\
                                        col("features").alias("model_factor"),\
                                        lit(ver).alias("model_version"),\
                                        ts.alias("model_timestamp"))
user_vectors.show(5)

movie_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "update") \
    .save("movies", mode="append")
user_vectors.write.format("es") \
    .option("es.mapping.id", "id") \
    .option("es.write.operation", "index") \
    .save("users", mode="append")   

2. 使用Elasticsearch进行推荐:Script score query

def vector_query(query_vec, vector_field, q="*", cosine=False):   
    if cosine:
        score_fn = "doc['{v}'].size() == 0 ? 0 : cosineSimilarity(params.vector, '{v}') + 1.0"
    else:
        score_fn = "doc['{v}'].size() == 0 ? 0 : sigmoid(1, Math.E, -dotProduct(params.vector, '{v}'))"
       
    score_fn = score_fn.format(v=vector_field, fn=score_fn)    
    return {
    "query": {
        "script_score": {
            "query" : { 
                "query_string": {
                    "query": q
                }
            },
            "script": {
                "source": score_fn,
                "params": {
                    "vector": query_vec
                }
            }
        }
    }
}

def get_similar(the_id, q="*", num=10, index="movies", vector_field='model_factor'):
    response = es.get(index=index, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=True)
        results = es.search(index=index, body=q)
        hits = results['hits']['hits']
        return src, hits[1:num+1]    
    
def get_user_recs(the_id, q="*", num=10, users="users", movies="movies", vector_field='model_factor'):
    response = es.get(index=users, id=the_id)
    src = response['_source']
    if vector_field in src:
        query_vec = src[vector_field]
        q = vector_query(query_vec, vector_field, q=q, cosine=False)
        results = es.search(index=movies, body=q)
        hits = results['hits']['hits']
        return src, hits[:num]

def get_movies_for_user(the_id, num=10, ratings="ratings", movies="movies"):
    response = es.search(index=ratings, q="userId:{}".format(the_id), size=num, sort=[{"rating":"desc"}])
    hits = response['hits']['hits']
    ids = [h['_source']['movieId'] for h in hits]
    movies = es.mget(body={"ids": ids}, index=movies, _source_includes=['tmdbId', 'title'])
    movies_hits = movies['docs']
    tmdbids = [h['_source'] for h in movies_hits]
    return tmdbids

通过Elasticsearch的script score query for vector functions从factor vector中生成推荐,具体通过vector_query进行封装,用cosine距离计算同种(user或者item)相似度,用prudoct点乘对user计算推荐物品。

3. 深入分析

1) 为什么不使用spark ml直接推荐?

其一,工程和学术做trade-off的结果,在model serving过程中对几百万个候选集逐一跑一遍模型的时间开销显然太大了,因此在通过Elasticsearch最近邻搜索的方法高效很多,复杂度nlogn vs logn。

其二,可以添加丰富灵活的query,直接对候选集进行多维度的过滤操作。比如:杭州地区(地点)20年代(年龄)用户喜欢的火锅店(品类)。

2) implicit vs explicit

显式反馈的目标函数

隐式反馈的目标函数

隐式反馈的数据场景远远多于显式反馈,spark.ml.recommender.ALS对两种都支持

class pyspark.ml.recommendation.ALS(
  rank=10, 
  maxIter=10, 
  regParam=0.1, 
  numUserBlocks=10, 
  numItemBlocks=10, 
  implicitPrefs=False, 
  alpha=1.0, 
  userCol='user', 
  itemCol='item', 
  seed=None, 
  ratingCol='rating', 
  nonnegative=False, 
  checkpointInterval=10, 
  intermediateStorageLevel='MEMORY_AND_DISK', 
  finalStorageLevel='MEMORY_AND_DISK', 
  coldStartStrategy='nan')

关键参数的选择

3) 隐式反馈的评估 MPR, MRR

隐式反馈的评估基于召回的MPR(mean percent ranking)平均百分比排名。

另外一个评估指标是MRR(Mean Reciprocal Rank):

具体相关的计算pyspark代码

(
    predictions
    .withColumn('rank', row_number().over(Window.partitionBy('userId').orderBy(desc('prediction'))))
    .where(col('counts') > 0) # Notice: this excludes users with no actions at all
    .groupby('userId')
    .agg(
        count('*').alias('n'),
        sum(1 - col('prediction')).alias('sum_pred'),
        sum(col('rank') / n_genres).alias('sum_perc_rank'),
        min('rank').alias('min_rank')
    )
    .agg(
        (sum('sum_pred') / sum('n')).alias('avg 1-score'),
        (sum('sum_perc_rank') / sum('n')).alias('MPR'), # the lower the better
        mean(1 / col('min_rank')).alias('MRR')          # the higher the better
    )
    .withColumn('MPR*k', col('MPR') * n_genres)
    .withColumn('1/MRR', 1/col('MRR'))
).show()

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 1. 训练ALS模型
    • 1) 数据预处理
      • 2)训练ALS模型
        • 3)将ALS模型的user和itemfactor vector存储到Elasticsearch
        • 2. 使用Elasticsearch进行推荐:Script score query
        • 3. 深入分析
          • 1) 为什么不使用spark ml直接推荐?
            • 2) implicit vs explicit
              • 3) 隐式反馈的评估 MPR, MRR
              相关产品与服务
              Elasticsearch Service
              腾讯云 Elasticsearch Service(ES)是云端全托管海量数据检索分析服务,拥有高性能自研内核,集成X-Pack。ES 支持通过自治索引、存算分离、集群巡检等特性轻松管理集群,也支持免运维、自动弹性、按需使用的 Serverless 模式。使用 ES 您可以高效构建信息检索、日志分析、运维监控等服务,它独特的向量检索还可助您构建基于语义、图像的AI深度应用。
              领券
              问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档