专栏首页SAMshare在PySpark上使用XGBoost

在PySpark上使用XGBoost

我这里提供一个pyspark的版本,参考了大家公开的版本。同时因为官网没有查看特征重要性的方法,所以自己写了一个方法。本方法没有保存模型,相信大家应该会。

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType,DoubleType,StringType,IntegerType
from pyspark.ml import Pipeline,PipelineModel
from xparkxgb import XGBoostClassifier,XGBoostRegressor
import logging
from datetime import date,timedalta
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler,MinAMaxScaler,IndexToString
conf = SparkConf()\
    .setExecutorEnv('','123')
spark = SparkSession \
    .builder \
    .config(conf=conf)\
    .appName('pyspark demo')
    .getOrCreate()
sc = spark.sparkContext

? 拉取数据

df = spark.sql("select *  from test_table where datadate='20200101'")
#删除不要的字段
df = df.drop("column2")
#选择字段-num_feature:数值,cat_feature:分类值
num_features = ["num1","num2"]
cat_features = ["cat1","cat2"]
label_columns = ["is_true_flag"]
df = df[num_features,cat_features+label_columns  ]
df = df.dropna()
df = df.na.replace('','NA')
df = df.fillna(0)
#change data type
for col in cat_features:
    df = df.withColumn(col,df[col].cast(StringType()))    
for col in num_features:
    df = df.withColumn(col,df[col].cast(DoubleType())) 
df = df.withColumn('is_true_flag',df['ist_true_flag'].cast(IntegerType()))

? 转onehot

#one-hot & standard scaler  
stages = []
for col in cat_features:
    # 字符串转成索引
    string_index = StringIndexer(inputCol = col, outputCol = col + 'Index')
    # 转换为OneHot编码
    encoder = OneHotEncoder(inputCol=string_index.getOutputCol(), outputCol=col + "_one_hot")
    # 将每个字段的转换方式 放到stages中
    stages += [string_index, encoder]

# 将income转换为索引
label_string_index = StringIndexer(inputCol = 'is_true_flag', outputCol = 'label')
# 添加到stages中
stages += [label_string_index]

# 类别变量 + 数值变量
assembler_cols = [c + "_one_hot" for c in cat_features] + num_features
assembler = VectorAssembler(inputCols=assembler_cols, outputCol="features")
stages += [assembler]

# 使用pipeline完成数据处理
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df = pipeline_model.transform(df)
train, test = df.randomSplit([0.7, 0.3], seed=2021)
print(train.count())
print(test.count())

? 创建模型

# 创建模型
xgb = XGBoostClassifier(featuresCol = 'features', labelCol = 'label',predictionCol='predict_val',missing=0.0,numRound=50,numWorkers=10)
preModel = xgb.fit(trainData)
out1 = preModel.transform(testData)

? 查看训练效果

###训练效果##
import pyspark.mllib.eveluation as ev
lr_results = out1.select(['predict_val','label']).rdd.map(lambda row:(row[0],row[1] * 1.0))
lr_ev =ev.BinaryClassificationMetrics(lr_results)
print (":Area under PR:{}".format(lr_ev.areaUnderPR))
print (":Area under ROC:{}".format(lr_ev.areaUnderROC))
tp = out1[(out.label == 1) & (out1.predict_val == 1)].count()
tn = out1[(out.label == 0) & (out1.predict_val == 0)].count()
fn = out1[(out.label == 0) & (out1.predict_val == 1)].count()
fn = out1[(out.label == 1) & (out1.predict_val == 0)].count()
print ('accuracy is : %f'%((tp+tn)/(tp+tn+fp+fn))) #准确率
print ('recall is : %f'%((tp)/(tp+fn))) #召回率
print ('precision is : %f'%((tp)/(tp+fp))) #精确率

? 特征解析

#特征解析
df.schema['features'].metadata
temp = df.schema["features"].metadata["ml_attr"]["attrs"]
df_importance = pd.DataFrame(columns=['idx', 'name'])
for attr in temp['numeric']:
    temp_df = {}
    temp_df['idx'] = attr['idx']
    temp_df['name'] = attr['name']
    #print(temp_df)
    df_importance = df_importance.append(temp_df, ignore_index=True)
    #print(attr['idx'], attr['name'])
    #print(attr)
    #break
df_importance
for attr in temp['binary']:
    temp_df = {}
    temp_df['idx'] = attr['idx']
    temp_df['name'] = attr['name']
    df_importance = df_importance.append(temp_df, ignore_index=True)
df_importance
#解析特征重要值
FeatureScoreMap = preModel.nativeBooster.getScore("","gain")
file_path ="C://Users//Administrator//Desktop//importance.csv"
file  = open(file_path,"w+")
print(FeatureScoreMap ,file = file)
file.close()
f1 = open(file_path)
line = f1.readline()
data=line.replace(',','\n').replace('->',',').replace('Map(','').replace(')','').replace('f','')
file  = open(file_path,"w+")
print(data,file = file)
file.close()
df_temp = pd.read_csv(file_path,header=None,names=["feature","weight"])
df_importance = df_importance.merge(df_temp, left_on="feature", right_on="feature")
df_importance.sort_values(by=['feature_importance'], ascending=False, inplace=True)
df_importance

