pyspark-ml study notes: model evaluation

Author: MachineLP · Published 2019-08-29
The problem is this: if we want to build a distributed machine-learning training platform on top of pyspark, we certainly need to evaluate models, but pyspark itself ships with only a small set of model-evaluation APIs. If we want to extend it, there are a few options:

(1) Write our own extension code, e.g. with UDFs and DataFrame aggregations (see the sketch right after this list).

(2) Reuse existing APIs, such as those in sklearn. (Switching between frameworks usually requires converting data structures.)
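For option (1), a minimal sketch might look like the following. Everything in it (the column names "probability" and "label", the 0.5 threshold, the accuracy helper) is an illustrative assumption, not an API from pyspark itself; the point is only that the metric is computed with Spark operations, so the data never leaves the cluster.

# Minimal sketch of option (1): hand-written metric code using a UDF plus
# DataFrame aggregation. Column names are assumptions for illustration.
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# Turn a positive-class probability into a hard 0/1 label.
to_label = F.udf(lambda p: 1.0 if p >= 0.5 else 0.0, DoubleType())

def accuracy(df, prob_col="probability", label_col="label"):
    labeled = df.withColumn("pred_label", to_label(F.col(prob_col)))
    # Fraction of rows where the predicted label matches the true label.
    return labeled.select(
        F.avg((F.col("pred_label") == F.col(label_col)).cast("double")).alias("acc")
    ).first()["acc"]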

An example of option (2) is shown below:

'''
Model evaluation module:
· pyspark api
· sklearn api
'''

import numpy as np
from pyspark.ml.linalg import Vectors
from start_pyspark import spark, sc, sqlContext 
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Each row: (vector [P(label=0), P(label=1)], true label).
# Note: the column is named "prediction" here, but it actually holds the raw
# prediction / probability vector, which is what rawPredictionCol expects.
scoreAndLabels = [
    (Vectors.dense([1.0 - score, score]), label)
    for score, label in [(0.1, 0.0), (0.1, 1.0), (0.4, 0.0), (0.6, 0.0),
                         (0.6, 1.0), (0.6, 1.0), (0.8, 1.0)]
]
dataset = spark.createDataFrame(scoreAndLabels, ["prediction", "label"])
evaluator = BinaryClassificationEvaluator(rawPredictionCol="prediction")

# The default metric is areaUnderROC.
res = evaluator.evaluate(dataset)
print('res>>>>>>', res)

res = evaluator.evaluate(dataset, {evaluator.metricName: "areaUnderPR"})
print('areaUnderPR>>>>>>', res)

res = evaluator.evaluate(dataset, {evaluator.metricName: "areaUnderROC"})
print('areaUnderROC>>>>>>', res)


# Inspect the prediction column and the underlying RDD.
bb = dataset.select(dataset["prediction"])
print('bbbbbb>>>>>', bb.collect())

print('rdd>>>>>', dataset.rdd.collect())

# Convert to pandas so that sklearn metrics can be used
# (this collects the whole DataFrame onto the driver).
pandas_pd = dataset.toPandas()
print('bb>>>>>', pandas_pd)
print('bb>>>>>', pandas_pd['prediction'].values.tolist())

print('==========================================================================================================')
# Standalone sklearn example: macro-averaged precision on a toy multiclass problem.
from sklearn.metrics import precision_score
y_true = np.array([0.0, 1, 2, 0, 1, 2])
print('y_true:', y_true)
y_pred = np.array([0, 2, 1, 0, 0, 1])
res = precision_score(y_true, y_pred, average='macro')
print('precision_score:', res)


print('==========================================================================================================')
from sklearn.metrics import precision_score, recall_score, roc_auc_score, roc_curve, \
    accuracy_score, r2_score, f1_score

# Turn the Spark vectors and labels into plain numpy arrays.
y_pred = np.array([np.array(per_pd) for per_pd in pandas_pd['prediction'].values])
print('y_pred:', y_pred)
y = np.array([np.array(per_pd) for per_pd in pandas_pd['label'].values])
print('y:', y)
print('y_pred[:, 1]:', y_pred[:, 1])

# roc_auc_score needs probability scores for the positive class.
roc_auc_score_res = roc_auc_score(y, y_pred[:, 1])
print('roc_auc_score_res:', roc_auc_score_res)


def prob2label(prob, thres=0.5):
    # Threshold probabilities into hard 0/1 labels (values >= thres become 1.0).
    # Note: the array is modified in place.
    prob[prob >= thres] = 1.0
    prob[prob < thres] = 0.0
    return prob

# precision / recall / accuracy / f1 need hard labels rather than probabilities,
# so threshold the positive-class scores once and reuse the result.
y_pred_label = prob2label(y_pred[:, 1])

precision_score_res = precision_score(y, y_pred_label)
print('precision_score_res:', precision_score_res)

recall_score_res = recall_score(y, y_pred_label)
print('recall_score_res:', recall_score_res)

# roc_curve is normally fed probability scores; here it is called on the
# thresholded labels, as in the original example.
roc_curve_res = roc_curve(y, y_pred_label)
print('roc_curve_res:', roc_curve_res)

accuracy_score_res = accuracy_score(y, y_pred_label)
print('accuracy_score_res:', accuracy_score_res)

r2_score_res = r2_score(y, y_pred_label)
print('r2_score_res:', r2_score_res)

f1_score_res = f1_score(y, y_pred_label)
print('f1_score_res:', f1_score_res)
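
One caveat with option (2): dataset.toPandas() pulls the entire DataFrame onto the driver, so it is only suitable when the evaluation set fits in driver memory. For precision / recall / F1-style metrics, pyspark's own MulticlassClassificationEvaluator is a Spark-side alternative; a minimal sketch (assuming the DataFrame already holds hard numeric labels in "prediction" and "label") might look like this:

# Sketch: Spark-side precision / recall / F1 with MulticlassClassificationEvaluator.
# Assumes `spark` is the SparkSession from start_pyspark and that the columns
# hold hard label values rather than probability vectors.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

pred_df = spark.createDataFrame(
    [(0.0, 0.0), (2.0, 1.0), (1.0, 2.0), (0.0, 0.0), (0.0, 1.0), (1.0, 2.0)],
    ["prediction", "label"],
)
mc_eval = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="label")
print("f1:", mc_eval.evaluate(pred_df, {mc_eval.metricName: "f1"}))
print("weightedPrecision:", mc_eval.evaluate(pred_df, {mc_eval.metricName: "weightedPrecision"}))
print("weightedRecall:", mc_eval.evaluate(pred_df, {mc_eval.metricName: "weightedRecall"}))
print("accuracy:", mc_eval.evaluate(pred_df, {mc_eval.metricName: "accuracy"}))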

start_pyspark.py

import os
import sys
 
''' 
# The directories below are the Spark and Java installation directories on your own machine.
os.environ['SPARK_HOME'] = "/Users/***/spark-2.4.3-bin-hadoop2.7/" 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/bin") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/pyspark") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/lib/py4j-0.9-src.zip") 
# sys.path.append("/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home") 
os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home" 
'''
 
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext

# conf = SparkConf().setMaster("local").setAppName("My App")
conf = SparkConf().setMaster("yarn").setAppName("My App")
sc = SparkContext(conf=conf)
# getOrCreate() reuses the SparkContext created above rather than starting a new one.
spark = SparkSession.builder.appName('CalculatingGeoDistances').getOrCreate()
sqlContext = SQLContext(sparkContext=sc)
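
With this helper in place, the evaluation script above only needs `from start_pyspark import spark, sc, sqlContext`. A quick smoke test, assuming the master is switched to "local[*]" when trying this on a laptop, could look like:

# Hypothetical smoke test that the helper wires everything up correctly.
from start_pyspark import spark, sc, sqlContext

print(spark.version)
print(spark.range(5).count())  # run a trivial job to confirm the session works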