利用PySpark 数据预处理(特征化)实战

前言

之前说要自己维护一个spark deep learning的分支,加快SDL的进度,这次终于提供了一些组件和实践,可以很大简化数据的预处理。

模型

这次实际情况是,我手头已经有个现成的模型,基于TF开发,并且算法工程师也提供了一些表给我,有用户信息表,用户行为表。行为表已经关联了内容的文本。现在我需要通过SDL来完成两个工作:

  1. 根据已有的表获取数据,处理成四个向量。
  2. 把数据喂给模型,进行训练

思路整理

四个向量又分成两个部分:

  1. 用户向量部分
  2. 内容向量部分

用户向量部分由2部分组成:

  1. 根据几个用户的基础属性,他们有数值也有字符串,我们需要将他们分别表示成二进制后拼接成一个数组。
  2. 根据用户访问的内容,通过词向量把每篇内容转化为一个向量,再把某个用户看过的所有内容转化为一个向量(都是简单采用加权平均)

内容向量部分组成:

对于文章,我们需要把他表示为一个数字序列(每个词汇由一个数字表示),同时需要放回词向量表,给RNN/CNN使用。

所以处理流程也是比较直观的:

  1. 通过用户信息表,可以得到用户基础属性向量
  2. 通过行为表,可以得到每篇涉及到的内容的数字序列表表示,同时也可以为每个用户算出行为向量。

最后的算法的输入其实是行为表,但是这个时候的行为表已经包含基础信息,内容序列,以及用户的内容行为向量。

实现

现在我们看看利用SDL里提供的组件,如何完成这些数据处理的工作以及衔接模型。

第一个是pyspark的套路,import SDL的一些组件,构建一个spark session:

# -*- coding: UTF-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, ArrayType, StringType, FloatType
from pyspark.sql.functions import *
import numpy as np

from sparkdl.transformers.tf_text import CategoricalBinaryTransformer, CombineBinaryColumnTransformer, \
    TextAnalysisTransformer, TextEmbeddingSequenceTransformer
from sparkdl.estimators.text_estimator import TextEstimator

session = SparkSession.builder.master("local[*]").appName("test").getOrCreate()

读取用户基础信息表,这里我是直接读了一个CSV文件,现实中应该是Hive表。同时罗列有哪些字段是这次要用的,罗列一下:

person_basic_info_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72329.csv", encoding="utf-8",
                                        header=True)
person_basic_info_df.registerTempTable("person_basic_info_df")

# 把所有基础属性罗列出来
person_basic_properties_str = "education,jobtitle..."

person_basic_properties_group = [item for item in
                                 person_basic_properties_str.split(",")]
# 每个属性我们会表示为一个12位的二进制字符串。
person_basic_info_vector_size = len(person_basic_properties_group) * 12

接着我们就可以利用SDL提供的CategoricalBinaryTransformer把这些字段批量转化为二进制。

# 基础信息中字符串字段需要转化为数字
binary_columns = [item + "_binary" for item in person_basic_properties_group]

binary_trans = CategoricalBinaryTransformer(inputCols=person_basic_properties_group,
                                            outputCols=binary_columns,
                                            embeddingSize=12)
combin_trans = CombineBinaryColumnTransformer(inputCols=binary_columns, outputCol="person_info_vector")

person_basic_info_with_all_binary_df = combin_trans.transform(binary_trans.transform(person_basic_info_df)). \
    groupBy("id").agg(first("person_info_vector").alias("person_info_vector"))

CategoricalBinaryTransformer接受inputCols参数, 传递一个数组字段,告诉他哪些字段是需要转化为二进制数值表示的。outputCols指定输出的名字,embeddingSize指定用多少个二进制数字。 所有的CategoricalBinaryTransformer会添加outputCols指定的字段。

因为我们需要把这些字段都拼接成一个字段,这个时候可以利用CombineBinaryColumnTransformer 。方式和CategoricalBinaryTransformer一样,但是输出只有一个字段。这样我们就得到了一个长度为person_basic_info_vector_size 的字段,格式大致这个样子:

[1,0,1,0,0,....]

CategoricalBinaryTransformer 内部的机制是,会将字段所有的值枚举出来,并且给每一个值递增的编号,然后给这个编号设置一个二进制字符串。

现在第一个特征就构造好了。接着,有一些NLP特有的操作了,我们需要对某些内容进行分词 ,同时将他们转化为数字序列(比如RNN就需要这种),并且把数字和词还有向量的对应关系给出。分词现在默认采用的是jieba。

person_behavior_df = session.read.csv("/Users/allwefantasy/Downloads/query-impala-72321.csv", encoding="utf-8",
                                      header=True).sample(True, 0.01).where(col("title").isNotNull()).where(
    col("text_body").isNotNull())

