How to Develop NLP Programs with SDL + TensorFlow/SK-Learn

Preparation

Step 1: Download the project:

// download the project
git clone https://github.com/allwefantasy/spark-deep-learning.git .
// switch to the release branch
git checkout release

Step 2: Set up the pyspark environment:

Make sure Python 2.7 is installed; using Virtualenv to manage your Python environments is strongly recommended. Then install pyspark via pip:

pip install pyspark

The package is fairly large (about 180 MB), so be patient. You can also use the Aliyun mirror:

pip install pyspark -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com

Download Spark 2.2.0, unpack it to a directory of your choice, and set SPARK_HOME to that directory.

Strictly speaking, if you submit your program with spark-submit you do not need pyspark installed separately; the main reason to install it via pip is so that your IDE can provide code completion.

Next, install the project's dependencies:

pip install -r requirements.txt

Finally, build the project:

build/sbt assembly

This produces a jar:

target/scala-2.11/spark-deep-learning-assembly-0.1.0-spark2.1.jar

You will also need a Kafka instance. This may sound like a bit of a hassle, but it only has to be set up once.

Packaging the Python source for code completion

To get code completion in your IDE, we also need to package the project's Python code. From the project root, run:

cd ./python && python setup.py bdist_wheel && cd dist && pip uninstall sparkdl  && pip install ./sparkdl-0.2.2-py2-none-any.whl && cd ..

Here the packaging and installation steps are chained into a single command.

Your IDE can now offer code completion for the project.

Developing an SK-Learn-based application

First, suppose we have data like this:

# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from sklearn import svm
from sklearn.model_selection import GridSearchCV

from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer

session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

documentDF = session.createDataFrame([
    ("Hi I heard about Spark", "spark"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "mlib")
], ["text", "preds"])

Next we want to turn preds into numeric labels (the classes) and text into vectors so they can be fed to the algorithm. We can do it like this:

features = TFTextTransformer(
    inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)

indexer = StringIndexer(inputCol="preds", outputCol="labels")

pipeline = Pipeline(stages=[features, indexer])
ds = pipeline.fit(documentDF).transform(documentDF)

By default TFTextTransformer produces a 2-D array of shape (64, 100); that shape is really meant for deep learning models. Here I set shape=(-1,), which flattens the 2-D array into a single vector of length 64*100 (= 6400).
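If you want to double-check this, a quick inspection of the transformed DataFrame (a sanity-check sketch, assuming the pipeline above has already produced ds) should show a flat vector of 64*100 = 6400 values per row:

# sanity check: with shape=(-1,) each row's "features" field should be a flat vector
row = ds.select("features").first()
print(len(row["features"]))  # expected: 6400 (sequenceLength * embeddingSize)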

Now we write a function that implements the actual sk-learn logic:

def sk_map_fun(args={}, ctx=None, _read_data=None):
    # the fitParam dict assigned to this process, e.g. {"epochs": 5, "batch_size": 64}
    params = args['params']['fitParam']
    # pull everything in one go; the whole dataset here fits in a single batch
    data = [item for item in _read_data()]
    parameters = {'kernel': ('linear', 'rbf')}
    svr = svm.SVC()
    clf = GridSearchCV(svr, parameters)
    X = [x["features"] for x in data[0]]
    y = [int(x["labels"]) for x in data[0]]
    model = clf.fit(X, y)
    print(model.best_estimator_)
    return ""

The signature must be def sk_map_fun(args={}, ctx=None, _read_data=None); the function name itself can be anything you like. _read_data is the object through which you fetch the data; typical usage looks like this:

        for data in _read_data(max_records=params["batch_size"]):
            batch_data = feed_dict(data)
            sess.run(train_step, feed_dict={input_x: batch_data})

Because SVM needs the full dataset, I simply pull all of the data in one go; since there are fewer rows than the default batch size of 64, I don't specify max_records:

data = [item for item in _read_data()]
X = [x["features"] for x in data[0]]
y = [int(x["labels"]) for x in data[0]]

Now we wire sk_map_fun into the Estimator:

estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()

Here we hand the sklearn function to TextEstimator through the mapFnParam parameter, and we also configure the Kafka-related settings. The only thing that needs attention is fitParam: it has length 2, which means two processes will be started to run sk_map_fun, each receiving its own parameter dict. That dict is what the first line of sk_map_fun reads:

params = args['params']['fitParam']

At that point params is either {"epochs": 5, "batch_size": 64} or {"epochs": 5, "batch_size": 1}. From params you can read epochs, batch_size and so on and feed them to the corresponding SK-Learn model.
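As a minimal sketch (the function name param_aware_map_fun and the default values are hypothetical; only the fitParam keys come from the example above), a map function could read those values like this:

def param_aware_map_fun(args={}, ctx=None, _read_data=None):
    # one dict from fitParam, e.g. {"epochs": 5, "batch_size": 64}
    params = args['params']['fitParam']
    epochs = params.get("epochs", 1)           # illustrative default
    batch_size = params.get("batch_size", 64)  # illustrative default
    for _ in range(epochs):
        for batch in _read_data(max_records=batch_size):
            pass  # feed each mini-batch to your scikit-learn model here
    return ""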

If you only want to run in local mode, you can tweak the kafkaParam argument:

import tempfile

mock_kafka_file = tempfile.mkdtemp()
kafkaParam = {"bootstrap_servers": ["127.0.0.1"], "topic": "test",
              "mock_kafka_file": mock_kafka_file,
              "group_id": "sdl_1", "test_mode": True}

Point mock_kafka_file at a temporary directory and set test_mode to True; that way you no longer depend on a running Kafka at all.
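Putting the two pieces together, a Kafka-free local run of the earlier estimator might look roughly like this (a sketch that assumes the same ds, labels column, and sk_map_fun defined above):

import tempfile

mock_kafka_file = tempfile.mkdtemp()
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "mock_kafka_file": mock_kafka_file,
                                      "group_id": "sdl_1", "test_mode": True},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()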

Here is the complete program:

# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from sklearn import svm
from sklearn.model_selection import GridSearchCV

from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer

session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

documentDF = session.createDataFrame([
    ("Hi I heard about Spark", "spark"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "mlib")
], ["text", "preds"])

# transform the text column into a flattened feature vector (shape=(-1,))
features = TFTextTransformer(
    inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)

indexer = StringIndexer(inputCol="preds", outputCol="labels")

pipeline = Pipeline(stages=[features, indexer])
ds = pipeline.fit(documentDF).transform(documentDF)


def sk_map_fun(args={}, ctx=None, _read_data=None):
    data = [item for item in _read_data()]
    parameters = {'kernel': ('linear', 'rbf')}
    svr = svm.SVC()
    clf = GridSearchCV(svr, parameters)
    X = [x["features"] for x in data[0]]
    y = [int(x["labels"]) for x in data[0]]
    model = clf.fit(X, y)
    print(model.best_estimator_)
    return ""


# create an estimator for training; mapFnParam carries the scikit-learn code (sk_map_fun)
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()

Then run it with:

./bin/spark-submit \
--py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--jars spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--master "local[*]"  Sk2.py

Remember to adjust the code to your environment first.

Developing a TensorFlow-based application

All you need to change is the map_fun function, for example:

def map_fun(args={}, ctx=None, _read_data=None):
    import tensorflow as tf
    EMBEDDING_SIZE = args["embedding_size"]
    params = args['params']['fitParam']
    SEQUENCE_LENGTH = 64

    def feed_dict(batch):
        # Convert from dict of named arrays to two numpy arrays of the proper type
        features = []
        for i in batch:
            features.append(i['sentence_matrix'])

        # print("{} {}".format(feature, features))
        return features

    encoder_variables_dict = {
        "encoder_w1": tf.Variable(
            tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"),
        "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"),
        "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"),
        "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2")
    }

    def encoder(x, name="encoder"):
        with tf.name_scope(name):
            encoder_w1 = encoder_variables_dict["encoder_w1"]
            encoder_b1 = encoder_variables_dict["encoder_b1"]

            layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1)

            encoder_w2 = encoder_variables_dict["encoder_w2"]
            encoder_b2 = encoder_variables_dict["encoder_b2"]

            layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2)
            return layer_2

    def decoder(x, name="decoder"):
        with tf.name_scope(name):
            decoder_w1 = tf.Variable(tf.random_normal([128, 256]))
            decoder_b1 = tf.Variable(tf.random_normal([256]))

            layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1)

            decoder_w2 = tf.Variable(
                tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE]))
            decoder_b2 = tf.Variable(
                tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE]))

            layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2)
            return layer_2

    tf.reset_default_graph()
    sess = tf.Session()

    input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x")
    flattened = tf.reshape(input_x,
                           [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE])

    encoder_op = encoder(flattened)

    tf.add_to_collection('encoder_op', encoder_op)

    y_pred = decoder(encoder_op)

    y_true = flattened

    with tf.name_scope("xent"):
        cosine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1),
                        tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)),
                                    tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1))))
        xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), cosine))
        tf.summary.scalar("xent", xent)

    with tf.name_scope("train"):
        # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent)
        train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent)

    summ = tf.summary.merge_all()

    sess.run(tf.global_variables_initializer())

    for i in range(params["epochs"]):
        print("epoll {}".format(i))
        for data in _read_data(max_records=params["batch_size"]):
            batch_data = feed_dict(data)
            sess.run(train_step, feed_dict={input_x: batch_data})

    sess.close()

This is the same example I used before: an auto-encoder. We then hook it up through TextEstimator:

estimator = TextEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "mock_kafka_file": mock_kafka_file,
                                      "group_id": "sdl_1", "test_mode": True},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=map_fun)
estimator.fit(df).collect()

It's much the same as before.

For TensorFlow there is also a cluster mode; see 为Spark Deep Learning 集成TFoS (integrating TFoS with Spark Deep Learning).