本文分享自微信公众号 - SAMshare(gh_8528ce7b7e80),作者:量化达仔

原文出处及转载信息见文内详细说明,如有侵权,请联系 yunjia_community@tencent.com 删除。

原始发表时间:2021-04-22

本文参与腾讯云自媒体分享计划,欢迎正在阅读的你也加入,一起分享。

我来说两句

0 条评论
登录 后参与评论

相关文章

  • 0835-5.16.2-如何按需加载Python依赖包到Spark集群

    在开发Pyspark代码时,经常会用到Python的依赖包。在PySpark的分布式运行的环境下,要确保所有节点均存在我们用到的Packages,本篇文章主要介...

    Fayson
  • pyspark-ml学习笔记:pyspark下使用xgboost进行分布式训练

    问题是这样的,如果我们想基于pyspark开发一个分布式机器训练平台,而xgboost是不可或缺的模型,但是pyspark ml中没有对应的API,这时候我们需...

    MachineLP
  • 《大数据+AI在大健康领域中最佳实践前瞻》---- 基于 pyspark + xgboost 算法的 欺诈检测 DEMO实践

    银行需要面对数量不断上升的欺诈案件。随着新技术的出现,欺诈事件的实例将会成倍增加,银行很难检查每笔交易并手动识别欺诈模式。RPA使用“if-then”方法识别潜...

    流川疯
  • 闲话 Spark 的一个重要改变

    最近看到了 Apache Spark 发布了 3.2 版本的预告 Pandas API on Upcoming Apache Spark™ 3.2,文章写得很简...

    哒呵呵
  • pyspark-ml学习笔记:逻辑回归、GBDT、xgboost参数介绍

    逻辑回归、GBDT可以参考pyspark开发文档:http://spark.apache.org/docs/latest/api/python/pyspark....

    MachineLP
  • Jupyter在美团民宿的应用实践

    做算法的同学对于Kaggle应该都不陌生,除了举办算法挑战赛以外,它还提供了一个学习、练习数据分析和算法开发的平台。Kaggle提供了Kaggle Kernel...

    美团技术团队
  • [1014]PySpark使用笔记

    PySpark 通过 RPC server 来和底层的 Spark 做交互,通过 Py4j 来实现利用 API 调用 Spark 核心。 Spark (wri...

    周小董
  • 在mac上安装Xgboost Python库

    最近在mac上用到xgboost库,安装时遇到颇多大坑,网上查了很多答案几乎都是win上的问题,没遇到理想的,自己也就摸着石头把几个大坑给填了,总结一下,给后...

    MachineLP
  • 【产品新闻】一份来自国际权威分析机构 Forrester 的认可

    在 Forrester 最新发布的《Now Tech: Predictive Analytics And Machine Learning In China, ...

    腾讯云TI平台
  • 大数据入门与实战-PySpark的使用教程

    Apache Spark是用Scala编程语言编写的。为了用Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpa...

    致Great
  • 金色传说,开源教程!属于算法的大数据工具-pyspark

    spark是目前大数据领域的核心技术栈,许多从事数据相关工作的小伙伴都想驯服它,变成"驯龙高手",以便能够驾驭成百上千台机器组成的集群之龙来驰骋于大数据之海。

    Sam Gor
  • 如何将PySpark导入Python的放实现(2种)

    优点:简单快捷 缺点:治标不治本,每次写一个新的Application都要加载一遍findspark

    砸漏
  • 数据岗面试:常用哪些Python第三方库?

    当下,数据从业者大多需要掌握Python语言,更准确的说要学会使用Python提供的一些主流第三方库。考虑眼下正值金三银四的找工作最佳时机,现将个人曾经历过的一...

    luanhz
  • 在python中使用pyspark读写Hive数据操作

    pyspark读取hive数据非常简单,因为它有专门的接口来读取,完全不需要像hbase那样,需要做很多配置,pyspark提供的操作hive的接口,使得程序可...

    砸漏
  • xgboost 库使用入门

    本文 github 地址:1-1 基本模型调用. ipynb,里面会记录自己kaggle大赛中的内容,欢迎start关注。

    zhuanxu
  • 使用CDSW和运营数据库构建ML应用2:查询/加载数据

    在本期中,我们将讨论如何执行“获取/扫描”操作以及如何使用PySpark SQL。之后,我们将讨论批量操作,然后再讨论一些故障排除错误。在这里阅读第一个博客。

    大数据杂货铺
  • 什么是Python中的Dask,它如何帮助你进行数据分析?

    Python由于其易用性而成为最流行的语言,它提供了许多库,使程序员能够开发更强大的软件,以并行运行模型和数据转换。

    HuangWeiAI
  • Eat pyspark 1st day | 快速搭建你的Spark开发环境

    下载地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-213...

    超哥的杂货铺
  • PySpark做数据处理

    PySpark是一种适合在大规模数据上做探索性分析,机器学习模型和ETL工作的优秀语言。若是你熟悉了Python语言和pandas库,PySpark适合你进一步...

    陆勤_数据人网

扫码关注云+社区

领取腾讯云代金券