前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >利用PySpark 数据预处理(特征化)实战

利用PySpark 数据预处理(特征化)实战

作者头像
用户2936994
发布2018-08-27 14:38:40
1.6K0
发布2018-08-27 14:38:40
举报
文章被收录于专栏:祝威廉祝威廉祝威廉

前言

之前说要自己维护一个spark deep learning的分支,加快SDL的进度,这次终于提供了一些组件和实践,可以很大简化数据的预处理。

模型

这次实际情况是,我手头已经有个现成的模型,基于TF开发,并且算法工程师也提供了一些表给我,有用户信息表,用户行为表。行为表已经关联了内容的文本。现在我需要通过SDL来完成两个工作:

  1. 根据已有的表获取数据,处理成四个向量。
  2. 把数据喂给模型,进行训练

思路整理

四个向量又分成两个部分:

  1. 用户向量部分
  2. 内容向量部分

用户向量部分由2部分组成:

  1. 根据几个用户的基础属性,他们有数值也有字符串,我们需要将他们分别表示成二进制后拼接成一个数组。
  2. 根据用户访问的内容,通过词向量把每篇内容转化为一个向量,再把某个用户看过的所有内容转化为一个向量(都是简单采用加权平均)

内容向量部分组成:

对于文章,我们需要把他表示为一个数字序列(每个词汇由一个数字表示),同时需要放回词向量表,给RNN/CNN使用。

所以处理流程也是比较直观的:

  1. 通过用户信息表,可以得到用户基础属性向量
  2. 通过行为表,可以得到每篇涉及到的内容的数字序列表表示,同时也可以为每个用户算出行为向量。

最后的算法的输入其实是行为表,但是这个时候的行为表已经包含基础信息,内容序列,以及用户的内容行为向量。

实现

现在我们看看利用SDL里提供的组件,如何完成这些数据处理的工作以及衔接模型。

第一个是pyspark的套路,import SDL的一些组件,构建一个spark session:

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, ArrayType, StringType, FloatType
from pyspark.sql.functions import *
import numpy as np

from sparkdl.transformers.tf_text import CategoricalBinaryTransformer, CombineBinaryColumnTransformer, \
    TextAnalysisTransformer, TextEmbeddingSequenceTransformer
from sparkdl.estimators.text_estimator import TextEstimator

session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

读取用户基础信息表,这里我是直接读了一个CSV文件,现实中应该是Hive表。同时罗列有哪些字段是这次要用的,罗列一下:

person_basic_info_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72329.csv", encoding="utf-8",
                                        header=True)
person_basic_info_df.registerTempTable("person_basic_info_df")

# 把所有基础属性罗列出来
person_basic_properties_str = "education,jobtitle..."

person_basic_properties_group = [item for item in
                                 person_basic_properties_str.split(",")]
# 每个属性我们会表示为一个12位的二进制字符串。
person_basic_info_vector_size = len(person_basic_properties_group) * 12

接着我们就可以利用SDL提供的CategoricalBinaryTransformer把这些字段批量转化为二进制。

# 基础信息中字符串字段需要转化为数字
binary_columns = [item + "_binary" for item in person_basic_properties_group]

binary_trans = CategoricalBinaryTransformer(inputCols=person_basic_properties_group,
                                            outputCols=binary_columns,
                                            embeddingSize=12)
combin_trans = CombineBinaryColumnTransformer(inputCols=binary_columns, outputCol="person_info_vector")

person_basic_info_with_all_binary_df = combin_trans.transform(binary_trans.transform(person_basic_info_df)). \
    groupBy("id").agg(first("person_info_vector").alias("person_info_vector"))

CategoricalBinaryTransformer接受inputCols参数, 传递一个数组字段,告诉他哪些字段是需要转化为二进制数值表示的。outputCols指定输出的名字,embeddingSize指定用多少个二进制数字。 所有的CategoricalBinaryTransformer会添加outputCols指定的字段。

