前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何基于SDL+TensorFlow/SK-Learn开发NLP程序

如何基于SDL+TensorFlow/SK-Learn开发NLP程序

作者头像
用户2936994
发布2018-08-27 14:38:10
4080
发布2018-08-27 14:38:10
举报
文章被收录于专栏:祝威廉祝威廉

准备

Step1: 首先下载项目:

代码语言:javascript
复制
//下载项目
git clone https://github.com/allwefantasy/spark-deep-learning.git .
//切换到release 分支
git checkout release

Step2: 构建pyspark环境:

确保安装了python 2.7 ,强烈建议你使用Virtualenv方便python环境的管理。之后通过pip 安装pyspark

代码语言:javascript
复制
pip install pyspark

文件比较大,大约180多M,有点耐心。你也可以使用阿里源:

代码语言:javascript
复制
pip install pyspark -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com

下载 spark 2.2.0,然后解压到特定目录,设置SPARK_HOME即可。

其实如果通过spark-submit 提交程序,并不会需要额外安装pyspark, 这里通过pip安装的主要目的是为了让你的IDE能有代码提示。

接着安装项目需要的依赖:

代码语言:javascript
复制
pip install -r requirements.txt

最后进行项目build:

代码语言:javascript
复制
build/sbt assembly

这个时候你就得到了一个jar包:

代码语言:javascript
复制
target/scala-2.11/spark-deep-learning-assembly-0.1.0-spark2.1.jar

另外,另外你还需要一个Kafka。 似乎感觉有点麻烦,然而只要配置一次。

方便代码提示,package python 源码

为了方便在IDE得到代码提示,我们还需要把python相关的代码打包。

在主目录运行:

代码语言:javascript
复制
cd ./python && python setup.py bdist_wheel && cd dist && pip uninstall sparkdl  && pip install ./sparkdl-0.2.2-py2-none-any.whl && cd ..

我这里打包和安装放一块了。

现在,在IDE里,你可以得到代码提示补全了。

开发基于SK-Learn的应用

首先我们假设我们有这样的数据:

代码语言:javascript
复制
# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from spark_sklearn import GridSearchCV
from sklearn import svm
from sklearn.model_selection import GridSearchCV

from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer

session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

documentDF = session.createDataFrame([
    ("Hi I heard about Spark", "spark"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "mlib")
], ["text", "preds"])

接着我们希望把preds转化为数字(分类),text转化为向量,这样才能喂给算法。我们可以这么做:

代码语言:javascript
复制
features = TFTextTransformer(
    inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)

indexer = StringIndexer(inputCol="preds", outputCol="labels")

pipline = Pipeline(stages=[features, indexer])
ds = pipline.fit(documentDF).transform(documentDF)

TFTextTransformer 默认提供的是一个二维数组,shape为(64,100),这种shape其实是为了给深度学习使用的,这里我指定shape为(-1,) 则会将二维数组转化为一个64*100的向量

现在我们写一个函数,里面实现具体的sk-learn逻辑:

代码语言:javascript
复制
def sk_map_fun(args={}, ctx=None, _read_data=None):
    params = args['params']['fitParam']
    data = [item for item in _read_data()]
    parameters = {'kernel': ('linear', 'rbf')}
    svr = svm.SVC()
    clf = GridSearchCV(svr, parameters)
    X = [x["features"] for x in data[0]]
    y = [int(x["labels"]) for x in data[0]]
    model = clf.fit(X, y)
    print(model.best_estimator_)
    return ""

前面必须是def sk_map_fun(args={}, ctx=None, _read_data=None): 这样,函数名字可以随意定。 _read_data 是你获取数据的一个对象,典型用法如下:

代码语言:javascript
复制
        for data in _read_data(max_records=params["batch_size"]):
            batch_data = feed_dict(data)
            sess.run(train_step, feed_dict={input_x: batch_data})

因为SVM是需要全量数据的,所以我简单的一次性拉取所有数据,因为条数小于默认的64条,所以我没有指定max_records.

代码语言:javascript
复制
 data = [item for item in _read_data()]
 X = [x["features"] for x in data[0]]
 y = [int(x["labels"]) for x in data[0]]

