
Using XGBoost on PySpark

Sam Gor
Published 2021-04-26

Here is a PySpark version, drawing on versions others have shared publicly. Since the official documentation offers no way to inspect feature importances, I wrote my own method for that. Saving the model is not covered here; I trust you can handle that yourselves.

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, DoubleType, StringType, IntegerType
from pyspark.ml import Pipeline, PipelineModel
from sparkxgb import XGBoostClassifier, XGBoostRegressor
import logging
import pandas as pd
from datetime import date, timedelta
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinMaxScaler, IndexToString

conf = SparkConf() \
    .setExecutorEnv('', '123')  # placeholder executor env var, kept from the original
spark = SparkSession \
    .builder \
    .config(conf=conf) \
    .appName('pyspark demo') \
    .getOrCreate()
sc = spark.sparkContext
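
Note on dependencies: the sparkxgb module imported above is the Python wrapper shipped alongside xgboost4j-spark, not a PyPI package. A minimal sketch of wiring it in, assuming 0.90-era jars and a sparkxgb.zip are available locally (paths and version are illustrative assumptions, not from the original post):

# Illustrative dependency wiring; adjust jar paths/versions to your build.
conf = SparkConf() \
    .set("spark.jars", "xgboost4j-0.90.jar,xgboost4j-spark-0.90.jar")
spark = SparkSession.builder.config(conf=conf).appName('pyspark demo').getOrCreate()
spark.sparkContext.addPyFile("sparkxgb.zip")  # makes `from sparkxgb import ...` resolvable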

Pulling the Data

df = spark.sql("select * from test_table where datadate='20200101'")
# drop unneeded columns
df = df.drop("column2")
# pick columns - num_features: numeric, cat_features: categorical
num_features = ["num1", "num2"]
cat_features = ["cat1", "cat2"]
label_columns = ["is_true_flag"]
df = df[num_features + cat_features + label_columns]
df = df.dropna()
df = df.na.replace('', 'NA')
df = df.fillna(0)
# cast columns to the expected data types
for col in cat_features:
    df = df.withColumn(col, df[col].cast(StringType()))
for col in num_features:
    df = df.withColumn(col, df[col].cast(DoubleType()))
df = df.withColumn('is_true_flag', df['is_true_flag'].cast(IntegerType()))
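
Before wiring up the pipeline, a quick sanity check that the casts took effect and the label is not wildly imbalanced can save a wasted training run; a small sketch using the columns above:

df.printSchema()  # confirm the numeric/categorical casts
df.groupBy("is_true_flag").count().show()  # label balance informs the metrics used below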

One-Hot Encoding

# one-hot & standard scaler
stages = []
for col in cat_features:
    # map each string category to an index
    string_index = StringIndexer(inputCol=col, outputCol=col + 'Index')
    # turn the index into a one-hot vector
    encoder = OneHotEncoder(inputCol=string_index.getOutputCol(), outputCol=col + "_one_hot")
    # collect each column's transformers in stages
    stages += [string_index, encoder]

# index the label column
label_string_index = StringIndexer(inputCol='is_true_flag', outputCol='label')
# add it to stages
stages += [label_string_index]

# categorical + numeric columns
assembler_cols = [c + "_one_hot" for c in cat_features] + num_features
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages += [assembler]

# run the whole preprocessing as a pipeline
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df = pipeline_model.transform(df)
train, test = df.randomSplit([0.7, 0.3], seed=2021)
print(train.count())
print(test.count())
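
The PipelineModel import above is otherwise unused; if you want to apply exactly the same encoding at scoring time, the fitted preprocessing pipeline can be persisted with Spark ML's standard writer. A minimal sketch; the path is a placeholder assumption:

# Persist and reload the fitted preprocessing pipeline (path is illustrative).
pipeline_model.write().overwrite().save("hdfs:///tmp/xgb_preprocess_pipeline")
reloaded = PipelineModel.load("hdfs:///tmp/xgb_preprocess_pipeline")
scored_input = reloaded.transform(df)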

Building the Model

# build the model
xgb = XGBoostClassifier(featuresCol='features', labelCol='label', predictionCol='predict_val',
                        missing=0.0, numRound=50, numWorkers=10)
preModel = xgb.fit(train)
out1 = preModel.transform(test)
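
As noted at the top, the original post skips model persistence. If your sparkxgb build exposes Spark ML's standard persistence interface (an assumption worth verifying against your wrapper version), it follows the same pattern as the pipeline above:

# Assumption: the wrapper implements MLWritable/MLReadable; check your build.
preModel.write().overwrite().save("hdfs:///tmp/xgb_model")  # path is illustrative
# from sparkxgb import XGBoostClassificationModel  # hypothetical class name in some wrapper builds
# loaded_model = XGBoostClassificationModel.load("hdfs:///tmp/xgb_model")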

Evaluating the Model

### evaluation ###
import pyspark.mllib.evaluation as ev
lr_results = out1.select(['predict_val', 'label']).rdd.map(lambda row: (row[0], row[1] * 1.0))
lr_ev = ev.BinaryClassificationMetrics(lr_results)
print("Area under PR: {}".format(lr_ev.areaUnderPR))
print("Area under ROC: {}".format(lr_ev.areaUnderROC))
tp = out1[(out1.label == 1) & (out1.predict_val == 1)].count()
tn = out1[(out1.label == 0) & (out1.predict_val == 0)].count()
fp = out1[(out1.label == 0) & (out1.predict_val == 1)].count()
fn = out1[(out1.label == 1) & (out1.predict_val == 0)].count()
print('accuracy is : %f' % ((tp + tn) / (tp + tn + fp + fn)))  # accuracy
print('recall is : %f' % ((tp) / (tp + fn)))  # recall
print('precision is : %f' % ((tp) / (tp + fp)))  # precision
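
The hand-rolled counts above can be cross-checked against Spark ML's built-in evaluator, which works directly on the prediction column:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Cross-check the accuracy/recall/precision computed above.
for metric in ["accuracy", "weightedRecall", "weightedPrecision"]:
    evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="predict_val", metricName=metric)
    print(metric, evaluator.evaluate(out1))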

Parsing Feature Importances

# parse the feature metadata attached by VectorAssembler
df.schema['features'].metadata
temp = df.schema["features"].metadata["ml_attr"]["attrs"]
df_importance = pd.DataFrame(columns=['idx', 'name'])
for attr in temp['numeric']:
    temp_df = {}
    temp_df['idx'] = attr['idx']
    temp_df['name'] = attr['name']
    df_importance = df_importance.append(temp_df, ignore_index=True)
df_importance
for attr in temp['binary']:
    temp_df = {}
    temp_df['idx'] = attr['idx']
    temp_df['name'] = attr['name']
    df_importance = df_importance.append(temp_df, ignore_index=True)
df_importance
# parse the feature importance scores
FeatureScoreMap = preModel.nativeBooster.getScore("", "gain")
file_path = "C://Users//Administrator//Desktop//importance.csv"
file = open(file_path, "w+")
print(FeatureScoreMap, file=file)
file.close()
# rewrite the "Map(f0 -> score, ...)" string as csv lines of "index,score"
f1 = open(file_path)
line = f1.readline()
data = line.replace(',', '\n').replace('->', ',').replace('Map(', '').replace(')', '').replace('f', '')
file = open(file_path, "w+")
print(data, file=file)
file.close()
df_temp = pd.read_csv(file_path, header=None, names=["feature", "weight"])
# join scores onto names: xgboost's "f<idx>" matches the assembler slot index
df_importance['idx'] = df_importance['idx'].astype(int)
df_importance = df_importance.merge(df_temp, left_on="idx", right_on="feature")
df_importance.sort_values(by=['weight'], ascending=False, inplace=True)
df_importance
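
The file round-trip above exists only to turn the Scala Map's string form into a table. If getScore comes back as a py4j-wrapped Java map (worth checking in your build; this is an assumption), the detour through disk can be skipped:

# Assumption: FeatureScoreMap is dict-like via py4j.
scores = {int(k.lstrip("f")): v for k, v in dict(FeatureScoreMap).items()}
df_temp = pd.DataFrame(list(scores.items()), columns=["feature", "weight"])
df_importance = df_importance.merge(df_temp, left_on="idx", right_on="feature")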