背景:需要在pyspark上例行化word2vec,但是加载预训练的词向量是一个大问题,因此需要先上传到HDFS,然后通过代码再获取。调研后发现pyspark虽然有自己的word2vec方法,但是好像无法加载预训练txt词向量。
因此大致的步骤应分为两步:
1.从hdfs获取词向量文件
2.对pyspark dataframe内的数据做分词+向量化的处理
开源的词向量文件很多,基本上都是key-value形式的txt文档,以腾讯AI Lab的词向量为例。
(https://ai.tencent.com/ailab/nlp/en/embedding.html)
首先需要将词向量txt文件上传到hdfs里,接着在代码里通过使用sparkfile来实现把文件下发到每一个worker:
from pyspark.sql import SparkSession
from pyspark import SparkFiles
# 将hdfs的词向量下发到每一个worker
sparkContext = spark.sparkContext
sparkContext.addPyFile("hdfs://******/tencent-ailab-embedding-zh-d100-v0.2.0-s.txt")
# 使用文件的方法:就和本地使用文件时"/***/***"一样
SparkFiles.get("tencent-ailab-embedding-zh-d100-v0.2.0-s.txt")
这一步的耗时主要在词向量下发到每一个worker这一步上。如果词向量文件较大可能耗时较高。
预训练词向量下发到每一个worker后,下一步就是对数据进行分词和获取词向量,采用udf函数来实现以上操作:
import pyspark.sql.functions as f
# 定义分词以及向量化的udf
@f.udf(StringType())
def generate_embedding(title, subtitle=None):
cut_title = jieba.lcut(title.lower())
if subtitle is None:
cut_sentence = cut_title
else:
cut_subtitle = jieba.lcut(title.lower())
cut_sentence = cut_title + cut_subtitle
res_embed = []
for word in cut_sentence:
# 未登录单词这里选择不处理, 也可以用unk替代
try:
res_embed.append(model.get_vector(word))
except:
pass
# 对词向量做avg_pooling
if len(res_embed)==0:
avg_vectors = np.zeros(100)
else:
res_embed_arr = np.array(res_embed)
avg_vectors = res_embed_arr.mean(axis=(0))
avg_vectors = np.round(avg_vectors,decimals=6)
# 转换成所需要的格式
tmp = []
for j in avg_vectors:
tmp.append(str(j))
output = ','.join(tmp)
return output
这里如果需要使用用户自定义jieba词典的时候就会有一个问题,我怎么在pyspark上实现jieba.load_userdict()
因此需要一种方式,在每一个worker上只加载一次。
首先在main方法里将用户自定义词典下发到每一个worker:
# 将hdfs的词典下发到每一个worker
sparkContext.addPyFile("hdfs://xxxxxxx/word_dict.txt")
接着在udf内首行添加jieba.dt.initialized
判断是否需要加载词典:
if not jieba.dt.initialized:
jieba.load_userdict(SparkFiles.get("word_dict.txt"))
至此完美解决这个问题~
参考:
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。