Your Guide to Python with MLSQL Stack (二)

In the previous post, Your Guide to NLP with MLSQL Stack (一), we learned how to build a RandomForest model to classify text content. There, TF/IDF and RandomForest were built-in algorithms implemented in Java. In this post, we will show you how to use Python to do the same job.

Requirements

This guide requires MLSQL Stack 1.3.0-SNAPSHOT. You can set up the MLSQL stack with the following links. We recommend deploying the MLSQL stack locally.

  1. Docker
  2. Manually Compile
  3. Prebuilt Distribution

If you run into any problems while deploying, please feel free to file an issue at this link.

Create a project in MLSQL console

I have created a project named store1. Here is the structure of store1:

[Screenshot: directory structure of the store1 project]

You need to write some Python code so the system knows how to do the following (the project files that implement these steps are sketched after this list):

  1. resolve the Python dependencies
  2. train on the data
  3. predict a table
  4. predict a single record
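
Based on the include statements that appear later in main.mlsql, the project contains roughly the following files (a sketch with my own annotations, not the exact console screenshot):

store1
  alg.python.text_classify.py_env.mlsql            -- Python dependencies (conda description)
  alg.python.text_classify.py_train.mlsql          -- training code
  alg.python.text_classify.py_main.mlsql           -- optional wrapper around py_train
  alg.python.text_classify.py_batch_predict.mlsql  -- batch prediction code
  alg.python.text_classify.py_predict.mlsql        -- API prediction code
  main.mlsql                                       -- glue script that includes the files above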

Create env for Python

Create a file named py_env.mlsql; here is the script content:

set py_env='''
name: tutorial
dependencies:
  - python=3.6
  - pip
  - pip:
    - --index-url https://mirrors.aliyun.com/pypi/simple/
    - numpy==1.14.3
    - kafka==1.3.5
    - pyspark==2.4.3
    - pandas==0.22.0
    - scikit-learn==0.21.0
''';
load script.`py_env` as py_env;

We use the set statement to describe the Python dependencies and, finally, use the load statement to convert this description into a table.

Kafka and PySpark are required. Since we use scikit-learn to build our model, we should also specify scikit-learn in the dependencies block.

That's all.

Create the Python training script

We put the Python training code into py_train.mlsql. Again, we use set/load statements to achieve this:

set py_train='''
from __future__ import absolute_import
import numpy as np
import os
import json
import sys
import pickle
import scipy.sparse as sp
import importlib
import mlsql
from pyspark.mllib.linalg import Vectors, SparseVector
from sklearn.ensemble import RandomForestClassifier

# The module mlsql is auto-generated by the MLSQL Engine. With mlsql, you
# can find out where the data is and where to save your model.

def train():

    if "tempModelLocalPath" not in mlsql.internal_system_param:
        raise Exception("tempModelLocalPath is not configured")
    
    tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
    tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]
    
    # To get the parameters configured in the train statement (where clause),
    # we can use `mlsql.fit_param`. Here, we provide a helper to make it more convenient.
    
    def param(key, value):
        if key in mlsql.fit_param:
            res = mlsql.fit_param[key]
        else:
            res = value
        return res
        
    featureCol = param("featureCol", "features")
    labelCol = param("labelCol", "label")
    
    # Once we know the data location, we will try to load the data to a sparse matrix
    def load_sparse_data():
        # collect the JSON data files written by the MLSQL Engine
        datafiles = [file for file in os.listdir(tempDataLocalPath) if file.endswith(".json")]
        row_n = []
        col_n = []
        data_n = []
        y = []
        feature_size = 0
        row_index = 0
        for file in datafiles:
            with open(tempDataLocalPath + "/" + file) as f:
                for line in f.readlines():
                    obj = json.loads(line)
                    fc = obj[featureCol]
                    # convert json to SparseVector
                    if fc["type"] == 1:
                        feature_size = len(fc["values"])
                        dic = [(i, a) for i, a in enumerate(fc["values"])]
                        sv = SparseVector(feature_size, dic)
                    elif fc["type"] == 0:
                        feature_size = fc["size"]
                        sv = Vectors.sparse(feature_size, list(zip(fc["indices"], fc["values"])))
                    # convert SparseVector to scipy matrix
                    for c in sv.indices:
                        row_n.append(row_index)
                        col_n.append(c)
                        data_n.append(sv.values[list(sv.indices).index(c)])
    
                    if type(obj[labelCol]) is list:
                        y.append(np.array(obj[labelCol]).argmax())
                    else:
                        y.append(obj[labelCol])
                    row_index += 1
                    if row_index % 10000 == 0:
                        print("processing lines: %s, values: %s" % (str(row_index), str(len(row_n))))
        print("X matrix : %s %s  row_n:%s col_n:%s classNum:%s" % (
            row_index, feature_size, len(row_n), len(col_n), ",".join([str(i) for i in list(set(y))])))
        sys.stdout.flush()
        return sp.csc_matrix((data_n, (row_n, col_n)), shape=(row_index, feature_size)), y
    
    
    X, y = load_sparse_data()
    
    # more parameters about RandomForestClassifier
    nEstimators = int(param("nEstimators", "100"))
    maxDepth = int(param("maxDepth", "3"))
    randomState = int(param("randomState", "0"))
    
    clf = RandomForestClassifier(n_estimators=nEstimators, max_depth=maxDepth,random_state=randomState)
    model = clf.fit(X,y)
    
    if not os.path.exists(tempModelLocalPath):
        os.makedirs(tempModelLocalPath)
    
    model_file_path = tempModelLocalPath + "/model.pkl"
    print("Save model to %s" % model_file_path)
    pickle.dump(model, open(model_file_path, "wb"))

''';

load script.`py_train` as py_train;

Since py_train.mlsql is executed by the MLSQL Engine, there must be a way for our Python script and the MLSQL Engine to communicate. The bridge is the module mlsql. With mlsql, you can answer questions like the following (see the sketch after this list):

  1. Where is the data?
  2. How can I get the parameters configured in MLSQL script?
  3. Where should I put my model?
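
Concretely, these are the parts of the mlsql bridge module used throughout this post, collected in one place (a minimal sketch based on the scripts below, not the full module API):

import mlsql

# where the engine put the training data, and where we should save the model
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]

# parameters configured as fitParam.0.* in the train statement
labelCol = mlsql.fit_param["labelCol"] if "labelCol" in mlsql.fit_param else "label"

# at predict time, the training-stage parameters are available again
featureCol = mlsql.params()["trainParams"]["fitParam.0.featureCol"]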

Where is the data?

The MLSQL Engine will put all the training data in tempDataLocalPath.

tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
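
During training the engine writes that data as one or more JSON files under the directory, which is why py_train enumerates them like this:

import os
import mlsql

tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
# collect every JSON data file the engine wrote into that directory
datafiles = [f for f in os.listdir(tempDataLocalPath) if f.endswith(".json")]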

How can I get the parameters configured in MLSQL script?

You can get parameters with the following code:

def param(key, value):
    if key in mlsql.fit_param:
        res = mlsql.fit_param[key]
    else:
        res = value
    return res

featureCol = param("featureCol", "features")
labelCol = param("labelCol", "label")

This is what the parameters look like in the MLSQL script:

train data1 as PythonAlg.`/tmp/python/nlp` where
-- other configuration omitted; see the full train statement in main.mlsql below
-- algorithm parameters which will be accepted by Sklearn
fitParam.0.labelCol="label"
and fitParam.0.featureCol="features"
and fitParam.0.maxDepth="3";
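
For example, fitParam.0.maxDepth in the train statement shows up as the key maxDepth in mlsql.fit_param, and py_train reads it through the param helper defined above:

# fitParam.0.maxDepth="3" in the train statement becomes mlsql.fit_param["maxDepth"]
# inside the Python script; param falls back to "3" when it is not configured.
maxDepth = int(param("maxDepth", "3"))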

Where should I put my model?

You should put your model in tempModelLocalPath, and the MLSQL Engine will copy it to HDFS automatically. Check whether tempModelLocalPath exists; if not, create it in your Python code.

tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]
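
This is exactly what the end of py_train does (a short sketch; model here is the fitted scikit-learn estimator):

import os
import pickle

if not os.path.exists(tempModelLocalPath):
    os.makedirs(tempModelLocalPath)

# everything under tempModelLocalPath is copied to HDFS by the MLSQL Engine after training
pickle.dump(model, open(tempModelLocalPath + "/model.pkl", "wb"))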

The Python code is not difficult; the only thing you need to be careful about is how to convert the JSON data to a vector. The JSON format looks like this:

-- dense vector
{"features":{"type":1,"values":[...]}}
-- sparse vector
{"features":{"type":0,"size":...,"indices":[...],"values":[...]}}

When the field type is 1, the JSON object represents a dense vector; when the type is 0, it represents a sparse vector. The example code above converts every JSON object to a sparse vector.
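
Pulled out of py_train, the conversion logic looks like this on its own (a minimal sketch; the sample record at the end is made up for illustration):

import json
from pyspark.mllib.linalg import Vectors, SparseVector

def json_to_vector(line, featureCol="features"):
    # parse one JSON record and return a SparseVector, the same way py_train does
    fc = json.loads(line)[featureCol]
    if fc["type"] == 1:
        # dense: the length of "values" is the feature size
        size = len(fc["values"])
        return SparseVector(size, [(i, v) for i, v in enumerate(fc["values"])])
    elif fc["type"] == 0:
        # sparse: "size" is the feature size, "indices"/"values" hold the non-zero entries
        return Vectors.sparse(fc["size"], list(zip(fc["indices"], fc["values"])))

# hypothetical dense record with four features
print(json_to_vector('{"features":{"type":1,"values":[5.1,3.5,1.4,0.2]}}'))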

Use py_main.mlsql to wrap py_train.mlsql (Optional)

The main purpose of introducing py_main.mlsql is to show you how to import other Python files in your Python script.

set py_main='''
import py_train
py_train.train()
''';
load script.`py_main` as py_main;

Batch predict

Once we have trained our model successfully, we can either use it for prediction in ETL jobs or provide an API service that serves HTTP/RPC requests. Batch predict covers the first purpose. Again, with the module mlsql, you can get information like this:

  1. Where is the data?
  2. Where is the model?
  3. Where should you put the prediction result?
  4. How to get parameters from training stage?
set py_batch_predict='''
from __future__ import absolute_import
import numpy as np
import os
import json
import sys
import pickle
import scipy.sparse as sp
import importlib
import mlsql
from pyspark.mllib.linalg import Vectors, SparseVector

# tempDataLocalPath points to a JSON file containing the input data
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]

# tempOutputLocalPath is also a JSON file, e.g.
# `/tmp/__mlsql__/2d10dd4b-385e-44a7-ac0e-45d1c0fcaf4b/output/output-0.json`;
# the predictions should be written there
tempOutputLocalPath = mlsql.internal_system_param["tempOutputLocalPath"]

# location where model is
tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]

model = pickle.load(open(tempModelLocalPath+"/model.pkl", "rb"))

# get featureCol from trainParams
featureCol=mlsql.params()["trainParams"]["fitParam.0.featureCol"]

with open(tempOutputLocalPath, "w") as output:
    with open(tempDataLocalPath) as f:
        for line in f.readlines():
            obj = json.loads(line)
            fc = obj[featureCol]
            # convert json to SparseVector
            if fc["type"] == 1:
                feature_size = len(fc["values"])
                dic = [(i, a) for i, a in enumerate(fc["values"])]
                sv = SparseVector(feature_size, dic)
            elif fc["type"] == 0:
                feature_size = fc["size"]
                sv = Vectors.sparse(feature_size, list(zip(fc["indices"], fc["values"])))
            res = model.predict([sv.toArray()]).tolist()
            output.write(json.dumps({'predicted': res}) + "\n")
''';
load script.`py_batch_predict` as py_batch_predict;

API Predict

We can register the model as a function, as we did in the first article, but we should tell the system how to implement this:

Here is py_predict.mlsql:

set py_predict='''
from __future__ import absolute_import
from pyspark.ml.linalg import VectorUDT, Vectors
import pickle
import os
import python_fun

# vector in, vector out; only pyspark vectors are supported.
def predict(index, s):
    # get the model path; just write like this always.
    items = [i for i in s]
    modelPath = pickle.loads(items[1])[0] + "/model.pkl"
    
    # the model should be loaded only once, but this function will be
    # serialized, so an ordinary module-level cache will not work; cache it on os instead.
    if not hasattr(os, "mlsql_models"):
        setattr(os, "mlsql_models", {})
    if modelPath not in os.mlsql_models:
        print("Load sklearn model %s" % modelPath)
        os.mlsql_models[modelPath] = pickle.load(open(modelPath, "rb"))
    
    # get the model
    model = os.mlsql_models[modelPath]
    
    # load the vector; note that API predict only supports vector input
    rawVector = pickle.loads(items[0])
    feature = VectorUDT().deserialize(rawVector)
    
    # predict
    y = model.predict([feature.toArray()])
    
    # return vector 
    return [VectorUDT().serialize(Vectors.dense(y))]

python_fun.udf(predict)
''';
load script.`py_predict` as py_predict;

Notice that the module python_fun is also generated by the MLSQL Engine.

Finally, let's write our main.mlsql

First, mock some data:

set jsonStr='''
{"features":[5.1,3.5,1.4,0.2],"label":0.0},
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.4,2.9,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.7,3.2,1.3,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
''';
load jsonStr.`jsonStr` as data;
select vec_dense(features) as features, label as label from data
as data1;

Then, include all the files we have written:

-- include python script
include store1.`alg.python.text_classify.py_env.mlsql`;
include store1.`alg.python.text_classify.py_train.mlsql`;
include store1.`alg.python.text_classify.py_batch_predict.mlsql`;
include store1.`alg.python.text_classify.py_predict.mlsql`;
include store1.`alg.python.text_classify.py_main.mlsql`;

Create the Python env:

-- run command as PythonEnvExt.`/tmp/python/nlp` where condaFile="dependencies" and command="create";
!createPythonEnv /tmp/python/nlp py_env;

Use ET PythonAlg to train:

train data1 as PythonAlg.`/tmp/python/nlp` where
-- global configuration
keepVersion="true"
and dataLocalFormat="json" 
and dataLocalFileNum="-1"
and generateProjectOnly="true"

-- configure python project
and scripts="py_train,py_main,py_batch_predict,py_predict"
and entryPoint="py_main"
and apiPredictEntryPoint="py_predict"
and condaFile="py_env"

-- algorithm parameters which will be accepted by Sklearn
and fitParam.0.labelCol="label"
and fitParam.0.featureCol="features"
and fitParam.0.maxDepth="3";

Register the model as a function:

-- Ok, let's convert the model to a function
register PythonAlg.`/tmp/python/nlp` as rf_predict;
select rf_predict(vec_dense(array(5.1,3.5,1.4,0.2))) as predicted as output;

Or batch predict all the data:

-- you can also predict the table
predict data1 as PythonAlg.`/tmp/python/nlp`;