因为我们需要把这些字段都拼接成一个字段,这个时候可以利用CombineBinaryColumnTransformer 。方式和CategoricalBinaryTransformer一样,但是输出只有一个字段。这样我们就得到了一个长度为person_basic_info_vector_size 的字段,格式大致这个样子:

[1,0,1,0,0,....]

CategoricalBinaryTransformer 内部的机制是,会将字段所有的值枚举出来,并且给每一个值递增的编号,然后给这个编号设置一个二进制字符串。

现在第一个特征就构造好了。接着,有一些NLP特有的操作了,我们需要对某些内容进行分词 ,同时将他们转化为数字序列(比如RNN就需要这种),并且把数字和词还有向量的对应关系给出。分词现在默认采用的是jieba。

person_behavior_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72321.csv", encoding="utf-8",
                                      header=True).sample(True, 0.01).where(col("title").isNotNull()).where(
    col("text_body").isNotNull())

# 通过TextAnalysisTransformer我们对所有需要分词/抽词的字段进行分词
text_columns = ["title", "text_body"]
text_cut_columns = [item + "_cut" for item in text_columns]
tat_trans = TextAnalysisTransformer(inputCols=text_columns, outputCols=text_cut_columns)
tat_df = tat_trans.transform(person_behavior_df)
tat_df.show()

# 通过TextEmbeddingSequenceTransformer把分完词的字段里面的词汇全部替换成数字,这一步分会作为文章的输出
text_sequence_columns = [item + "_seq" for item in text_columns]
test_trans = TextEmbeddingSequenceTransformer(inputCols=text_cut_columns, outputCols=text_sequence_columns)
test_df = test_trans.transform(tat_df)
test_df.show()

# TextEmbeddingSequenceTransformer 有几个属性可以获取词向量相关信息
word_embedding = test_trans.getWordEmbedding()
word2vec_model = test_trans.getW2vModel()
embedding_size = test_trans.getEmbeddingSize()

# 广播出去,方便在自定义函数里使用
word_index2v_mapping_br = session.sparkContext.broadcast(
    dict([(item["word_index"], item["vector"]) for item in word_embedding]))

# 把标题和正文拼接
person_behavior_vector_seq_cctf = CombineBinaryColumnTransformer(inputCols=text_sequence_columns,
                                                                 outputCol="person_behavior_vector_seq")
person_behavior_vector_seq_df = person_behavior_vector_seq_cctf.transform(test_df)

这样就完成了文本到数字序列的转化了,并且通过TextEmbeddingSequenceTransformer获取词向量表数据。接下来,我们看看如何做一个复杂的自定义操作,这个操作主要是在行为表,把数字序列转化词向量,然后做加权平均。这个时候,每篇文章已经可以用一个向量表示了。

# 定义一个函数,接受的是一个数字序列,然后把数字转化为vector,然后做
# 加权平均
def avg_word_embbeding(word_seq):
    result = np.zeros(embedding_size)
    for item in word_seq:
        if item in word_index2v_mapping_br.value:
            result = result + np.array(word_index2v_mapping_br.value[item])
    return (result / len(word_seq)).tolist()

# 注册成udf函数
avg_word_embbeding_udf = udf(avg_word_embbeding, ArrayType(FloatType()))
# 添加一个person_behavior_article_vector新列
person_behavior_vector_df = person_behavior_vector_seq_df.withColumn(
    "person_behavior_article_vector",
    avg_word_embbeding_udf(
        "person_behavior_vector_seq"))

现在根据用户id做groupby 然后把多篇文章的文章向量合并成一个,然后把数字转换为向量,做加权平均。这个时候,每个用户终于有一个行为向量了。

# 我们根据用户名groupby ,把用户看过的所有文章聚合然后计算一个向量

def avg_word_embbeding_2(word_seq):
    result = np.zeros(embedding_size)
    for item in word_seq:
        result = result + np.array(item)
    return (result / len(word_seq)).tolist()