# 通过TextAnalysisTransformer我们对所有需要分词/抽词的字段进行分词
text_columns = ["title", "text_body"]
text_cut_columns = [item + "_cut" for item in text_columns]
tat_trans = TextAnalysisTransformer(inputCols=text_columns, outputCols=text_cut_columns)
tat_df = tat_trans.transform(person_behavior_df)
tat_df.show()

# 通过TextEmbeddingSequenceTransformer把分完词的字段里面的词汇全部替换成数字,这一步分会作为文章的输出
text_sequence_columns = [item + "_seq" for item in text_columns]
test_trans = TextEmbeddingSequenceTransformer(inputCols=text_cut_columns, outputCols=text_sequence_columns)
test_df = test_trans.transform(tat_df)
test_df.show()

# TextEmbeddingSequenceTransformer 有几个属性可以获取词向量相关信息
word_embedding = test_trans.getWordEmbedding()
word2vec_model = test_trans.getW2vModel()
embedding_size = test_trans.getEmbeddingSize()

# 广播出去,方便在自定义函数里使用
word_index2v_mapping_br = session.sparkContext.broadcast(
    dict([(item["word_index"], item["vector"]) for item in word_embedding]))

# 把标题和正文拼接
person_behavior_vector_seq_cctf = CombineBinaryColumnTransformer(inputCols=text_sequence_columns,
                                                                 outputCol="person_behavior_vector_seq")
person_behavior_vector_seq_df = person_behavior_vector_seq_cctf.transform(test_df)

这样就完成了文本到数字序列的转化了,并且通过TextEmbeddingSequenceTransformer获取词向量表数据。接下来,我们看看如何做一个复杂的自定义操作,这个操作主要是在行为表,把数字序列转化词向量,然后做加权平均。这个时候,每篇文章已经可以用一个向量表示了。

# 定义一个函数,接受的是一个数字序列,然后把数字转化为vector,然后做
# 加权平均
def avg_word_embbeding(word_seq):
    result = np.zeros(embedding_size)
    for item in word_seq:
        if item in word_index2v_mapping_br.value:
            result = result + np.array(word_index2v_mapping_br.value[item])
    return (result / len(word_seq)).tolist()

# 注册成udf函数
avg_word_embbeding_udf = udf(avg_word_embbeding, ArrayType(FloatType()))
# 添加一个person_behavior_article_vector新列
person_behavior_vector_df = person_behavior_vector_seq_df.withColumn(
    "person_behavior_article_vector",
    avg_word_embbeding_udf(
        "person_behavior_vector_seq"))

现在根据用户id做groupby 然后把多篇文章的文章向量合并成一个,然后把数字转换为向量,做加权平均。这个时候,每个用户终于有一个行为向量了。

# 我们根据用户名groupby ,把用户看过的所有文章聚合然后计算一个向量

def avg_word_embbeding_2(word_seq):
    result = np.zeros(embedding_size)
    for item in word_seq:
        result = result + np.array(item)
    return (result / len(word_seq)).tolist()


avg_word_embbeding_2_udf = udf(avg_word_embbeding_2, ArrayType(FloatType()))

person_behavior_vector_all_df = person_behavior_vector_df.groupBy("id").agg(
    avg_word_embbeding_2_udf(collect_list("person_behavior_article_vector")).alias("person_behavior_vector"))

现在,我们拿到了用户基础信息向量,访问内容向量。 当然还有之前计算出来的访问内容的数字序列,但是分在不同的表里(dataframe),我们把他们拼接成一个:

pv_df = person_basic_info_with_all_binary_df.select("id", "person_info_vector").alias("pv")
cv_df = person_behavior_vector_all_df.select("id", "person_behavior_vector").alias(
    "cv")
person_vector_df = cv_df.join(
    pv_df,
    col("pv.id") == col("cv.id"), "left"
)

person_df = person_vector_df.select("pv.id", "pv.person_info_vector", "cv.person_behavior_vector").where(
    col("id").isNotNull())

这里是标准的spark dataframe的join操作。

我们假设做的是一个二分类问题,到目前为止,我们还没有分类字段,为了简单起见我随机填充了分类,利用前面的办法,自定义一个UDF函数,添加了一个like_or_not_like 列。最后返回df的时候,过滤掉去胳膊少腿的行。

def like_or_not_like():
    return [0, 1] if np.random.uniform() < 0.5 else [1, 0]

like_or_not_like_udf = udf(like_or_not_like, ArrayType(IntegerType()))
result_df = person_behavior_vector_df.join(person_df, person_behavior_vector_df["id"] == person_df["id"],
                                           "left").withColumn("like_or_not_like", like_or_not_like_udf()).drop(
    person_df["id"]).where(
    col("person_info_vector").isNotNull()).where(
    col("person_behavior_vector").isNotNull()).where(
    col("person_behavior_vector_seq").isNotNull())

