The problem: sometimes the transformers that ship with the Spark ML pipeline API are not enough, or we have our own data-preprocessing functions. How do we extend the pipeline so that a custom stage behaves like the built-in ones, i.e. the fitted pipeline can still be saved, loaded, and then used to transform data?
After some searching there is an answer. The question is: How to add my own function as a custom stage in a ML pyspark Pipeline?
Reference:
https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
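Before the full test script, here is the core pattern in a nutshell (a minimal sketch; the class name MyStage and its body are made up for illustration): subclass Transformer, implement _transform, and mix in DefaultParamsReadable / DefaultParamsWritable when the stage needs to survive save() / load().
# Minimal sketch of a custom pipeline stage (hypothetical MyStage, not from the original post)
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class MyStage(Transformer, DefaultParamsReadable, DefaultParamsWritable):
    def _transform(self, dataset):
        # put your own preprocessing here; this placeholder just drops rows that contain nulls
        return dataset.na.drop()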
The full test script follows (PySpark missing-value handling). For general PySpark usage you can refer to: https://cloud.tencent.com/developer/article/1436179
#!/usr/bin/env python
# -*- coding:utf8 -*-
"""
-------------------------------------------------
Description : pyspark test
Author : liupeng
Date : 2019/8/13
-------------------------------------------------
"""
'''
How to add my own function as a custom stage in a ML pyspark Pipeline?
Reference: https://stackoverflow.com/questions/51415784/how-to-add-my-own-function-as-a-custom-stage-in-a-ml-pyspark-pipeline
Saving the model: https://stackoverflow.com/questions/41399399/serialize-a-custom-transformer-using-python-to-be-used-within-a-pyspark-ml-pipel
'''
from start_pyspark import spark, sc, sqlContext
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import col, mean, min
from pyspark.sql import DataFrame
from typing import Iterable
import pandas as pd
# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnDropper(Transformer):
"""
A custom Transformer which drops all columns that have at least one of the
words from the banned_list in the name.
"""
def __init__(self, banned_list: Iterable[str]):
super(ColumnDropper, self).__init__()
self.banned_list = banned_list
def _transform(self, df: DataFrame) -> DataFrame:
df = df.drop(*[x for x in df.columns if any(y in x for y in self.banned_list)])
# df = df.na.drop()
return df
class DefaultProcessing(Transformer):
    """
    A custom Transformer that fills missing values column by column.
    `mode` maps a column name to a fill strategy: 'default' (sentinel below the
    column minimum), 'mean' (column mean) or 'customize' (a user-supplied value).
    """
    def __init__(self, mode=None, value=1.0):
        super(DefaultProcessing, self).__init__()
        self.mode = mode if mode is not None else {}
        self.value = value
    def _transform(self, df):
        for col_ in df.columns:
            strategy = self.mode.get(col_)
            if strategy == 'default':
                df = self.missing_value_fill_default(df, col_)
            elif strategy == 'mean':
                df = self.missing_value_fill_mean(df, col_)
            elif strategy == 'customize':
                df = self.missing_value_fill_customize(df, col_, self.value)
        return df
    def missing_value_fill_default(self, df, col_):
        '''
        Fill missing values with min(column minimum - 1, -1), i.e. a sentinel
        that is guaranteed to be smaller than every observed value.
        :param col_: name of the column to fill
        :return: the DataFrame with nulls in col_ filled
        '''
        min_value = df.agg(min(col(col_))).collect()[0][0]
        # `min` here is pyspark.sql.functions.min, so compute the Python-side
        # minimum of (min_value - 1) and -1 with a plain comparison
        fill_value = min_value - 1 if min_value - 1 < -1 else -1
        df = df.na.fill(fill_value, subset=[col_])
        return df
    def missing_value_fill_mean(self, df, col_):
        '''
        Fill missing values with the column mean.
        :param col_: name of the column to fill
        :return: the DataFrame with nulls in col_ filled
        '''
        fill_value = df.agg(mean(col(col_))).collect()[0][0]
        df = df.na.fill(fill_value, subset=[col_])
        return df
    def missing_value_fill_customize(self, df, col_, value):
        '''
        Fill missing values with a user-supplied value.
        :param col_: name of the column to fill
        :param value: the value to fill with
        :return: the DataFrame with nulls in col_ filled
        '''
        df = df.na.fill(value, subset=[col_])
        return df
# SAMPLE DATA -----------------------------------------------------------------------
df = pd.DataFrame({'ball_column': [0,1,2,3,4,5,6],
'keep_the': [6,5,4,3,2,1,0],
'hall_column': [2,2,2,2,2,2,2] })
df = spark.createDataFrame(df)
# EXAMPLE 1: USE THE TRANSFORMER WITHOUT PIPELINE -----------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
df_example = column_dropper.transform(df)
# EXAMPLE 2: USE THE TRANSFORMER WITH PIPELINE --------------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
bagging = Bucketizer(
splits=[-float("inf"), 3, float("inf")],
inputCol= 'keep_the',
outputCol="keep_the_bucket")
model = Pipeline(stages=[column_dropper,bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()
df = pd.DataFrame({'ball_column': [0,1,None,3,4,5,6],
'keep_the': [6,5,4,3,2,1,0],
'hall_column': [2,2,2,2,2,2,None] })
df = spark.createDataFrame(df)
# EXAMPLE 3: USE THE TRANSFORMER WITH PIPELINE --------------------------------------
column_dropper = DefaultProcessing( mode = {'ball_column':'default', 'hall_column':'customize'} )
bagging = Bucketizer(
splits=[-float("inf"), 3, float("inf")],
inputCol= 'keep_the',
outputCol="keep_the_bucket")
model = Pipeline(stages=[column_dropper,bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()
# model.write().overwrite().save( 'test.model' )
'''
print ('================================================')
df = pd.DataFrame({'ball_column': [None,None,None,3,4,5,6],
                   'keep_the': [6,5,4,3,2,1,0],
                   'hall_column': [2,2,2,2,None,None,None] })
df = spark.createDataFrame(df)
model = PipelineModel.load( 'test.model' )
y = model.transform(df)
y.show()
'''
'''
+--------+---------------+
|keep_the|keep_the_bucket|
+--------+---------------+
| 6| 1.0|
| 5| 1.0|
| 4| 1.0|
| 3| 1.0|
| 2| 0.0|
| 1| 0.0|
| 0| 0.0|
+--------+---------------+
'''
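So far so good, but the stages above cannot be persisted: ColumnDropper and DefaultProcessing keep their state in plain Python attributes and provide no writer, so saving the fitted pipeline (the commented-out model.write() above) would fail. The version below, adapted from the second Stack Overflow link referenced earlier (the serialization question), declares its state as Spark ML Params and mixes in DefaultParamsReadable / DefaultParamsWritable, which is what makes write().save() and PipelineModel.load() work.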
from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit, col, mean  # lit for the original constant-fill _transform; col/mean for the mean-fill version used below
from pyspark.ml import Pipeline, PipelineModel
class SetValueTransformer(
Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable,
):
value = Param(
Params._dummy(),
"value",
"value to fill",
)
@keyword_only
def __init__(self, outputCols=None, value=0.0):
super(SetValueTransformer, self).__init__()
self._setDefault(value=0.0)
kwargs = self._input_kwargs
self._set(**kwargs)
@keyword_only
def setParams(self, outputCols=None, value=0.0):
"""
setParams(self, outputCols=None, value=0.0)
Sets params for this SetValueTransformer.
"""
kwargs = self._input_kwargs
return self._set(**kwargs)
def setValue(self, value):
"""
Sets the value of :py:attr:`value`.
"""
return self._set(value=value)
def getValue(self):
"""
Gets the value of :py:attr:`value` or its default value.
"""
return self.getOrDefault(self.value)
    def _transform(self, dataset):
        # Original behaviour from the Stack Overflow answer: write a constant
        # into each output column.
        # for col_ in self.getOutputCols():
        #     dataset = dataset.withColumn(col_, lit(self.getValue()))
        # Modified behaviour: fill nulls in every column with that column's mean
        # (a min-based fill, as in DefaultProcessing above, would use min() instead).
        for col_ in dataset.columns:
            fill_value = dataset.agg(mean(col(col_))).collect()[0][0]
            dataset = dataset.na.fill(fill_value, subset=[col_])
        return dataset
svt = SetValueTransformer(outputCols=["a", "b"], value=123.0)
p = Pipeline(stages=[svt])
df = sc.parallelize([(1, None), (2, 1.0), (3, 0.5)]).toDF(["key", "value"])
pm = p.fit(df)
pm.transform(df).show()
pm.write().overwrite().save('./test/test.model')
pm2 = PipelineModel.load('./test/test.model')
print('matches?', pm2.stages[0].extractParamMap() == pm.stages[0].extractParamMap())
pm2.transform(df).show()
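If the round trip works, the 'matches?' line should print True (the reloaded stage carries the same param map), and both show() calls should print the same DataFrame, with the null in the value column replaced by the column mean (here (1.0 + 0.5) / 2 = 0.75).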
start_pyspark.py (the helper module imported at the top of the test script):
import os
import sys
# The paths below should point to your own Spark and Java installation directories
os.environ['SPARK_HOME'] = "/Users/***/spark-2.4.3-bin-hadoop2.7/"
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/bin")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/pyspark")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip")
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/lib/py4j-0.9-src.zip")
# sys.path.append("/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home")
os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home"
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf = conf)
# getOrCreate() reuses the SparkContext created above, so the appName from conf ("My App") is the one that sticks
spark = SparkSession.builder.appName('CalculatingGeoDistances').getOrCreate()
sqlContext = SQLContext(sparkContext=sc)
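As a side note (an assumption, not part of the original post): if pyspark is installed as a pip package rather than unpacked locally as above, the same three objects can be created without setting SPARK_HOME or touching sys.path:
# start_pyspark.py, alternative sketch assuming `pip install pyspark`
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.master("local").appName("My App").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)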