I am developing a web-based analytics application that will offer model training and testing through the UI. For this I used Django and scikit-learn.
Django with Spark on local machine (Windows)
Now I want to do this at big-data scale using Spark: Django as the backend framework handling requests, and Spark for processing and modeling.
Django + PySpark on local machine (Windows) and Spark on remote cluster
I set up a Django project, and set up Spark on a cluster consisting of two Linux machines plus HDFS.
I assume that uploading/downloading/streaming data to this HDFS is already implemented; a sketch of one way such an upload could look follows.
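For illustration only, here is a minimal sketch of such an upload using the hdfs command-line tool via subprocess. The helper name and the paths are hypothetical, and the hdfs CLI is assumed to be installed and configured to point at the cluster:

import subprocess

# Hypothetical helper: copy a local file into HDFS with the hdfs CLI.
# Assumes the CLI is on PATH and configured for the cluster's NameNode.
def upload_to_hdfs(local_path, hdfs_path):
    subprocess.run(["hdfs", "dfs", "-put", "-f", local_path, hdfs_path],
                   check=True)

upload_to_hdfs("D:/training-data.csv", "/data/training-data.csv")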
I wrote each model as a view in the Django project, and each view's implementation contains code written with PySpark; a minimal view sketch follows the code below. I used PySpark to create a connection to the Spark setup on the Linux cluster.
import os

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def sample_model_code(trainData, trainDataFileType, trainDataDelimiter,
                      testData, testDataFileType, testDataDelimiter,
                      targetIndexTrainData, targetIndexTestData,
                      modelName, workingDirectory):
    # Example arguments:
    # trainData = "D:/training-data.csv"
    # trainDataFileType = "csv"
    # trainDataDelimiter = ","
    # testData = "D:/test-data.csv"
    # testDataFileType = "csv"
    # testDataDelimiter = ","
    # targetIndexTrainData = 44
    # targetIndexTestData = 44
    # modelName = "test_model"
    # workingDirectory: output directory for the prediction CSV
    #                   (assumed to be supplied by the caller)

    # getOrCreate() reuses an existing session, so the view can be called
    # more than once in the same process without a SparkContext conflict.
    spark = (SparkSession.builder
             .master("local")
             .appName("creditDecisonApp2")
             .getOrCreate())
    # spark.conf.set("spark.sql.shuffle.partitions", "2")

    training_data_set = (spark.read
                         .option("inferSchema", "true")
                         .option("header", "true")
                         .option("delimiter", trainDataDelimiter)
                         .csv(trainData))
    test_data_set = (spark.read
                     .option("inferSchema", "true")
                     .option("header", "true")
                     .option("delimiter", testDataDelimiter)
                     .csv(testData))

    # Resolve the target (label) column by its index in each dataset.
    train_data_column_names = training_data_set.columns
    train_data_target_variable = train_data_column_names[targetIndexTrainData]
    test_data_column_names = test_data_set.columns
    test_data_target_variable = test_data_column_names[targetIndexTestData]

    # Split columns into numeric and categorical based on their Spark dtypes.
    train_data_numeric_cols, train_data_categorical_cols = [], []
    test_data_numeric_cols, test_data_categorical_cols = [], []
    for name, dtype in training_data_set.dtypes:
        if 'int' in dtype or 'double' in dtype:
            train_data_numeric_cols.append(name)
        else:
            train_data_categorical_cols.append(name)
    for name, dtype in test_data_set.dtypes:
        if 'int' in dtype or 'double' in dtype:
            test_data_numeric_cols.append(name)
        else:
            test_data_categorical_cols.append(name)

    # Build one preprocessing pipeline per dataset: index the categorical
    # features, index the target into 'label', and assemble the features.
    # NOTE: fitting separate indexers on train and test assumes both datasets
    # contain the same category values; otherwise indices may not line up.
    stages_train, stages_test = [], []
    for categoricalColumn in train_data_categorical_cols:
        if categoricalColumn != train_data_target_variable:
            stages_train.append(StringIndexer(inputCol=categoricalColumn,
                                              outputCol=categoricalColumn + 'Index'))
    stages_train.append(StringIndexer(inputCol=train_data_target_variable,
                                      outputCol='label'))
    # Exclude the target from the feature vector: its 'Index' column is never
    # created, and including it would also leak the label into the features.
    assemblerInputsTrain = ([c + "Index" for c in train_data_categorical_cols
                             if c != train_data_target_variable] +
                            [c for c in train_data_numeric_cols
                             if c != train_data_target_variable])
    stages_train.append(VectorAssembler(inputCols=assemblerInputsTrain,
                                        outputCol="features"))

    for categoricalColumn in test_data_categorical_cols:
        if categoricalColumn != test_data_target_variable:
            stages_test.append(StringIndexer(inputCol=categoricalColumn,
                                             outputCol=categoricalColumn + 'Index'))
    stages_test.append(StringIndexer(inputCol=test_data_target_variable,
                                     outputCol='label'))
    assemblerInputsTest = ([c + "Index" for c in test_data_categorical_cols
                            if c != test_data_target_variable] +
                           [c for c in test_data_numeric_cols
                            if c != test_data_target_variable])
    stages_test.append(VectorAssembler(inputCols=assemblerInputsTest,
                                       outputCol="features"))

    train_df = Pipeline(stages=stages_train).fit(training_data_set).transform(training_data_set)
    test_df = Pipeline(stages=stages_test).fit(test_data_set).transform(test_data_set)

    # Train a decision tree and score the test set.
    dt = DecisionTreeClassifier(featuresCol='features', labelCol='label', maxDepth=5)
    dtModel = dt.fit(train_df)
    predictions = dtModel.transform(test_df)
    # TODO: persist the model (Spark ML models are saved with
    # dtModel.save(path) rather than pickle)

    # Map numeric predictions back to the original class labels.
    labelIndexer = StringIndexer(inputCol=train_data_target_variable,
                                 outputCol="label").fit(training_data_set)
    category_preds_col_name = "Predicted_" + test_data_target_variable
    categoryConverter = IndexToString(inputCol="prediction",
                                      outputCol=category_preds_col_name,
                                      labels=labelIndexer.labels)
    converted = categoryConverter.transform(predictions)
    result_df = converted.select(test_data_column_names + [category_preds_col_name])

    # Write the predictions as a single CSV part file under workingDirectory.
    result_file_name = workingDirectory + "/" + "credit_decision_predicted_data.csv"
    result_df.coalesce(1).write.csv(result_file_name, header=True)

    # The evaluator's default metric is f1, so request accuracy explicitly.
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                  labelCol="label",
                                                  metricName="accuracy")
    eval_metrics_dict = {"accuracy": evaluator.evaluate(converted)}

    result_dict = dict()
    result_dict["resultFilePath"] = os.path.normpath(result_file_name).replace(os.sep, "/")
    result_dict["evaluationMetricsDetails"] = eval_metrics_dict
    return result_dict

The Django application runs on a local Windows machine, Spark is installed on the same Windows machine, and the code above works when invoked from Django.
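For completeness, here is a minimal sketch of how such a function might be exposed as a Django view. The view name, request parameters, and JSON layout are all hypothetical, not the actual application's API:

import json
from django.http import JsonResponse
from django.views.decorators.http import require_POST

# Hypothetical view wrapping sample_model_code; parameter names are
# illustrative only.
@require_POST
def train_and_test(request):
    params = json.loads(request.body)
    result = sample_model_code(
        trainData=params["trainData"],  # a local path, or later an HDFS path
        trainDataFileType="csv",
        trainDataDelimiter=params.get("trainDataDelimiter", ","),
        testData=params["testData"],
        testDataFileType="csv",
        testDataDelimiter=params.get("testDataDelimiter", ","),
        targetIndexTrainData=params["targetIndexTrainData"],
        targetIndexTestData=params["targetIndexTestData"],
        modelName=params["modelName"],
        workingDirectory=params["workingDirectory"],
    )
    return JsonResponse(result)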
My question is: will this work if Spark is set up on the remote Linux cluster, Django on the local Windows machine, and HDFS file paths are passed instead of local-filesystem paths to the data?
Alternatively, is there any way to set up the application for this kind of architecture?
Posted on 2019-08-28 02:42:59
I was able to do this by installing PySpark and Spark on Windows, and then executing the Spark code while connected to the cluster.
I set up a Spark cluster on two Linux machines, then installed Spark and PySpark on the Windows machine.
Then, with the Spark session created on Windows, I set the master to the Spark cluster's master URL, and from that point the job ran on the cluster; a minimal sketch follows.
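A minimal sketch of that connection, where "spark-master-host:7077" and "namenode-host:9000" are placeholders for the cluster's actual standalone master URL and HDFS NameNode address:

from pyspark.sql import SparkSession

# Point the session at the remote standalone cluster instead of local mode.
spark = (SparkSession.builder
         .master("spark://spark-master-host:7077")
         .appName("creditDecisonApp2")
         .getOrCreate())

# Data is now read from HDFS paths instead of the local Windows filesystem.
df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("hdfs://namenode-host:9000/data/training-data.csv"))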
https://stackoverflow.com/questions/56749463