Your Guide to Python with MLSQL Stack (Part 2)

In the previous post Your Guide to NLP with MLSQL Stack (Part 1), we learned how to build a RandomForest model to classify text content. TF/IDF and RandomForest are built-in algorithms implemented in Java. In this post, we will show you how to use Python to do the same job.

Requirements

This guide requires MLSQL Stack 1.3.0-SNAPSHOT. You can set up the MLSQL stack with the following links. We recommend deploying the MLSQL stack locally.

  1. Docker
  2. Manually Compile
  3. Prebuilt Distribution

If you run into any problem when deploying, please feel free to file an issue at this link.

Create a project in MLSQL console

I have created a project named store1. Here is the structure of store1:

(screenshot: the project structure of store1)

You need to write some Python code so that the system knows how to

  1. resolve the python dependencies
  2. train the data
  3. predict a table
  4. predict a single record

Create env for python

Create a file named py_env.mlsql; here is the script content:

set py_env='''
name: tutorial
dependencies:
  - python=3.6
  - pip
  - pip:
    - --index-url https://mirrors.aliyun.com/pypi/simple/
    - numpy==1.14.3
    - kafka==1.3.5
    - pyspark==2.4.3
    - pandas==0.22.0
    - scikit-learn==0.21.0
''';
load script.`py_env` as py_env;

We use the set statement to describe the Python dependencies, and then use the load statement to convert this description into a table.

Kafka and PySpark are required. Since we choose scikit-learn to build our model, we should also specify scikit-learn in the dependencies block.

That's all.

Create python training script

We put the Python training code into py_train.mlsql. Again, we use the set/load statements to achieve this:

set py_train='''
from __future__ import absolute_import
import numpy as np
import os
import json
import sys
import pickle
import scipy.sparse as sp
import importlib
import mlsql
from pyspark.mllib.linalg import Vectors, SparseVector
from sklearn.ensemble import RandomForestClassifier

# The module mlsql is auto generated by the MLSQL Engine. With mlsql, you
# can find out where the data is and where to save your model.

def train():

    if "tempModelLocalPath" not in mlsql.internal_system_param:
        raise Exception("tempModelLocalPath is not configured")
    
    tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
    tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]
    
    # In order to get the parameters configured in the train statement (where clause),
    # we can use `mlsql.fit_param`. Here we provide a helper function to make it more convenient.
    
    def param(key, value):
        if key in mlsql.fit_param:
            res = mlsql.fit_param[key]
        else:
            res = value
        return res
        
    featureCol = param("featureCol", "features")
    labelCol = param("labelCol", "label")
    
    # Once we know the data location, we will try to load the data to a sparse matrix
    def load_sparse_data():
        # collect all json data files written by the MLSQL Engine
        datafiles = [file for file in os.listdir(tempDataLocalPath) if file.endswith(".json")]
        row_n = []
        col_n = []
        data_n = []
        y = []
        feature_size = 0
        row_index = 0
        for file in datafiles:
            with open(tempDataLocalPath + "/" + file) as f:
                for line in f.readlines():
                    obj = json.loads(line)
                    fc = obj[featureCol]
                    # convert json to SparseVector
                    if fc["type"] == 1:
                        feature_size = len(fc["values"])
                        dic = [(i, a) for i, a in enumerate(fc["values"])]
                        sv = SparseVector(feature_size, dic)
                    elif fc["type"] == 0:
                        feature_size = fc["size"]
                        sv = Vectors.sparse(feature_size, list(zip(fc["indices"], fc["values"])))
                    # convert SparseVector to scipy matrix
                    for c in sv.indices:
                        row_n.append(row_index)
                        col_n.append(c)
                        data_n.append(sv.values[list(sv.indices).index(c)])
    
                    if type(obj[labelCol]) is list:
                        y.append(np.array(obj[labelCol]).argmax())
                    else:
                        y.append(obj[labelCol])
                    row_index += 1
                    if row_index % 10000 == 0:
                        print("processing lines: %s, values: %s" % (str(row_index), str(len(row_n))))
        print("X matrix : %s %s  row_n:%s col_n:%s classNum:%s" % (
            row_index, feature_size, len(row_n), len(col_n), ",".join([str(i) for i in list(set(y))])))
        sys.stdout.flush()
        return sp.csc_matrix((data_n, (row_n, col_n)), shape=(row_index, feature_size)), y
    
    
    X, y = load_sparse_data()
    
    # more parameters about RandomForestClassifier
    nEstimators = int(param("nEstimators", "100"))
    maxDepth = int(param("maxDepth", "3"))
    randomState = int(param("randomState", "0"))
    
    clf = RandomForestClassifier(n_estimators=nEstimators, max_depth=maxDepth,random_state=randomState)
    model = clf.fit(X,y)
    
    if not os.path.exists(tempModelLocalPath):
        os.makedirs(tempModelLocalPath)
    
    model_file_path = tempModelLocalPath + "/model.pkl"
    print("Save model to %s" % model_file_path)
    pickle.dump(model, open(model_file_path, "wb"))

''';

load script.`py_train` as py_train;

Since py_train.mlsql is executed by the MLSQL Engine, there must be a way for our Python script and the MLSQL Engine to communicate. The bridge is the module mlsql. With mlsql, you can answer questions like the following (the short sketch after this list shows the corresponding calls):

  1. Where is the data?
  2. How can I get the parameters configured in MLSQL script?
  3. Where should I put my model?
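
A minimal sketch of the corresponding calls, assuming only the keys already used throughout this post:

import mlsql

# where the engine has written the training data (local path)
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]

# parameters passed via fitParam.0.* in the train statement
maxDepth = mlsql.fit_param["maxDepth"] if "maxDepth" in mlsql.fit_param else "3"

# where the engine expects the model files to be saved
tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]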

Where is the data?

The MLSQL Engine will put all training data in tempDataLocalPath.

tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]

How can I get the parameters configured in MLSQL script?

You can get parameters with the following code:

def param(key, value):
    if key in mlsql.fit_param:
        res = mlsql.fit_param[key]
    else:
        res = value
    return res

featureCol = param("featureCol", "features")
labelCol = param("labelCol", "label")

and this is how the parameters look in the MLSQL script:

train data1 as PythonAlg.`/tmp/python/nlp` where
-- algorithm parameters which will be accepted by Sklearn
fitParam.0.labelCol="label"
and fitParam.0.featureCol="features"
and fitParam.0.maxDepth="3";
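
With the where clause above, the param helper shown earlier resolves the values like this (the results in the comments assume exactly the fitParam.0.* entries above):

labelCol = param("labelCol", "label")         # -> "label"
featureCol = param("featureCol", "features")  # -> "features"
maxDepth = int(param("maxDepth", "3"))        # -> 3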

Where should I put my model?

You should put your model in tempModelLocalPath, and the MLSQL Engine will copy it to HDFS automatically. Please check whether tempModelLocalPath exists; if not, create it in your Python code.

tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]
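
A minimal sketch of that check, mirroring what py_train already does:

import os

if not os.path.exists(tempModelLocalPath):
    os.makedirs(tempModelLocalPath)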

The Python code is not difficult; the only thing you should be careful about is how to convert the JSON data to a vector. The JSON format is like this:

-- dense vector
{"features":{"type":1,"values":[...]}}
-- sparse vector
{"features":{"type":0,"size":[],"indices":[],"values":[...]}}

When the field type is 1, the JSON object represents a dense vector; when the type is 0, it represents a sparse vector (size is the total length of the vector). The example code above converts all JSON objects to sparse vectors.
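
A minimal, self-contained sketch of that conversion, pulling the same logic used in py_train into one helper (the helper name is ours, not part of the generated project):

from pyspark.mllib.linalg import Vectors, SparseVector

def json_to_vector(fc):
    # fc is the value of the feature column, e.g. obj["features"]
    if fc["type"] == 1:
        # dense vector: every position is stored in "values"
        size = len(fc["values"])
        return SparseVector(size, [(i, v) for i, v in enumerate(fc["values"])])
    elif fc["type"] == 0:
        # sparse vector: only the non-zero positions are stored
        return Vectors.sparse(fc["size"], list(zip(fc["indices"], fc["values"])))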

Use py_main.mlsql to wrap py_train.mlsql (Optional)

The main purpose of introducing py_main.mlsql is to show you how to import other Python files in your Python script.

set py_main='''
import py_train
py_train.train()
''';
load script.`py_main` as py_main;

Batch predict

Once we have trained our model successfully, we can either use it to predict in ETL jobs or provide an API service to serve HTTP/RPC requests. Batch predict covers the first purpose. Again, with the module mlsql, you can get information like this:

  1. Where is the data?
  2. Where is the model?
  3. Where should you put the prediction result?
  4. How can I get parameters from the training stage?

set py_batch_predict='''
from __future__ import absolute_import
import numpy as np
import os
import json
import sys
import pickle
import scipy.sparse as sp
import importlib
import mlsql
from pyspark.mllib.linalg import Vectors, SparseVector

# tempDataLocalPath points to a json file containing the data to predict

tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
# this is also a json file, it looks like `/tmp/__mlsql__/2d10dd4b-385e-44a7-ac0e-45d1c0fcaf4b/output/output-0.json`
tempOutputLocalPath = mlsql.internal_system_param["tempOutputLocalPath"]

# location where model is
tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]

model = pickle.load(open(tempModelLocalPath+"/model.pkl", "rb"))

# get featureCol from trainParams
featureCol=mlsql.params()["trainParams"]["fitParam.0.featureCol"]

with open(tempOutputLocalPath, "w") as output:
    with open(tempDataLocalPath) as f:
        for line in f.readlines():
            obj = json.loads(line)
            fc = obj[featureCol]
            # convert json to SparseVector
            if fc["type"] == 1:
                feature_size = len(fc["values"])
                dic = [(i, a) for i, a in enumerate(fc["values"])]
                sv = SparseVector(feature_size, dic)
            elif fc["type"] == 0:
                feature_size = fc["size"]
                sv = Vectors.sparse(feature_size, list(zip(fc["indices"], fc["values"])))
            # predict one row and write one json object per line
            res = model.predict([sv.toArray()]).tolist()
            output.write(json.dumps({'predicted': res}) + "\n")
''';
load script.`py_batch_predict` as py_batch_predict;
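
Each line of the output file then holds the prediction for the corresponding input row, roughly like this (a sketch, assuming the predicted label for that row is 0.0):

{"predicted": [0.0]}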

API Predict

We can register the model as a function as we did in the first article, but we should tell the system how to implement this:

Here is py_predict.mlsql:

set py_predict='''
from __future__ import absolute_import
from pyspark.ml.linalg import VectorUDT, Vectors
import pickle
import os
import python_fun

# vector in, vector out; only pyspark vectors are supported.
def predict(index, s):
    # get the model path; always write it like this.
    items = [i for i in s]
    modelPath = pickle.loads(items[1])[0] + "/model.pkl"
    
    # the model should be loaded only once; since this function will be
    # serialized and shipped to executors, a normal global will not work,
    # so we cache the loaded model on the os module instead.
    if not hasattr(os, "mlsql_models"):
        setattr(os, "mlsql_models", {})
    if modelPath not in os.mlsql_models:
        print("Load sklearn model %s" % modelPath)
        os.mlsql_models[modelPath] = pickle.load(open(modelPath, "rb"))
    
    # get the model
    model = os.mlsql_models[modelPath]
    
    # load the vector; note that API predict only supports vectors
    rawVector = pickle.loads(items[0])
    feature = VectorUDT().deserialize(rawVector)
    
    # predict
    y = model.predict([feature.toArray()])
    
    # return vector 
    return [VectorUDT().serialize(Vectors.dense(y))]

python_fun.udf(predict)
''';
load script.`py_predict` as py_predict;

Notice that the module python_fun is also generated by the MLSQL Engine.

Finally, let's write our main.mlsql

First, mock some data:

set jsonStr='''
{"features":[5.1,3.5,1.4,0.2],"label":0.0},
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.4,2.9,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.7,3.2,1.3,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
''';
load jsonStr.`jsonStr` as data;
select vec_dense(features) as features, label as label from data
as data1;
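
After vec_dense, the features column of data1 is a dense vector; when the engine dumps data1 to tempDataLocalPath for training, each row becomes a JSON line in the dense format we handled earlier, roughly like this (a sketch based on the first mock row):

{"features":{"type":1,"values":[5.1,3.5,1.4,0.2]},"label":0.0}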

Then, include all the files we have written:

-- include python script
include store1.`alg.python.text_classify.py_env.mlsql`;
include store1.`alg.python.text_classify.py_train.mlsql`;
include store1.`alg.python.text_classify.py_batch_predict.mlsql`;
include store1.`alg.python.text_classify.py_predict.mlsql`;
include store1.`alg.python.text_classify.py_main.mlsql`;

Create python env:

-- run command as PythonEnvExt.`/tmp/python/nlp` where condaFile="dependencies" and command="create";
!createPythonEnv /tmp/python/nlp py_env;

Use ET PythonAlg to train:

train data1 as PythonAlg.`/tmp/python/nlp` where
-- global configuration
keepVersion="true"
and dataLocalFormat="json" 
and dataLocalFileNum="-1"
and generateProjectOnly="true"

-- configure python project
and scripts="py_train,py_main,py_batch_predict,py_predict"
and entryPoint="py_main"
and apiPredictEntryPoint="py_predict"
and condaFile="py_env"

-- algorithm parameters which will be accepted by Sklearn
and fitParam.0.labelCol="label"
and fitParam.0.featureCol="features"
and fitParam.0.maxDepth="3";

Register the model as a function:

-- Ok, let's convert the model to a function
register PythonAlg.`/tmp/python/nlp` as rf_predict;
select rf_predict(vec_dense(array(5.1,3.5,1.4,0.2))) as predicted as output;
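
Because py_predict returns a serialized dense vector, the predicted column of the result above is a one-element vector; for the example row it would look roughly like this (a sketch, assuming the model predicts label 0.0):

[0.0]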

Or batch predict all data:

-- you can also predict the table
predict data1 as PythonAlg.`/tmp/python/nlp`;
