
Pyspark ML - RandomForestClassifier - one-hot encoding not working for the label

Stack Overflow user
Asked on 2020-06-30 14:42:59
1 answer · 420 views · 0 followers · 0 votes

I am trying to run a random forest classifier with pyspark ml (Spark 2.4.0) and encode the target label with OHE. The model trains fine when I feed the label as an integer (StringIndexer output), but it fails when I feed a one-hot-encoded label produced by OneHotEncoderEstimator. Is this a Spark limitation?

#%%
# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LinearSVC
tst = sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)], schema=("id","date","transaction","limit","amount"))
# label pipeline
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
ohe = OneHotEncoderEstimator(inputCols=['label'], outputCols=['label_ohe'], dropLast=False)
label_pipeline = Pipeline(stages=[str_indxr, ohe])
# data pipeline
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features", handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
data_pipeline = Pipeline(stages=[vecAssembler, classifier])

data_fit = data_pipeline.fit(data_trans)

I get this error:

  ---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-18-f08a05d86e2c> in <module>()
      1 if(train_labdata_rf):
----> 2     pipeline_trained,accuracy,test_result_rf = train_test("rf",train_d,test_d)
      3     print("Test set accuracy = " + str(accuracy))
      4     #pipeline_trained.write().overwrite().save("/projects/projectwbvplatformpc/dev/PS-ET_Pipeline/CDM_Classifier/output/pyspark_classifier/pipelines/random_forest")
      5 else:

<ipython-input-4-9709037baa80> in train_test(modelname, train_data, test_data)
     11     """
     12     pipeline=create_pipeline(modelname)
---> 13     pipeline_fit = pipeline.fit(train_data)
     14 
     15     result = pipeline_fit.transform(test_d)

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    293 
    294     def _fit(self, dataset):
--> 295         java_model = self._fit_java(dataset)
    296         model = self._create_model(java_model)
    297         return self._copyValues(model)

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    290         """
    291         self._transfer_params_to_java()
--> 292         return self._java_obj.fit(dataset._jdf)
    293 
    294     def _fit(self, dataset):

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: u'requirement failed: Column label_ohe must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'

I could not find any relevant resources on this. Any suggestions would be helpful.


1 Answer

Stack Overflow user
Accepted answer
Posted on 2020-06-30 16:31:56

Edit: pyspark does not support vectors as target labels, so only the string-indexed (numeric) label encoding is supported.

The problematic code is:

classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')

The problem is the type of labelCol=label_ohe, which must be an instance of NumericType.

The output of OHE is of type Vector.
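To see why the vector form adds nothing for a label, here is a plain-Python illustration (a hypothetical helper, not a pyspark API): a dense one-hot vector carries exactly the same information as the numeric class index that StringIndexer already produces, so recovering the index is just an argmax.

```python
def onehot_to_index(onehot):
    """Return the position of the maximum entry (the single 1.0)
    in a dense one-hot list, i.e. the original class index."""
    return max(range(len(onehot)), key=lambda i: onehot[i])

# StringIndexer would map e.g. 'buy' -> 0.0 and 'sell' -> 1.0;
# OneHotEncoderEstimator(dropLast=False) then turns 1.0 into [0.0, 1.0].
print(onehot_to_index([0.0, 1.0]))  # 1
print(onehot_to_index([1.0, 0.0]))  # 0
```

This is why the classifier asks for the bare numeric index: the one-hot wrapper is only useful for *feature* columns, where categorical values must not be treated as ordered numbers.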

ref - spark-git

Use the StringIndexer output directly, like:

# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier, LinearSVC
tst = sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)], schema=("id","date","transaction","limit","amount"))
# label pipeline
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
label_pipeline = Pipeline(stages=[str_indxr])
# data pipeline
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features", handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label')
data_pipeline = Pipeline(stages=[vecAssembler, classifier])

data_fit = data_pipeline.fit(data_trans)
1 vote
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/62651679
