前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >在统一的分析平台上构建复杂的数据管道

在统一的分析平台上构建复杂的数据管道

作者头像
仁梓
发布2018-02-02 17:01:03
3.7K0
发布2018-02-02 17:01:03

介绍

Quora上,大数据从业者经常会提出以下重复的问题:什么是数据工程(Data Engineering)? 如何成为一名数据科学家(Data Scientist)? 什么是数据分析师(Data Analyst)?

除了理解上述三种职业及其职能之外,更重要的问题是:如何去促进这三种不同的职业、职能和其诉求之间的协作?或者怎样去帮助他们采用统一的平台来代替一次性定制解决方案?

现在他们确实可以使用统一的平台进行协作了。上个月,我们发布了统一数据块平台。针对促进数据工程师,数据科学家和数据分析师之间的协作,其软件工件 Databricks WorkspaceNotebook Workflows 实现了这令人梦寐以求的协作。

在这篇博文中,我们将探讨每种角色以下三种赋能

  • 使用 Notebook Workflows来协作和构建复杂的 Apache Spark 的数据管道
  • 将独立和幂等的笔记本作为 单一执行单元 进行编排
  • 无需定制一次性或独特的解决方案。

亚马逊公共产品评级

首先,我们来看看数据场景。我们的数据场景视为亚马逊公共产品评级的语料库,其中每个角色都希望以可被理解的形式执行各自的任务。

这个数据集是产品评论的不同数据文件的集合,对于任何数据科学家或数据分析师都很重要。例如,数据分析师的目的可能是探索数据以检查其存在哪种评级,产品类别或品牌。相比之下,数据科学家的目的可能想要训练一个机器学习模型,有利于定期对用户评论中某些关键词(如“好”、“回归”或“糟糕”)进行评级。

但是,如果没有事先将数据转化为可供每个角色使用的格式,那么既不能方便数据分析员对其进行探索,也不便于数据科学家进行模型训练。这就是数据工程师引入公式的原因:她负责通过创建数据管道将原始数据转换为可用数据。(我们所说的ExamplesIngestingData笔记本工具是数据工程师将摄取到的公共数据集嵌入 Databricks平台的过程。)

接下来,我们将检查我们的第一个数据流水线,第一个笔记本工具TrainModel,其可以提供浏览与每个角色相关的任务的功能。

Apache Spark作业的数据流水线

探索数据

为了简单起见,我们不会涉及将原始数据转换为以供 JSON 文件摄取的 Python 代码 - 代码位于此链接。相反,我们将专注于我们的数据管道笔记本工具,TrainModel,帮助数据科学家和数据分析师进行协作。

我们的数据工程师一旦将产品评审的语料摄入到 Parquet (注:Parquet是面向分析型业务的列式存储格式)文件中, 通过 Parquet 创建一个可视化的 Amazon 外部表, 从该外部表中创建一个临时视图来浏览表的部分, 数据分析员和数据科学家都可以在这个 TrainModel 的笔记本工具中合作工作。

数据分析师可以利用 SQL 查询,而不是用数据工程师或数据科学家比较熟悉的 Python 代码进行查询。这里的要点是,笔记本的语言类型(无论是 ScalaPythonR还是 SQL)的优势是次要的,而以熟悉的语言(即 SQL)表达查询并与其他人合作的能力是最重要的。

现在,每个角色都有可理解的数据,作为临时表 tmp_table 业务问题和数据可视化; 她可以查询此表,例如,以下问题:

数据是什么样的?

有多少个不同的品牌?

如何保证公平地进行品牌评分?

她的初步分析令人很满意,她可能会帮助一位数据科学家,进而设计一个机器学习模型,使他们能够定期预测用户评论的评分。随着用户在亚马逊网站上每天甚至每周购买和评价产品,机器学习模型可以在生产中定期进行训练新的数据。

培训机器学习模型

Apache Spark机器学习库MLlib包含许多用于分类,回归,聚类和协作过滤的算法。在高层次上,spark.ml 包为特征化,流水线,数学实用程序和持久性提供了工具,技术和 API

当涉及基于特定关键字的好(1)或差(0)结果的二元预测时,适合于该分类的最佳模型是Logistic回归模型,这是一种预测有利结果概率的特殊情况的广义线性模型

在我们的案例中,我们希望用一些有利的关键词来预测评论的评分结果。我们不仅要使用 MLlib 提供的逻辑回归模型族的二项逻辑回归,还要使用spark.ml管道及其变形和估计器

创建机器学习管道

Python代码片段如何用变换器和估计器创建管道。

