
Deployment architecture for a web-based analytics application using the Django and Spark frameworks

Stack Overflow user
Asked on 2019-06-25 16:04:02
Answers: 1 · Views: 125 · Followers: 0 · Votes: 0

I am developing a web-based analytics application that will provide model training and testing through a UI. To do this, I used Django and scikit-learn.

django with spark on local machine (windows)

Now I want to do this at big-data scale using Spark: Django as the backend framework to handle requests, and Spark for processing and modeling.

Django+pyspark on local machine (windows) and spark on remote cluster

I set up a Django project, and set up Spark on a cluster consisting of two Linux machines with HDFS.

Assume that uploading/downloading/streaming data to and from that HDFS is already implemented.

I wrote each model as a view in the Django project; the view's implementation contains code written with PySpark. I used PySpark to create a connection to the Spark setup on the Linux cluster.
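A minimal sketch of the view side, assuming the model parameters arrive as HTTP query parameters; the helper name, view name, and the Django plumbing in the trailing comment are hypothetical, not the project's actual code:

```python
# Hedged sketch: marshalling query parameters into the keyword arguments
# that sample_model_code() (shown below) expects. All names here mirror
# that function's signature; the rest is illustrative.
def build_job_kwargs(params):
    string_keys = (
        "trainData", "trainDataFileType", "trainDataDelimiter",
        "testData", "testDataFileType", "testDataDelimiter",
        "modelName",
    )
    kwargs = {key: params[key] for key in string_keys}
    # the target-column indices arrive as strings in a query dict
    kwargs["targetIndexTrainData"] = int(params["targetIndexTrainData"])
    kwargs["targetIndexTestData"] = int(params["targetIndexTestData"])
    return kwargs

# Inside a Django view this would look roughly like:
#   def train_and_predict(request):
#       result = sample_model_code(**build_job_kwargs(request.GET))
#       return JsonResponse(result)
```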

import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, IndexToString
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def sample_model_code(trainData, trainDataFileType, trainDataDelimiter,
                      testData, testDataFileType, testDataDelimiter,
                      targetIndexTrainData, targetIndexTestData,
                      modelName):

    # trainData = "D:/training-data.csv"
    # trainDataFileType = "csv"
    # trainDataDelimiter = ","
    # testData = "D:/test-data.csv"
    # testDataFileType = "csv"
    # testDataDelimiter = ","
    # targetIndexTrainData = 44
    # targetIndexTestData = 44
    # modelName = "test_model"

    conf = SparkConf().setMaster("local").setAppName("creditDecisonApp2")
    sc = SparkContext(conf = conf)  
    spark = SparkSession(sc)

    # spark.conf.set("spark.sql.shuffle.partitions", "2")

    training_data_set = spark.read.option("inferSchema", "true").option("header", "true").csv(trainData)
    test_data_set = spark.read.option("inferSchema", "true").option("header", "true").csv(testData)

    train_data_column_names = training_data_set.columns
    train_data_target_variable = train_data_column_names[targetIndexTrainData] 

    test_data_column_names = test_data_set.columns
    test_data_target_variable = test_data_column_names[targetIndexTestData]

    train_data_numeric_cols = []
    train_data_categorical_cols = []
    test_data_numeric_cols = []
    test_data_categorical_cols = []

    for element in training_data_set.dtypes:

        if 'int' in element[1] or 'double' in element[1]:
            train_data_numeric_cols.append(element[0])       
        else:
            train_data_categorical_cols.append(element[0])

    for element in test_data_set.dtypes:

        if 'int' in element[1] or 'double' in element[1]:
            test_data_numeric_cols.append(element[0])       
        else:
            test_data_categorical_cols.append(element[0])

    stages_train = []
    stages_test = []

    for categoricalColumn in train_data_categorical_cols:
        if categoricalColumn != train_data_target_variable:
            stringIndexer = StringIndexer(inputCol = categoricalColumn, outputCol = categoricalColumn + 'Index')
            stages_train += [stringIndexer]

    label_stringIdx_train = StringIndexer(inputCol = train_data_target_variable, outputCol = 'label')
    stages_train += [label_stringIdx_train]

    assemblerInputsTrain = [c + "Index" for c in train_data_categorical_cols] + train_data_numeric_cols
    assemblerTrain = VectorAssembler(inputCols=assemblerInputsTrain, outputCol="features")
    stages_train += [assemblerTrain]

    for categoricalColumn in test_data_categorical_cols:
        if categoricalColumn != test_data_target_variable:
            stringIndexer = StringIndexer(inputCol = categoricalColumn, outputCol = categoricalColumn + 'Index')
            stages_test += [stringIndexer]

    label_stringIdx_test = StringIndexer(inputCol = test_data_target_variable, outputCol = 'label')
    stages_test += [label_stringIdx_test]

    assemblerInputsTest = [c + "Index" for c in test_data_categorical_cols] + test_data_numeric_cols
    assemblerTest = VectorAssembler(inputCols=assemblerInputsTest, outputCol="features")
    stages_test += [assemblerTest]

    pipeline_train = Pipeline(stages=stages_train)
    pipeline_test = Pipeline(stages=stages_test)

    pipeline_train_model = pipeline_train.fit(training_data_set)
    pipeline_test_model = pipeline_test.fit(test_data_set)

    train_df = pipeline_train_model.transform(training_data_set)
    test_df = pipeline_test_model.transform(test_data_set)

    dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 5)
    dtModel = dt.fit(train_df)
    predictions = dtModel.transform(test_df)

    #TODO: save the clf as a pickle object

    labelIndexer = StringIndexer().setInputCol(train_data_target_variable).setOutputCol("label").fit(training_data_set)
    category_preds_col_name = "Predicted_" + test_data_target_variable
    categoryConverter = IndexToString().setInputCol("prediction").setOutputCol(category_preds_col_name).setLabels(labelIndexer.labels)
    converted = categoryConverter.transform(predictions)

    result_df = converted.select(test_data_column_names + [category_preds_col_name])

    # `workingDirectory` was undefined; fall back to the current working
    # directory for the prediction output
    location_temp = os.getcwd()
    result_file_name = location_temp + "/" + "credit_decision_predicted_data.csv"

    result_df.coalesce(1).write.option("header", "true").csv(result_file_name)


    # metricName defaults to "f1", so request accuracy explicitly
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                  metricName="accuracy")

    eval_metrics_dict = dict()
    eval_metrics_dict["accuracy"] = evaluator.evaluate(converted)


    result_dict = dict()
    result_dict["resultFilePath"] = os.path.normpath(result_file_name).replace(os.sep, "/")
    result_dict["evaluationMetricsDetails"] = eval_metrics_dict

    return result_dict

The Django application runs on a local Windows machine, Spark is installed on the same local Windows machine, and the code above works when run under Django.

My question: is it feasible to set up Spark on the remote Linux cluster, keep Django on the local Windows machine, and pass HDFS file paths to the data instead of local-filesystem paths?
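On the HDFS-path part of the question: Spark's reader accepts an `hdfs://` URI exactly like a local path, so only the path string changes. A minimal sketch, where the namenode host/port and base directory are placeholder assumptions:

```python
import ntpath
import posixpath

def to_hdfs_uri(local_path,
                namenode="hdfs://namenode-host:9000",  # placeholder host:port
                base_dir="/data"):                     # placeholder directory
    """Map a local Windows file path to an HDFS URI under base_dir."""
    file_name = ntpath.basename(local_path)  # handles both / and \ separators
    return namenode + posixpath.join(base_dir, file_name)

# The Spark reader call itself is unchanged, e.g.:
#   spark.read.option("inferSchema", "true").option("header", "true") \
#        .csv(to_hdfs_uri("D:/training-data.csv"))
```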

Or, is there any way to set up the application for this type of architecture?

1 Answer

Stack Overflow user

Answered on 2019-08-28 02:42:59

I was able to do this by installing Spark and PySpark on Windows, then executing the Spark code by connecting to the cluster.

I set up a Spark cluster on two Linux machines, then installed Spark and PySpark on the Windows machine.

Then, with the Spark session running on Windows, I set the master to (spark cluster master), and from then on it ran on the cluster.
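Concretely, the only change needed in the question's code is the master URL handed to `SparkConf`; a sketch assuming a standalone cluster whose master listens on the default port 7077 (the host name is a placeholder):

```python
# Placeholder URL for a standalone Spark cluster master; substitute the
# real host name of the cluster's master node.
SPARK_MASTER_URL = "spark://spark-master-host:7077"

# In sample_model_code, replace
#     conf = SparkConf().setMaster("local").setAppName("creditDecisonApp2")
# with
#     conf = SparkConf().setMaster(SPARK_MASTER_URL).setAppName("creditDecisonApp2")
# so the driver running on Windows submits jobs to the Linux cluster.
```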

Votes: 0
Original page content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/56749463