现在我们要把sk_map_fun 集成到Estimator里:

代码语言:javascript
复制
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()

这里,通过mapFnParam 参数,我们将sklearn函数传递给了TextEstimator,并且我们配置了Kakfa相关参数。这里唯一需要注意的是fitParam, 这里的fitParam 长度为2,意味着会启动两个进程运行sk_map_fun,并且一次传递对应的参数给sk_map_fun,sk_map_fun的第一段代码:

代码语言:javascript
复制
params = args['params']['fitParam']

这个时候params是{"epochs": 5, "batch_size": 64} 或者 {"epochs": 5, "batch_size": 1}。

这样你可以通过params拿到epoche,batch_size等,然后传给对应的Sk-Learn模型。

如果你只是运行Local模式,那么可以修改下kafkaParam参数:

代码语言:javascript
复制
import tempfile
mock_kafka_file = tempfile.mkdtemp()
kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                              "mock_kafka_file": mock_kafka_file,
                                              "group_id": "sdl_1", "test_mode": True},

指定一个临时目录mock_kafka_file,并且设置为test_mode为True,这样就可以不依赖于Kafka.

现在我么给出完整程序:

代码语言:javascript
复制
# -*- coding: UTF-8 -*-
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from spark_sklearn import GridSearchCV
from sklearn import svm
from sklearn.model_selection import GridSearchCV

from sparkdl.estimators.text_estimator import TextEstimator, KafkaMockServer
from sparkdl.transformers.tf_text import TFTextTransformer

session = SparkSession.builder.master("local[2]").appName("test").getOrCreate()

documentDF = session.createDataFrame([
    ("Hi I heard about Spark", "spark"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("I wish Java could use case classes", "java"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "mlib"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "spark"),
    ("Logistic regression models are neat", "java"),
    ("Logistic regression models are neat", "mlib")
], ["text", "preds"])

# transform text column to sentence_matrix column which contains 2-D array.
features = TFTextTransformer(
    inputCol="text", outputCol="features", shape=(-1,), embeddingSize=100, sequenceLength=64)

indexer = StringIndexer(inputCol="preds", outputCol="labels")

pipline = Pipeline(stages=[features, indexer])
ds = pipline.fit(documentDF).transform(documentDF)


def sk_map_fun(args={}, ctx=None, _read_data=None):
    data = [item for item in _read_data()]
    parameters = {'kernel': ('linear', 'rbf')}
    svr = svm.SVC()
    clf = GridSearchCV(svr, parameters)
    X = [x["features"] for x in data[0]]
    y = [int(x["labels"]) for x in data[0]]
    model = clf.fit(X, y)
    print(model.best_estimator_)
    return ""


# create a estimator to training where map_fun contains tensorflow's code
estimator = TextEstimator(inputCol="features", outputCol="features", labelCol="labels",
                          kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                      "group_id": "sdl_1", "test_mode": False},
                          runningMode="Normal",
                          fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                          mapFnParam=sk_map_fun)
estimator.fit(ds).collect()

然后使用如下指令运行:

代码语言:javascript
复制
./bin/spark-submit \
--py-files spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--jars spark-deep-learning-assembly-0.1.0-spark2.1.jar \
--master "local[*]"  Sk2.py

记得改下代码。

开发基于TensorFlow的应用

只要修改map_fun函数即可,比如:

代码语言:javascript
复制
def map_fun(args={}, ctx=None, _read_data=None):
    import tensorflow as tf
    EMBEDDING_SIZE = args["embedding_size"]
    params = args['params']['fitParam']
    SEQUENCE_LENGTH = 64

    def feed_dict(batch):
        # Convert from dict of named arrays to two numpy arrays of the proper type
        features = []
        for i in batch:
            features.append(i['sentence_matrix'])

        # print("{} {}".format(feature, features))
        return features

    encoder_variables_dict = {
        "encoder_w1": tf.Variable(
            tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE, 256]), name="encoder_w1"),
        "encoder_b1": tf.Variable(tf.random_normal([256]), name="encoder_b1"),
        "encoder_w2": tf.Variable(tf.random_normal([256, 128]), name="encoder_w2"),
        "encoder_b2": tf.Variable(tf.random_normal([128]), name="encoder_b2")
    }

    def encoder(x, name="encoder"):
        with tf.name_scope(name):
            encoder_w1 = encoder_variables_dict["encoder_w1"]
            encoder_b1 = encoder_variables_dict["encoder_b1"]

            layer_1 = tf.nn.sigmoid(tf.matmul(x, encoder_w1) + encoder_b1)

            encoder_w2 = encoder_variables_dict["encoder_w2"]
            encoder_b2 = encoder_variables_dict["encoder_b2"]

            layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, encoder_w2) + encoder_b2)
            return layer_2

    def decoder(x, name="decoder"):
        with tf.name_scope(name):
            decoder_w1 = tf.Variable(tf.random_normal([128, 256]))
            decoder_b1 = tf.Variable(tf.random_normal([256]))

            layer_1 = tf.nn.sigmoid(tf.matmul(x, decoder_w1) + decoder_b1)

            decoder_w2 = tf.Variable(
                tf.random_normal([256, SEQUENCE_LENGTH * EMBEDDING_SIZE]))
            decoder_b2 = tf.Variable(
                tf.random_normal([SEQUENCE_LENGTH * EMBEDDING_SIZE]))

            layer_2 = tf.nn.sigmoid(tf.matmul(layer_1, decoder_w2) + decoder_b2)
            return layer_2

    tf.reset_default_graph
    sess = tf.Session()

    input_x = tf.placeholder(tf.float32, [None, SEQUENCE_LENGTH, EMBEDDING_SIZE], name="input_x")
    flattened = tf.reshape(input_x,
                           [-1, SEQUENCE_LENGTH * EMBEDDING_SIZE])

    encoder_op = encoder(flattened)

    tf.add_to_collection('encoder_op', encoder_op)

    y_pred = decoder(encoder_op)

    y_true = flattened

    with tf.name_scope("xent"):
        consine = tf.div(tf.reduce_sum(tf.multiply(y_pred, y_true), 1),
                         tf.multiply(tf.sqrt(tf.reduce_sum(tf.multiply(y_pred, y_pred), 1)),
                                     tf.sqrt(tf.reduce_sum(tf.multiply(y_true, y_true), 1))))
        xent = tf.reduce_sum(tf.subtract(tf.constant(1.0), consine))
        tf.summary.scalar("xent", xent)

    with tf.name_scope("train"):
        # train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(xent)
        train_step = tf.train.RMSPropOptimizer(0.01).minimize(xent)

    summ = tf.summary.merge_all()

    sess.run(tf.global_variables_initializer())

    for i in range(params["epochs"]):
        print("epoll {}".format(i))
        for data in _read_data(max_records=params["batch_size"]):
            batch_data = feed_dict(data)
            sess.run(train_step, feed_dict={input_x: batch_data})

    sess.close()

我这里还是之前的一个例子,一个auto-encoder程序。

接着通过TextEstimator接入:

代码语言:javascript
复制
        estimator = TextEstimator(inputCol="sentence_matrix", outputCol="sentence_matrix", labelCol="preds",
                                  kafkaParam={"bootstrap_servers": ["127.0.0.1"], "topic": "test",
                                              "mock_kafka_file": mock_kafka_file,
                                              "group_id": "sdl_1", "test_mode": True},
                                  runningMode="Normal",
                                  fitParam=[{"epochs": 5, "batch_size": 64}, {"epochs": 5, "batch_size": 1}],
                                  mapFnParam=map_fun)
estimator.fit(df).collect()

大同小异了。

关于tensorflow,还可以有集群模式,可参考: 为Spark Deep Learning 集成TFoS

本文参与 腾讯云自媒体分享计划,分享自作者个人站点/博客。
原始发表:2017.10.24 ,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 准备
  • 方便代码提示,package python 源码
  • 开发基于SK-Learn的应用
  • 开发基于TensorFlow的应用
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档