代码语言:txt
复制
from pyspark.ml import *
from pyspark.ml.feature import *
from pyspark.ml.feature import Bucketizer
from pyspark.ml.classification import *
from pyspark.ml.tuning import *
from pyspark.ml.evaluation import *
from pyspark.ml.regression import *
#
# Bucketizer transforms a column of continuous features to a column of feature buckets, where the buckets are specified by users.
# It takes the common parameters inputCol and outputCol, as well as the splits for bucketization.
# Feature values greater than the threshold are bucketized to 1.0; values equal to or less than the threshold
# are binarized to 0.0. Both Vector and Double types are supported for inputCol.
# We will use rating as our input, and the output label will have value of 1 if > 4.5
#
# For this model we will use two feature transformer extractors: Bucketizer and Tokenizer
#
splits = [-float("inf"), 4.5, float("inf")]
tok = Tokenizer(inputCol = "review", outputCol = "words")
bucket = Bucketizer(splits=splits, inputCol = "rating", outputCol = "label")
#
# use HashingTF feature extractor, with its input as "words"
#
hashTF = HashingTF(inputCol = tok.getOutputCol(), numFeatures = 10000, outputCol = "features")
#
# create a model instance with some parameters
#
lr = LogisticRegression(maxIter = 10, regParam = 0.0001, elasticNetParam = 1.0)
#
# Create the stages pipeline with all the feature transformers to create an Estimator
#
pipeline = Pipeline(stages = [bucket, tok, hashTF, lr])

创建训练方式和测试数据

接下来,我们使用我们的训练数据来拟合模型,最后用我们的测试框架 perdictions 进行预测和建立标签。

代码语言:txt
复制
# Create our model estimator
#
model = pipeline.fit(trainingData)

#score the model with test data
predictions = model.transform(testData)
#convert dataframe into a table so we can easily query it using SQL
predictions.createOrReplaceTempView('tmp_predictions')

正如您可能注意到, 通过上述的 predictions 函数查询后放入 DataFrame 保存为一个临时表, 在我们的测试数据的评论中出现的单词 return 的结果在价值0的 PredictionLabel 和低评级的预期。

对于评估模型的结果感到满意,数据科学家可以将模型保存为与其他数据科学家共享,甚至进一步评估或与数据工程师共享,以便在生产中部署。

这伴随着实时模型。

实时模式

考虑一下数据科学家生成ML模型,并想要测试和迭代它,将其部署到生产中以进行实时预测服务或与另一位数据科学家共享以进行验证用例和场景。你怎么做到的?

坚持和序列化ML管道是导出 MLlib 模型的一种方法。另一种方法是使用Databricks dbml-local库,这是实时服务的低延迟需求下的首选方式。一个重要的警告: 对于服务模型的低延迟要求,我们建议并倡导使用 dbml-local。然而对于这个例子,因为延迟不是定期产品评论的问题或要求,所以我们使用 MLlib 管线 API 来导出和导入模型。

尽管 dbml-local 是我们首选的导出和导入模型的方式,但是由于很多原因,两种持久性机制都很重要。首先,它很容易和语言无关 - 模型导出为 JSON。其次,它可以从一个用 Python 编写的笔记本中导出,并导入(加载)到另一个用 Scala 写成的笔记本中,持久化和序列化一个 ML 管道,交换格式是独立于语言的。第三,序列化和坚持流水线封装了所有的功能,而不仅仅是模型。最后,如果您希望通过结构化流式传输来实时预测您的模型。

代码语言:txt
复制
model.write().overwrite().save("/mnt/jules/amazon-model")

在我们TrainModel笔记本工具,我们出口我们的模型,以便它可以由其他笔记本工具进口,ServeModel,在我们的笔记本工具链接的下游的工作流程(见下文)。

在下一节中,我们将讨论我们的第二个管道工具CreateStream

创建流

考虑一下这种情况:我们可以访问产品评论的实时流,并且使用我们训练有素的模型,我们希望对我们的模型进行评分。数据工程师可以通过两种方式提供这种实时数据:一种是通过 KafkaKinesis,当用户在 Amazon 网站上评价产品时; 另一个通过插入到表中的新条目(不属于训练集),将它们转换成 S3 上的 JSON 文件。事实上,这只是起作用,因为结构化流式 API以相同的方式读取数据,无论您的数据源是 BlobS3 中的文件,还是来自 KinesisKafka 的流。我们选择了S3分布式队列来实现低成本和低延迟。

在我们的例子中,数据工程师可以简单地从我们的表中提取最近的条目,在 Parquet 文件上建立。这个短的管道包含三个 Spark 作业:

  1. Amazon 表中查询新的产品数据
  2. 转换生成的 DataFrame
  3. 将我们的数据框存储为 S3 上的 JSON 文件

为了模拟流,我们可以将每个文件作为 JSON 数据的集合提供的流数据来评价我们的模型。数据科学家已经培训了一个模型并且数据工程师负责提供一种方法来获取实时数据流,这种情况并不罕见,这种情况持续存在于某个可以轻松读取和评估训练模型的地方。

要了解这是如何实现的,请阅读CreateStream笔记本工具; 它的输出将 JSON 文件作为亚马逊评论的流向ServeModel笔记本工具提供服务,以对我们的持久模型进行评分,这形成了我们的最终管道。

创建服务,导入数据和评分模型

考虑最后的情况:我们现在可以访问新产品评论的实时流(或接近实时流),并且可以访问我们的训练有素的模型,这个模型在我们的 S3 存储桶中保存。数据科学家可以使用这些资产。

让我们看看如何。在我们的例子中,数据科学家可以简单地创建四个 Spark 作业的短管道:

  1. 从数据存储加载模型
  2. 作为 DataFrame 输入流读取 JSON 文件
  3. 用输入流转换模型
  4. 查询预测
