pyspark-ml study notes: how to add my own function as a custom stage in a PySpark ML pipeline?

Author: MachineLP
Published: 2019-08-29 11:39:22

The problem: sometimes the built-in functions in a Spark ML pipeline are not enough, or we have data-preprocessing functions of our own. How should the pipeline be extended in that case? The extension should keep the same rhythm as the rest of the pipeline, so the stage can be saved, loaded and then used to transform data (a minimal sketch of the pattern is given after the reference links below).

After some searching there is an answer. The question: How to add my own function as a custom stage in a ML pyspark Pipeline?

References:

(1)https://stackoverflow.com/questions/51415784/how-to-add-my-own-function-as-a-custom-stage-in-a-ml-pyspark-pipeline

(2)https://stackoverflow.com/questions/41399399/serialize-a-custom-transformer-using-python-to-be-used-within-a-pyspark-ml-pipel

(3)https://stackoverflow.com/questions/32331848/create-a-custom-transformer-in-pyspark-ml
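
In short, the pattern from those answers is: subclass pyspark.ml.Transformer, put the column logic in _transform, and, if the stage needs to survive save()/load(), also mix in DefaultParamsReadable and DefaultParamsWritable and expose the configuration as Params. A minimal sketch of that pattern (just an illustration, assuming PySpark 2.3+ where those mixins exist; the class name MyCustomStage and the 0.0 fill are placeholders, not part of the linked answers):

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

class MyCustomStage(Transformer, HasInputCol, DefaultParamsReadable, DefaultParamsWritable):
    """Placeholder stage: fills nulls in inputCol with 0.0."""

    @keyword_only
    def __init__(self, inputCol=None):
        super(MyCustomStage, self).__init__()
        # keyword_only stores the keyword arguments in self._input_kwargs
        self._set(**self._input_kwargs)

    def _transform(self, df):
        # replace this with your own preprocessing logic
        return df.na.fill(0.0, subset=[self.getInputCol()])

A stage built this way can be dropped into Pipeline(stages=[...]) next to built-in stages, and the fitted PipelineModel can then be saved and reloaded as usual.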

The test code is below (missing-value handling in PySpark). For general PySpark usage, see: https://cloud.tencent.com/developer/article/1436179

#!/usr/bin/env python
# -*- coding:utf8 -*-

"""
------------------------------------------------- 
   Description :  pyspark test
   Author :       liupeng 
   Date :         2019/8/13 
------------------------------------------------- 

"""


'''
How to add my own function as a custom stage in a ML pyspark Pipeline?
参考:https://stackoverflow.com/questions/51415784/how-to-add-my-own-function-as-a-custom-stage-in-a-ml-pyspark-pipeline 
模型保存:https://stackoverflow.com/questions/41399399/serialize-a-custom-transformer-using-python-to-be-used-within-a-pyspark-ml-pipel 
'''


from start_pyspark import spark, sc, sqlContext  
import pyspark.sql.functions as F 
from pyspark.ml import Pipeline, Transformer 
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import col, mean, min
from pyspark.sql import DataFrame
from typing import Iterable 
import pandas as pd

# CUSTOM TRANSFORMER ----------------------------------------------------------------
class ColumnDropper(Transformer):
    """
    A custom Transformer which drops all columns that have at least one of the
    words from the banned_list in the name.
    """

    def __init__(self, banned_list: Iterable[str]):
        super(ColumnDropper, self).__init__()
        self.banned_list = banned_list

    def _transform(self, df: DataFrame) -> DataFrame:
        df = df.drop(*[x for x in df.columns if any(y in x for y in self.banned_list)])
        # df = df.na.drop() 
        return df



class DefaultProcessing(Transformer):
    """
    A custom Transformer that fills missing values column by column.
    `mode` maps a column name to a fill strategy: 'default' (fill with the
    column minimum), 'mean' (fill with the column mean) or 'customize'
    (fill with the fixed `value`).
    """

    def __init__(self, mode={'col_': 'default'}, value=1.0):
        super(DefaultProcessing, self).__init__()
        self.mode = mode
        self.value = value

    def _transform(self, df):
        for col_ in df.columns:
            try:
                if self.mode[col_] == 'default':
                    df = self.missing_value_fill_default(df, col_)
                if self.mode[col_] == 'mean':
                    df = self.missing_value_fill_mean(df, col_)
                if self.mode[col_] == 'customize':
                    df = self.missing_value_fill_customize(df, col_, self.value)
            except KeyError:
                # columns not listed in `mode` are left untouched
                continue
        return df

    def missing_value_fill_default(self, df, col_):
        '''
        Fill missing values in the column with the column minimum.
        :param col_: name of the column to fill
        :return: the DataFrame with the column filled
        '''
        fill_value = df.agg(min(col(col_))).collect()[0][0]
        df = df.na.fill(fill_value, subset=[col_])
        return df

    def missing_value_fill_mean(self, df, col_):
        '''
        Fill missing values in the column with the column mean.
        :param col_: name of the column to fill
        :return: the DataFrame with the column filled
        '''
        fill_value = df.agg(mean(col(col_))).collect()[0][0]
        df = df.na.fill(fill_value, subset=[col_])
        return df

    def missing_value_fill_customize(self, df, col_, value):
        '''
        Fill missing values in the column with a user-specified value.
        :param col_: name of the column to fill
        :param value: the value to fill with
        :return: the DataFrame with the column filled
        '''
        df = df.na.fill(value, subset=[col_])
        return df




# SAMPLE DATA -----------------------------------------------------------------------
df = pd.DataFrame({'ball_column': [0,1,2,3,4,5,6],
                   'keep_the': [6,5,4,3,2,1,0],
                   'hall_column': [2,2,2,2,2,2,2] })
df = spark.createDataFrame(df)


# EXAMPLE 1: USE THE TRANSFORMER WITHOUT PIPELINE -----------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
df_example = column_dropper.transform(df)


# EXAMPLE 2: USE THE TRANSFORMER WITH PIPELINE --------------------------------------
column_dropper = ColumnDropper(banned_list = ["ball","fall","hall"])
bagging = Bucketizer(
        splits=[-float("inf"), 3, float("inf")],
        inputCol= 'keep_the',
        outputCol="keep_the_bucket")
model = Pipeline(stages=[column_dropper,bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()
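# Expected: ball_column and hall_column match the banned_list and are dropped,
# while keep_the is bucketed into the new keep_the_bucket column.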



df = pd.DataFrame({'ball_column': [0,1,None,3,4,5,6],
                   'keep_the': [6,5,4,3,2,1,0],
                   'hall_column': [2,2,2,2,2,2,None] })
df = spark.createDataFrame(df)
# EXAMPLE 3: USE THE MISSING-VALUE FILLER WITH PIPELINE ------------------------------
default_processing = DefaultProcessing( mode = {'ball_column':'default', 'hall_column':'customize'} )
bagging = Bucketizer(
        splits=[-float("inf"), 3, float("inf")],
        inputCol= 'keep_the',
        outputCol="keep_the_bucket")
model = Pipeline(stages=[default_processing, bagging]).fit(df)
bucketedData = model.transform(df)
bucketedData.show()
# model.write().overwrite().save( 'test.model' )
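# Saving this model (the line above, left commented out) would fail, because
# DefaultProcessing only subclasses Transformer and is not MLWritable; the
# SetValueTransformer further below adds persistence by also mixing in
# DefaultParamsReadable / DefaultParamsWritable.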


'''
print ('================================================')
df = pd.DataFrame({'ball_column': [None,None,None,3,4,5,6],
                   'keep_the': [6,5,4,3,2,1,0],
                   'hall_column': [2,2,2,2,None,None,None] })
PipelineModel.load( 'test.model' )
y = model.transform(df)
y.show() 
'''

'''
+--------+---------------+
|keep_the|keep_the_bucket|
+--------+---------------+
|       6|            1.0|
|       5|            1.0|
|       4|            1.0|
|       3|            1.0|
|       2|            0.0|
|       1|            0.0|
|       0|            0.0|
+--------+---------------+
'''



from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasOutputCols, Param, Params
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import lit # for the dummy _transform
from pyspark.ml import Pipeline, PipelineModel

class SetValueTransformer(
    Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable,
):
    value = Param(
        Params._dummy(),
        "value",
        "value to fill",
    )

    @keyword_only
    def __init__(self, outputCols=None, value=0.0):
        super(SetValueTransformer, self).__init__()
        self._setDefault(value=0.0)
        kwargs = self._input_kwargs
        self._set(**kwargs)

    @keyword_only
    def setParams(self, outputCols=None, value=0.0):
        """
        setParams(self, outputCols=None, value=0.0)
        Sets params for this SetValueTransformer.
        """
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def setValue(self, value):
        """
        Sets the value of :py:attr:`value`.
        """
        return self._set(value=value)

    def getValue(self):
        """
        Gets the value of :py:attr:`value` or its default value.
        """
        return self.getOrDefault(self.value) 

    def _transform(self, dataset):
        # The original StackOverflow version fills the configured output columns
        # with the constant `value` param:
        #     for c in self.getOutputCols():
        #         dataset = dataset.withColumn(c, lit(self.getValue()))
        # It is modified here to instead fill the missing values of every column
        # with that column's mean.
        for col_ in dataset.columns:
            fill_value = dataset.agg(mean(col(col_))).collect()[0][0]
            dataset = dataset.na.fill(fill_value, subset=[col_])
        return dataset


from pyspark.ml import Pipeline

svt = SetValueTransformer(outputCols=["a", "b"], value=123.0)

p = Pipeline(stages=[svt])
df = sc.parallelize([(1, None), (2, 1.0), (3, 0.5)]).toDF(["key", "value"])
pm = p.fit(df)
pm.transform(df).show()
pm.write().overwrite().save('./test/test.model')
pm2 = PipelineModel.load('./test/test.model')
print('matches?', pm2.stages[0].extractParamMap() == pm.stages[0].extractParamMap())
pm2.transform(df).show()
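
If the custom transformer serializes correctly, the reloaded stage's param map should match the original (the script should print "matches? True"), and the two show() calls should produce the same filled DataFrame.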

start_pyspark.py (the helper imported at the top of the test script):

import os
import sys
 
# The directories below are the Spark and Java installation paths on your own machine
os.environ['SPARK_HOME'] = "/Users/***/spark-2.4.3-bin-hadoop2.7/" 

sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/bin") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/pyspark") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip") 
sys.path.append("/Users/***/spark-2.4.3-bin-hadoop2.7/lib/py4j-0.9-src.zip") 
# sys.path.append("/Library/Java/JavaVirtualMachines/jdk1.8.0_144.jdk/Contents/Home") 
os.environ['JAVA_HOME'] = "/Library/Java/JavaVirtualMachines/jdk1.8.0_181.jdk/Contents/Home" 

from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkConf, SparkContext 
conf = SparkConf().setMaster("local").setAppName("My App") 
sc = SparkContext(conf = conf) 
spark = SparkSession.builder.appName('CalculatingGeoDistances').getOrCreate() 
sqlContext = SQLContext(sparkContext=sc) 