word2v_mapping_br = session.sparkContext.broadcast(
    dict([(item["word"], item["vector"]) for item in word_embedding]))

现在我们获得了所有的向量,我们可以把数据喂给算法了,这个主要通过TextEstimator来完成。

estimator = TextEstimator(kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64, "word_embedding_bs": word2v_mapping_br.value}],
                          mapFnParam=map_fun)
estimator.fit(result_df).collect()

word embbeding表,我们通过fitParam参数传递给tf程序,然后tf所有的代码都在map_fun里,我们简单看看tf怎么拿到数据:

def map_fun(args={}, ctx=None, _read_data=None):
    import tensorflow as tf
    import numpy as np
    import datetime
    import os
    import time
    from sklearn.utils import Bunch
    FLAGS = Bunch(**args["params"]["fitParam"])
    embedded_vec = FLAGS.word_embedding_bs

    def config_default_value(name, value, desc):
        FLAGS.setdefault(name, value)

    # 产生数据
    def training_batch_generator(batch_size):
        for items in _read_data(max_records=batch_size):
            x_basic_info = [item["person_info_vector"] for item in items]
            x_subs = [item["person_subs"] for item in items]
            x_personas = [item["person_behavior_vector"] for item in items]
            x_contents = [item["person_behavior_vector_seq"] for item in items]
            y = [item["like_or_not_like"] for item in items]
            yield np.array(x_basic_info), np.array(x_subs), np.array(x_personas), np.array(x_contents), np.array(y)

现在通过training_batch_generator你已经可以拿到训练数据了。

如何执行

虽然已经简化了处理,但是代码还是不少,为了方便调试,建议使用pyspark shell。运行指令如下:

export PYTHONIOENCODING=utf8;./bin/pyspark --py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar --jars spark-deep-learning-assembly-0.1.0-spark2.1.jar --master "local[*]"

然后把代码黏贴进去就可以了。

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

发表于

我来说两句

0 条评论
登录 后参与评论

相关文章

来自专栏数值分析与有限元编程

Python也能干大事

用Python做数值计算,和MATLAB一样简洁方便,关键是Python还是免费的,不用担心版权的问题。下面举几个例子。 1.计算方阵行列式 ? 在Anacon...

4159
来自专栏AI研习社

如何利用微信监管你的TF训练?

之前回答问题【在机器学习模型的训练期间,大概几十分钟到几小时不等,大家都会在等实验的时候做什么?(http://t.cn/Rl8119m)】的时候,说到可以用微...

3684
来自专栏生信宝典

R包ggseqlogo 绘制seq logo图

在生物信息分析中,经常会做序列分析图(sequence logo),这里的序列指的是核苷酸(DNA/RNA链中)或氨基酸(在蛋白质序列中)。sequence l...

2483
来自专栏Python小屋

Python切分图像小案例(1、3、2、4象限子图互换)

首先解释上一篇文章详解Python科学计算扩展库numpy中的矩阵运算(1)最后的习题,该问题答案是10 ** 8 = 100000000,原因在于Python...

4177
来自专栏瓜大三哥

直方图操作(二)

直方图操作(二)之统计电路 在实际的图像中,连续的像素点灰度值为相同值的情况非常常见,如果每来一个像素都对双口RAM进行一次寻址和写操作,显然降低了统计效率而提...

2087
来自专栏MyBlog

软件测试方法课程笔记(2)

为了产生少量的测试用例, 并且可以测试大部分的情况, 我们可以使用等价类划分的方法 比如对于输入值是范围值, 我们可以使用等价类划分成范围内的和不是范围内的两...

1402
来自专栏人工智能LeadAI

用TensorFlow的Linear/DNNRegrressor预测数据

今天要处理的问题对于一个只学了线性回归的机器学习初学者来说还是比较棘手——通过已知的几组数据预测一组数据。用excel看了下,关系不是很明显,平方,log都不是...

1031
来自专栏深度学习之tensorflow实战篇

R语言之系统聚类(层次)分析之图谱形式完整版

读取数据常见错误: 在读取数据过程中可能遇到以下问题,参照上一篇博客: 可能遇到报错: 1、Error in if (is.na(n) || n > 65536...

5945
来自专栏人工智能LeadAI

tensorflow的数据输入

tensorflow有两种数据输入方法,比较简单的一种是使用feed_dict,这种方法在画graph的时候使用placeholder来站位,在真正run的时候...

1335
来自专栏数据结构与算法

BZOJ1030: [JSOI2007]文本生成器(AC自动机)

  JSOI交给队员ZYX一个任务,编制一个称之为“文本生成器”的电脑软件:该软件的使用者是一些低幼人群, 他们现在使用的是GW文本生成器v6版。该软件可以随机...

862

扫码关注云+社区

领取腾讯云代金券