avg_word_embbeding_2_udf = udf(avg_word_embbeding_2, ArrayType(FloatType()))

person_behavior_vector_all_df = person_behavior_vector_df.groupBy("id").agg(
    avg_word_embbeding_2_udf(collect_list("person_behavior_article_vector")).alias("person_behavior_vector"))

现在,我们拿到了用户基础信息向量,访问内容向量。 当然还有之前计算出来的访问内容的数字序列,但是分在不同的表里(dataframe),我们把他们拼接成一个:

pv_df = person_basic_info_with_all_binary_df.select("id", "person_info_vector").alias("pv")
cv_df = person_behavior_vector_all_df.select("id", "person_behavior_vector").alias(
    "cv")
person_vector_df = cv_df.join(
    pv_df,
    col("pv.id") == col("cv.id"), "left"
)

person_df = person_vector_df.select("pv.id", "pv.person_info_vector", "cv.person_behavior_vector").where(
    col("id").isNotNull())

这里是标准的spark dataframe的join操作。

我们假设做的是一个二分类问题,到目前为止,我们还没有分类字段,为了简单起见我随机填充了分类,利用前面的办法,自定义一个UDF函数,添加了一个like_or_not_like 列。最后返回df的时候,过滤掉去胳膊少腿的行。

def like_or_not_like():
    return [0, 1] if np.random.uniform() < 0.5 else [1, 0]

like_or_not_like_udf = udf(like_or_not_like, ArrayType(IntegerType()))
result_df = person_behavior_vector_df.join(person_df, person_behavior_vector_df["id"] == person_df["id"],
                                           "left").withColumn("like_or_not_like", like_or_not_like_udf()).drop(
    person_df["id"]).where(
    col("person_info_vector").isNotNull()).where(
    col("person_behavior_vector").isNotNull()).where(
    col("person_behavior_vector_seq").isNotNull())

word2v_mapping_br = session.sparkContext.broadcast(
    dict([(item["word"], item["vector"]) for item in word_embedding]))

现在我们获得了所有的向量,我们可以把数据喂给算法了,这个主要通过TextEstimator来完成。

estimator = TextEstimator(kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64, "word_embedding_bs": word2v_mapping_br.value}],
                          mapFnParam=map_fun)
estimator.fit(result_df).collect()

word embbeding表,我们通过fitParam参数传递给tf程序,然后tf所有的代码都在map_fun里,我们简单看看tf怎么拿到数据:

def map_fun(args={}, ctx=None, _read_data=None):
    import tensorflow as tf
    import numpy as np
    import datetime
    import os
    import time
    from sklearn.utils import Bunch
    FLAGS = Bunch(**args["params"]["fitParam"])
    embedded_vec = FLAGS.word_embedding_bs

    def config_default_value(name, value, desc):
        FLAGS.setdefault(name, value)

    # 产生数据
    def training_batch_generator(batch_size):
        for items in _read_data(max_records=batch_size):
            x_basic_info = [item["person_info_vector"] for item in items]
            x_subs = [item["person_subs"] for item in items]
            x_personas = [item["person_behavior_vector"] for item in items]
            x_contents = [item["person_behavior_vector_seq"] for item in items]
            y = [item["like_or_not_like"] for item in items]
            yield np.array(x_basic_info), np.array(x_subs), np.array(x_personas), np.array(x_contents), np.array(y)

现在通过training_batch_generator你已经可以拿到训练数据了。

如何执行

虽然已经简化了处理,但是代码还是不少,为了方便调试,建议使用pyspark shell。运行指令如下:

export PYTHONIOENCODING=utf8;./bin/pyspark --py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar --jars spark-deep-learning-assembly-0.1.0-spark2.1.jar --master "local[*]"

然后把代码黏贴进去就可以了。

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.10.29 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • 模型
  • 思路整理
  • 实现
  • 如何执行
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档