In the previous post, Your Guide to NLP with MLSQL Stack (一), we showed how to build a RandomForest model to classify text content. TF/IDF and RandomForest are both built-in algorithms implemented in Java. In this post, we will show you how to do the same job with Python.
This guide requires MLSQL Stack 1.3.0-SNAPSHOT. You can set up MLSQL Stack with the following links. We recommend deploying MLSQL Stack locally.
If you run into any problem during deployment, please let me know; feel free to open an issue at this link.
I have created a project named store1. Here is the structure of store1:
(Figure: directory structure of the store1 project)
You need to write some Python code, but first you have to let the system know how to build the Python environment.
Create a file named py_env.mlsql. Here is the script content:
set py_env='''
name: tutorial
dependencies:
  - python=3.6
  - pip
  - pip:
    - --index-url https://mirrors.aliyun.com/pypi/simple/
    - numpy==1.14.3
    - kafka==1.3.5
    - pyspark==2.4.3
    - pandas==0.22.0
    - scikit-learn==0.21.0
''';
load script.`py_env` as py_env;
We use a set statement to describe the Python dependencies, and then a load statement to convert this description into a table.
Kafka and PySpark are required. Since we use scikit-learn to build our model, we must also list scikit-learn in the dependencies block.
That's all.
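Indentation matters in this description: it is a conda environment file, and the packages installed through pip form a nested list under the `- pip:` entry. As a sketch, the same structure could be produced from plain Python lists; `render_conda_env` is a hypothetical helper for illustration, not part of MLSQL:

```python
def render_conda_env(name, conda_deps, pip_deps, index_url=None):
    """Render a conda environment spec in the layout used by py_env (hypothetical helper)."""
    lines = ["name: %s" % name, "dependencies:"]
    for dep in conda_deps:
        lines.append("  - %s" % dep)
    # pip packages go in a nested list under the "- pip:" entry
    lines.append("  - pip:")
    if index_url:
        lines.append("    - --index-url %s" % index_url)
    for dep in pip_deps:
        lines.append("    - %s" % dep)
    return "\n".join(lines) + "\n"


spec = render_conda_env(
    "tutorial",
    ["python=3.6", "pip"],
    ["numpy==1.14.3", "kafka==1.3.5", "pyspark==2.4.3",
     "pandas==0.22.0", "scikit-learn==0.21.0"],
    index_url="https://mirrors.aliyun.com/pypi/simple/",
)
print(spec)
```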
We put the Python training code into py_train.mlsql. Again, we use set/load statements to achieve this:
set py_train='''
from __future__ import absolute_import
import numpy as np
import os
import json
import sys
import pickle
import scipy.sparse as sp
import importlib
import mlsql
from pyspark.mllib.linalg import Vectors, SparseVector
from sklearn.ensemble import RandomForestClassifier

# Module mlsql is auto generated by MLSQL Engine. With mlsql, you
# can find out where the data is and where to save your model.
def train():
    if "tempModelLocalPath" not in mlsql.internal_system_param:
        raise Exception("tempModelLocalPath is not configured")

    tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
    tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]

    # To get the parameters configured in the train statement (where clause)
    # we can use `mlsql.fit_param`. Here, we provide a helper to make it more convenient.
    def param(key, value):
        if key in mlsql.fit_param:
            res = mlsql.fit_param[key]
        else:
            res = value
        return res

    featureCol = param("featureCol", "features")
    labelCol = param("labelCol", "label")

    # Once we know the data location, we load the data into a sparse matrix
    def load_sparse_data():
        # collect all JSON data files MLSQL Engine dumped for us
        datafiles = [file for file in os.listdir(tempDataLocalPath) if file.endswith(".json")]
        row_n = []
        col_n = []
        data_n = []
        y = []
        feature_size = 0
        row_index = 0
        for file in datafiles:
            with open(tempDataLocalPath + "/" + file) as f:
                for line in f.readlines():
                    obj = json.loads(line)
                    fc = obj[featureCol]
                    # convert json to SparseVector
                    if fc["type"] == 1:
                        feature_size = len(fc["values"])
                        dic = [(i, a) for i, a in enumerate(fc["values"])]
                        sv = SparseVector(feature_size, dic)
                    elif fc["type"] == 0:
                        feature_size = fc["size"]
                        sv = Vectors.sparse(feature_size, list(zip(fc["indices"], fc["values"])))
                    # convert SparseVector to scipy matrix coordinates (row, col, value)
                    for c, v in zip(sv.indices, sv.values):
                        row_n.append(row_index)
                        col_n.append(c)
                        data_n.append(v)
                    if type(obj[labelCol]) is list:
                        y.append(np.array(obj[labelCol]).argmax())
                    else:
                        y.append(obj[labelCol])
                    row_index += 1
                    if row_index % 10000 == 0:
                        print("processing lines: %s, values: %s" % (str(row_index), str(len(row_n))))
        print("X matrix : %s %s row_n:%s col_n:%s classNum:%s" % (
            row_index, feature_size, len(row_n), len(col_n), ",".join([str(i) for i in list(set(y))])))
        sys.stdout.flush()
        return sp.csc_matrix((data_n, (row_n, col_n)), shape=(row_index, feature_size)), y

    X, y = load_sparse_data()

    # more parameters for RandomForestClassifier
    nEstimators = int(param("nEstimators", "100"))
    maxDepth = int(param("maxDepth", "3"))
    randomState = int(param("randomState", "0"))
    clf = RandomForestClassifier(n_estimators=nEstimators, max_depth=maxDepth, random_state=randomState)
    model = clf.fit(X, y)

    if not os.path.exists(tempModelLocalPath):
        os.makedirs(tempModelLocalPath)
    model_file_path = tempModelLocalPath + "/model.pkl"
    print("Save model to %s" % model_file_path)
    with open(model_file_path, "wb") as model_file:
        pickle.dump(model, model_file)
''';
load script.`py_train` as py_train;
Since py_train.mlsql is executed by the MLSQL Engine, there must be a way for our Python script to communicate with the engine. The bridge is the mlsql module, which gives you the following.
The MLSQL Engine puts all the training data in tempDataLocalPath:
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
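Because the mlsql module only exists inside the engine, a script that imports it cannot run on its own. One way to debug `train()` locally, sketched here under the assumption that it only reads `internal_system_param` and `fit_param`, is to register a stand-in module in `sys.modules` before importing your code. The stub, the `install_fake_mlsql` helper, and the temporary paths are all hypothetical:

```python
import sys
import tempfile
import types


def install_fake_mlsql(data_dir, model_dir, fit_param=None):
    """Register a stand-in `mlsql` module (hypothetical, for local debugging only)."""
    fake = types.ModuleType("mlsql")
    # mirror the two attributes py_train reads
    fake.internal_system_param = {
        "tempDataLocalPath": data_dir,
        "tempModelLocalPath": model_dir,
    }
    fake.fit_param = dict(fit_param or {})
    sys.modules["mlsql"] = fake
    return fake


data_dir = tempfile.mkdtemp()
model_dir = tempfile.mkdtemp()
install_fake_mlsql(data_dir, model_dir, {"maxDepth": "3"})

import mlsql  # resolves to the stub registered above

print(mlsql.internal_system_param["tempDataLocalPath"])
```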
You can read parameters with the following code:
def param(key, value):
    if key in mlsql.fit_param:
        res = mlsql.fit_param[key]
    else:
        res = value
    return res
featureCol = param("featureCol", "features")
labelCol = param("labelCol", "label")
And this is what the parameters look like in the MLSQL script:
train data1 as PythonAlg.`/tmp/python/nlp` where
-- algorithm parameters which will be accepted by Sklearn
fitParam.0.labelCol="label"
and fitParam.0.featureCol="features"
and fitParam.0.maxDepth="3";
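The script sets `fitParam.0.featureCol`, yet `train()` reads the bare key `featureCol` from `mlsql.fit_param`. How the engine performs this mapping is not shown in this post; the sketch below only illustrates the naming convention, assuming the number after `fitParam.` selects a parameter group and the remainder is the key. It also shows why the training code wraps numeric values in `int(...)`: every value arrives as a string. `group_fit_params` is a hypothetical helper, and a plain dict stands in for `mlsql.fit_param`:

```python
def group_fit_params(train_params):
    """Split keys like 'fitParam.0.maxDepth' into {0: {'maxDepth': ...}} (illustration only)."""
    groups = {}
    for key, value in train_params.items():
        parts = key.split(".", 2)
        if len(parts) == 3 and parts[0] == "fitParam" and parts[1].isdigit():
            groups.setdefault(int(parts[1]), {})[parts[2]] = value
    return groups


# keys as they appear in the train statement (values are always strings)
train_params = {
    "keepVersion": "true",
    "fitParam.0.labelCol": "label",
    "fitParam.0.featureCol": "features",
    "fitParam.0.maxDepth": "3",
}
fit_param = group_fit_params(train_params)[0]  # stands in for mlsql.fit_param


def param(key, default):
    # same behavior as the if/else param helper
    return fit_param.get(key, default)


maxDepth = int(param("maxDepth", "3"))          # configured: "3" -> 3
nEstimators = int(param("nEstimators", "100"))  # not configured: falls back to "100"
print(fit_param, maxDepth, nEstimators)
```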
You should save your model in tempModelLocalPath, and the MLSQL Engine will copy it to HDFS automatically. Check whether tempModelLocalPath exists, and if it does not, create it in your Python code:
tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]
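The exists check followed by `os.makedirs` in `train()` can be collapsed into `os.makedirs(path, exist_ok=True)`. A minimal sketch of saving a pickled model and reading it back, where a temporary directory stands in for `tempModelLocalPath` and a plain dict stands in for the fitted scikit-learn model (both assumptions for illustration):

```python
import os
import pickle
import tempfile

# a temporary directory stands in for tempModelLocalPath
tempModelLocalPath = os.path.join(tempfile.mkdtemp(), "model")
os.makedirs(tempModelLocalPath, exist_ok=True)  # no error if it already exists

model = {"n_estimators": 100, "max_depth": 3}  # placeholder for a fitted model
model_file_path = os.path.join(tempModelLocalPath, "model.pkl")
with open(model_file_path, "wb") as f:
    pickle.dump(model, f)

# the predict scripts read it back from the same file name
with open(model_file_path, "rb") as f:
    restored = pickle.load(f)
print(restored)
```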
The rest of the Python code is straightforward; the only thing to be careful about is converting the JSON data to vectors. The JSON format looks like this:
-- dense vector
{"features":{"type":1,"values":[...]}}
-- sparse vector
{"features":{"type":0,"size":[],"indices":[],"values":[...]}}
When the type field is 1, the JSON object represents a dense vector; when it is 0, the JSON object represents a sparse vector, where size is the vector length and indices and values hold the positions and values of the stored entries. The example code above converts every JSON object to a sparse vector.
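Stripped of the PySpark and SciPy types, the conversion amounts to turning each JSON vector into (index, value) pairs. A stdlib-only sketch, assuming `size` in the sparse form is an integer vector length (as the training code treats it); `json_to_pairs` is a hypothetical name:

```python
import json


def json_to_pairs(fc):
    """Return (size, [(index, value), ...]) for a dense (type 1) or sparse (type 0) JSON vector."""
    if fc["type"] == 1:
        # dense: every position is stored, so the index is the position
        values = fc["values"]
        return len(values), list(enumerate(values))
    if fc["type"] == 0:
        # sparse: indices and values are parallel lists
        return fc["size"], list(zip(fc["indices"], fc["values"]))
    raise ValueError("unknown vector type: %r" % fc["type"])


dense = json.loads('{"features":{"type":1,"values":[5.1,0.0,1.4]}}')["features"]
sparse = json.loads('{"features":{"type":0,"size":5,"indices":[0,3],"values":[1.0,2.5]}}')["features"]
print(json_to_pairs(dense))
print(json_to_pairs(sparse))
```

Like the original code, the dense branch keeps explicit zeros as stored entries.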
The main purpose of py_main.mlsql is to show how to import other Python files in your Python script.
set py_main='''
import py_train
py_train.train()
''';
load script.`py_main` as py_main;
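py_main simply imports py_train by name, which suggests that every script listed in the `scripts` parameter of the train statement is made importable as a module. We have not checked how the engine lays the files out; the sketch below only mimics the idea with the standard `importlib` machinery, assuming each script is written as `<name>.py` into a project directory on `sys.path`. The directory and the stand-in module body are hypothetical:

```python
import importlib
import os
import sys
import tempfile

project_dir = tempfile.mkdtemp()  # stands in for the generated project directory

# write a tiny stand-in for py_train as a module file
with open(os.path.join(project_dir, "py_train.py"), "w") as f:
    f.write("def train():\n    return 'trained'\n")

sys.path.insert(0, project_dir)
importlib.invalidate_caches()  # make sure the new file is picked up

py_train = importlib.import_module("py_train")
print(py_train.train())
```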
Once the model is trained, we can use it to make predictions in ETL jobs or to provide an API service that serves HTTP/RPC requests. Batch prediction covers the first case. Here is py_batch_predict.mlsql; again, the mlsql module tells us where the input data, the output file, and the model are:
set py_batch_predict='''
from __future__ import absolute_import
import numpy as np
import os
import json
import sys
import pickle
import scipy.sparse as sp
import importlib
import mlsql
from pyspark.mllib.linalg import Vectors, SparseVector

# this is a json file
tempDataLocalPath = mlsql.internal_system_param["tempDataLocalPath"]
# this is also a json file, it looks like `/tmp/__mlsql__/2d10dd4b-385e-44a7-ac0e-45d1c0fcaf4b/output/output-0.json`
tempOutputLocalPath = mlsql.internal_system_param["tempOutputLocalPath"]
# location of the model
tempModelLocalPath = mlsql.internal_system_param["tempModelLocalPath"]
model = pickle.load(open(tempModelLocalPath + "/model.pkl", "rb"))

# get featureCol from trainParams
featureCol = mlsql.params()["trainParams"]["fitParam.0.featureCol"]

with open(tempOutputLocalPath, "w") as output:
    with open(tempDataLocalPath) as f:
        for line in f.readlines():
            obj = json.loads(line)
            fc = obj[featureCol]
            # convert json to SparseVector
            if fc["type"] == 1:
                feature_size = len(fc["values"])
                dic = [(i, a) for i, a in enumerate(fc["values"])]
                sv = SparseVector(feature_size, dic)
            elif fc["type"] == 0:
                feature_size = fc["size"]
                sv = Vectors.sparse(feature_size, list(zip(fc["indices"], fc["values"])))
            res = model.predict([sv.toArray()]).tolist()
            output.write(json.dumps({'predicted': res}) + "\n")
''';
load script.`py_batch_predict` as py_batch_predict;
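The batch-predict script reads one JSON object per line and writes one `{"predicted": [...]}` line per input row. The same loop, sketched with the stdlib only, using in-memory streams in place of the files and a trivial stand-in for `model.predict` (both hypothetical):

```python
import io
import json


def batch_predict(src, dst, predict, feature_col="features"):
    """Read JSON lines from src and write one {"predicted": [...]} line per row to dst."""
    for line in src:
        if not line.strip():
            continue  # skip blank lines
        obj = json.loads(line)
        fc = obj[feature_col]
        if fc["type"] == 1:
            row = list(fc["values"])
        else:
            # expand a sparse vector into a dense row
            row = [0.0] * fc["size"]
            for i, v in zip(fc["indices"], fc["values"]):
                row[i] = v
        dst.write(json.dumps({"predicted": predict([row])}) + "\n")


def fake_predict(rows):
    # stand-in model: predicts 1.0 when the first feature exceeds 5.0
    return [1.0 if r[0] > 5.0 else 0.0 for r in rows]


src = io.StringIO('{"features":{"type":1,"values":[5.1,3.5,1.4,0.2]}}\n'
                  '{"features":{"type":0,"size":4,"indices":[0],"values":[4.4]}}\n')
dst = io.StringIO()
batch_predict(src, dst, fake_predict)
print(dst.getvalue())
```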
We can also register the model as a function, as we did in the first article, but we need to tell the system how to implement it. Here is py_predict.mlsql:
set py_predict='''
from __future__ import absolute_import
from pyspark.ml.linalg import VectorUDT, Vectors
import pickle
import os
import python_fun

# vector in, vector out; only pyspark vectors are supported.
def predict(index, s):
    # get the model path; always write it like this.
    items = [i for i in s]
    modelPath = pickle.loads(items[1])[0] + "/model.pkl"

    # The model should be loaded only once. Since this function
    # is serialized, a normal global variable will not work.
    if not hasattr(os, "mlsql_models"):
        setattr(os, "mlsql_models", {})
    if modelPath not in os.mlsql_models:
        print("Load sklearn model %s" % modelPath)
        os.mlsql_models[modelPath] = pickle.load(open(modelPath, "rb"))

    # get the model
    model = os.mlsql_models[modelPath]

    # load the vector; note that API predict only supports vectors
    rawVector = pickle.loads(items[0])
    feature = VectorUDT().deserialize(rawVector)

    # predict
    y = model.predict([feature.toArray()])

    # return a vector
    return [VectorUDT().serialize(Vectors.dense(y))]

python_fun.udf(predict)
''';
load script.`py_predict` as py_predict;
Notice that the python_fun module is also generated by the MLSQL Engine.
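In py_predict, the dictionary of loaded models is attached to the `os` module because, as the code comment notes, the `predict` function itself is serialized, whereas an imported module presumably lives as long as the Python worker process. The effect is that each model file is unpickled once per worker and then reused. A stdlib sketch of the same caching pattern, with a hypothetical holder module, a hypothetical path, and a counting loader in place of `pickle.load`:

```python
import types

# any long-lived module can act as the holder; py_predict uses `os`
holder = types.ModuleType("model_cache_holder")
load_calls = []


def load_model(path):
    # stands in for pickle.load(open(path, "rb"))
    load_calls.append(path)
    return {"path": path}


def get_model(path):
    if not hasattr(holder, "mlsql_models"):
        setattr(holder, "mlsql_models", {})
    if path not in holder.mlsql_models:
        holder.mlsql_models[path] = load_model(path)
    return holder.mlsql_models[path]


a = get_model("/tmp/example/model.pkl")
b = get_model("/tmp/example/model.pkl")  # second call hits the cache
print(a is b, len(load_calls))
```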
Finally, let's write main.mlsql.
First, mock some data:
set jsonStr='''
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.4,2.9,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[4.7,3.2,1.3,0.2],"label":1.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
{"features":[5.1,3.5,1.4,0.2],"label":0.0}
''';
load jsonStr.`jsonStr` as data;
select vec_dense(features) as features, label as label from data
as data1;
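Each row of `jsonStr` sits on its own line, so each line presumably has to parse as a complete JSON object by itself, with nothing such as a trailing comma after it. A quick stdlib check you could run on such a string before loading it (`check_json_lines` is a hypothetical helper):

```python
import json


def check_json_lines(text):
    """Return the parsed rows, raising ValueError with the line number of the first bad line."""
    rows = []
    for n, line in enumerate(text.strip().splitlines(), start=1):
        try:
            rows.append(json.loads(line))
        except ValueError as e:
            raise ValueError("line %d is not a standalone JSON object: %s" % (n, e))
    return rows


good = '{"features":[5.1,3.5,1.4,0.2],"label":0.0}\n{"features":[4.4,2.9,1.4,0.2],"label":1.0}'
bad = '{"features":[5.1,3.5,1.4,0.2],"label":0.0},\n{"features":[4.4,2.9,1.4,0.2],"label":1.0}'
print(len(check_json_lines(good)))
```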
Then, include all the files we have written:
-- include python script
include store1.`alg.python.text_classify.py_env.mlsql`;
include store1.`alg.python.text_classify.py_train.mlsql`;
include store1.`alg.python.text_classify.py_batch_predict.mlsql`;
include store1.`alg.python.text_classify.py_predict.mlsql`;
include store1.`alg.python.text_classify.py_main.mlsql`;
Create the Python environment:
-- run command as PythonEnvExt.`/tmp/python/nlp` where condaFile="dependencies" and command="create";
!createPythonEnv /tmp/python/nlp py_env;
Use the PythonAlg ET to train the model:
train data1 as PythonAlg.`/tmp/python/nlp` where
-- global configuration
keepVersion="true"
and dataLocalFormat="json"
and dataLocalFileNum="-1"
and generateProjectOnly="true"
-- configure python project
and scripts="py_train,py_main,py_batch_predict,py_predict"
and entryPoint="py_main"
and apiPredictEntryPoint="py_predict"
and condaFile="py_env"
-- algorithm parameters which will be accepted by Sklearn
and fitParam.0.labelCol="label"
and fitParam.0.featureCol="features"
and fitParam.0.maxDepth="3";
Register the model as a function:
-- Ok, let's convert the model to a function
register PythonAlg.`/tmp/python/nlp` as rf_predict;
select rf_predict(vec_dense(array(5.1,3.5,1.4,0.2))) as predicted as output;
Or batch-predict the whole table:
-- you can also predict the table
predict data1 as PythonAlg.`/tmp/python/nlp`;