代码语言:txt
复制
···scala
// load the model from S3 path
import org.apache.spark.ml.PipelineModel
val model = PipelineModel.load(model_path)

import org.apache.spark.sql.types._
// define a the JSON schema for our stream of JSON files
val streamSchema = new StructType()
  .add(StructField("rating",DoubleType,true))
  .add(StructField("review",StringType,true))
  .add(StructField("time",LongType,true))
  .add(StructField("title",StringType,true))
  .add(StructField("user",StringType,true))

//read streams 
spark.conf.set("spark.sql.shuffle.partitions", "4")

val inputStream = spark
  .readStream
  .schema(streamSchema)
  .option("maxFilesPerTrigger", 1)
  .json(stream_path)

// transform with the new data in the stream
val scoredStream = model.transform(inputStream)

// and use the stream query for predictions

val queryStream = scoredStream.writeStream
  .format("memory")
  .queryName("streamPrediction")
  .start()
// query the transformed DataFrame with new predictions

由于所有的特征都被封装在持久化的模型中,所以我们只需要从磁盘加载这个序列化的模型,并使用它来服务和评分我们的新数据。此外,请注意,我们在笔记本TrainModel中创建了这个模型,它是用 Python 编写的,我们在一个 Scala 笔记本中加载。这表明,无论每个角色用于创建笔记本的语言如何,他们都可以共享 Apache Spark 中支持的语言的持久化模型。

Databricks Notebook工作流程编排

协作和协调的核心是Notebook Workflows的API。使用这些API,数据工程师可以将所有上述管道作为 单个执行单元 串在一起。

实现这一目标的一个途径是在笔记本电脑中分享输入和输出。也就是说,笔记本的输出和退出状态将作为流入下一个笔记本的输入。Notebook Widgets允许参数化笔记本输入,而笔记本的退出状态可以将参数传递给流中的下一个参数。

在我们的示例中,RunNotebooks使用参数化参数调用流中的每个笔记本。它将编排另外三个笔记本,每个笔记本都执行自己的数据管道,在其中创建自己的 Spark 作业,最后发出一个 JSON 文档作为退出状态。这个 JSON 文档然后作为管道中后续笔记本的输入参数。

代码语言:txt
复制
# do the usual import packages
import json
import sys
#
#Run the notebook and get the path to the table
#fetch the return value from the callee 001_TrainModel
returned_json = json.loads(dbutils.notebook.run("001_TrainModel", 3600, {}))
if returned_json['status'] == 'OK':
  model_path = returned_json['model_path']
  try:
    #Create a Stream from the table
    #Fetch the return value from the callee 002_CreateStream
    returned_json = json.loads(dbutils.notebook.run("002_CreateStream", 3600, {}))
    if returned_json ['status'] == 'OK':
      stream_path = returned_json['stream_path']
      map = {"model_path": model_path, "stream_path": stream_path }
          #fetch the return value from the callee 003_ServeModelToStructuredStream
      result = dbutils.notebook.run("003_ServeModelToStreaming", 7200, map)
      print result
    else:
      raise "Notebook to create stream failed!"
  except:
    print("Unexpected error:", sys.exc_info()[0])
    raise
else:
  print "Something went wrong " + returned_json['message']

最后,不仅可以运行这个特定的笔记本执行一个简单的任务,而且你也可以使用Job Scheduler来安排流程。

下一步是什么

为了真正感受统一分析平台中三个人物角色之间的端到端协作,请在Databricks平台上试用这五款笔记本工具。

  1. 由数据工程师创建的 RunNotebooks
  2. 由数据工程师,数据分析师和数据科学家创建的 TrainModel
  3. 由数据工程师创建的 CreateStream
  4. 由数据科学家和数据工程师创建的 ServeModel
  5. 为数据工程师提供的样品笔记本 ExamplesIngestingData

总之,我们证明了大数据从业者可以在 Databricks统一分析平台中一起工作,创建笔记本,探索数据,训练模型,导出模型,并根据新的实时数据评估他们的训练模型。当复杂的数据管道时,当由不同的人物角色构建的无数笔记本可以作为一个单一且连续的执行单元来执行时,它们一起变得高效。通过 Notebook Workflows API,我们展示了一个统一的体验,而不是定制的一次性解决方案。这些好处是有保证的。

阅读更多

要了解Github中的笔记本工作流和Widgets以及笔记本集成,请阅读以下内容:

免费试用 Databricks从今天开始

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 介绍
  • 亚马逊公共产品评级
  • Apache Spark作业的数据流水线
    • 探索数据
      • 数据是什么样的?
        • 有多少个不同的品牌?
          • 如何保证公平地进行品牌评分?
          • 培训机器学习模型
          • 创建机器学习管道
            • 创建训练方式和测试数据
            • 实时模式
            • 创建流
            • 创建服务,导入数据和评分模型
            • Databricks Notebook工作流程编排
            • 下一步是什么
            • 阅读更多
            相关产品与服务
            大数据
            全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
            领券
            问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档