Uber机器学习平台Michelangelo是如何使用Spark模型的?

Michelangelo是Uber的机器学习(ML)平台,可以训练并服务于整个公司范围内生产环境中的数千种模型。该平台被设计成了一个端到端的工作流,目前支持经典的机器学习、时间序列预测和深度学习模型,可以涵盖大量的用例,从生成市场预测、响应客户支持工单到准确计算预计到达时间(EAT)以及使用自然语言处理(NLP)模型在驾驶员App中提供一键式聊天功能。

本文最初发布于Uber工程博客,由InfoQ中文站翻译并分享。

Michelangelo是Uber的机器学习(ML)平台,可以训练并服务于整个公司范围内生产环境中的数千种模型。该平台被设计成了一个端到端的工作流,目前支持经典的机器学习、时间序列预测和深度学习模型,可以涵盖大量的用例,从生成市场预测响应客户支持工单准确计算预计到达时间(EAT)以及使用自然语言处理(NLP)模型在驾驶员App中提供一键式聊天功能

大多数Michelangelo模型都是基于Apache Spark MLlib的,这是一个可伸缩的Apache Spark机器学习库。为了处理高QPS的在线服务,Michelangelo最初仅通过内部定制的模型序列化和表示支持Spark MLlib模型的一个子集,这使得客户无法灵活地试验任意复杂的模型管道,限制了Michelangelo的扩展速度。为了解决这些问题,我们改进了Michelangelo对Spark MLlib的使用,特别是在模型表示、持久性和在线服务方面。

在Michelangelo中改进Spark MLlib用法的动机

我们最初开发Michelangelo是为了为生产环境提供可扩展的机器学习模型。它对基于Spark的数据摄取调度、模型训练和评估以及批量和在线模型服务部署提供端到端的支持,在Uber获得了广泛的认可。

图1 为了将Spark用于数据预处理和低延迟服务,并使用GPU进行分布式深度学习训练,Michelangelo针对深度学习用例使用了一致的Spark管道架构。

最近,Michelangelo已经发展到可以处理更多的用例,包括提供在核心Michelangelo之外训练过的模型。例如,深度学习模型的扩展和端到端加速训练需要在不同环境中执行操作步骤,以便利用Spark的分布式计算转换、Spark Pipelines在CPU上的低延迟服务以及使用Horovod(Uber的分布深度学习框架)在GPU集群上的分布式深度学习训练。为了使这些需求更容易实现,并保证训练和服务的一致性,有一个一致的架构和模型表示很重要,它可以利用我们现有的低延迟的基于JVM的模型服务基础设施,同时提供正确的抽象来封装这些需求。

图2 具有活动Spark会话的外部环境可以无缝地反序列化来自Michelangelo的经过训练的管道模型,并序列化供Michelangelo生态系统的其余部分使用的模型。Apache Spark是Apache软件基金会在美国和/或其他国家的注册商标。使用这个标识并不意味着Apache软件基金会的认可。

另一个动机是使数据科学家能够使用PySpark在熟悉的Jupyter笔记本(数据科学工作台)中构建和试验任意复杂的模型,同时仍然能够利用Michelangelo生态系统可靠地进行分布式训练、部署和服务。这也为集成学习多任务学习技术所需的更复杂的模型结构提供了可能性,同时让用户可以动态地实现数据操作和自定义评估。

因此,我们回顾了Michelangelo对Spark MLlib和Spark ML管道的使用,概括了其模型持久性和在线服务机制,目的是在不影响可伸缩性的情况下实现可扩展性和互操作性。

Michelangelo最初推出的是一个统一的架构,它负责管理紧耦合的工作流和用于训练和服务的Spark作业。Michelangelo对每种支持的模型类型都有特定的管道模型定义,并使用内部定制的protobuf表示经过训练的服务模型。离线服务通过Spark处理;在线服务使用添加到Spark内部版本的自定义API来处理,以实现高效的单行预测。

图3 向本地Spark序列化和反序列化转变在模型持久化管道阶段(Transformer/Estimator)层面上实现了灵活性和跨环境兼容性。Apache Spark是Apache软件基金会在美国和/或其他国家的注册商标。使用这个标识并不意味着Apache软件基金会的认可。

最初的架构支持通用机器学习模型(如广义线性模型GLM和梯度增强决策树模型GBDT)的大规模训练和服务,但是自定义的protobuf表示使添加对新Spark转换器的支持变得困难,并排除了在Michelangelo之外训练的模型。当新版本的Spark可用时,Spark的自定义内部版本也使得每次的升级迭代变得复杂。为了提高对新转换器的支持速度,并允许客户将自己的模型引入Michelangelo,我们考虑了如何改进模型表示并更加顺畅地添加在线服务接口。

Michelangelo的架构和模型表示法的演变

图4 Michelangelo的架构必须处理由不同功能需求带来的复杂性,并保持训练和服务环境之间的一致性。

Uber的机器学习工作流通常很复杂,涉及不同的团队、库、数据格式和语言;为了使模型表示和在线服务接口可以恰当地演化,我们需要考虑所有这些维度。

图5 部署并提供机器学习管道模型服务包括模型前面的所有转换和操作步骤。

要部署用于服务的机器学习模型,需要部署整个管道,包括模型前面的转换工作流。通常还需要打包数据转换、特征提取、预处理甚至预测后转换。原始预测通常需要解释或转换回标签,在某些情况下需要转换到不同的维度空间,比如日志空间,以便下游服务使用。它也可以用于通过额外的数据加强原始预测,如它们的置信区间,通过概率校准来校准概率。我们想要一个能够反映Spark MLlib模型固有管道步骤的模型表示,并允许Michelangelo与外部工具进行无缝交互。

选择一种最新的模型表示

在评估可供选择的模型表示时,我们评估了不同的需求,包括:

  • 表示广义转换序列的能力(必需)
  • 处理在线用例的轻量级服务的可扩展性(必需)
  • 支持使用非Michelangelo原生Spark工具替换存储在Michelangelo中的模型(必需)
  • 训练时和服务时的模型解释偏差风险低(非常想要)
  • Spark更新速度要快,并且易于编写新的估计器/转换器(非常想要)

我们考虑的一种方法是使用MLeap,这是一个独立的库,它通过一个专用的运行时来执行管道,提供了管道和模型序列化(到Bundle.ML)和反序列化支持。MLeap具有所需的表达能力和对轻量级在线服务的支持。但是,它有自己专有的持久化格式,这限制了与序列化和反序列化普通Spark MLlib模型的工具集的互操作性。

MLeap还引入了服务时行为偏离训练时评估的风险,因为在技术上,正在提供服务的模型加载时的格式与训练时内存中的格式不同。MLeap还降低了Spark更新的速度,因为除了Spark MLlib本地使用的方法之外,还必须为每个转换器和估计器添加单独的MLeap保存/加载方法。Databricks的ML模型导出dbml-local提供了类似的方法。

我们考虑的另一种方法就是将训练模型导出到一个标准的格式,如预测模型标记语言(PMML)或可移植分析格式(PFA),它们都具备我们需要的表达能力并且可以和Spark交互,Spark直接提供了PMML支持,而aardpfark可以将Spark模型导出为PFA。然而,这些表示还是存在服务时行为偏离方面的风险,我们认为这一风险高于MLeap,因为一般标准在特定的实现中通常会有不同的解释。这些标准在Spark更新速度方面也带来了更大的阻碍,因为根据Spark变化的性质,标准本身可能需要更新。

我们发现,最直接的方法是使用标准的Spark ML管道序列化来表示模型。Spark ML管道展示了我们想要的表达能力,允许与Michelangelo之外的Spark工具集交互,展现出了低风险的模型解释偏差,对Spark的更新速度影响较小。它还有助于编写自定义的转换器和估计器。

我们看到,使用Spark管道序列化的主要挑战是它与在线服务需求的不兼容性(Nick Pentreath在2018年的Spark AI峰会上也讨论了这一点)。这种方法会启动一个本地Spark会话,并用它来加载一个训练好的Spark MLlib模型,这相当于在单台主机上运行一个小型集群,内存开销和延迟都很大,使它不适合许多在线服务场景要求的p99毫秒级延迟。虽然现有的用于提供服务的Spark API集在性能上还不足以满足Uber的用例,但我们发现,我们可以在这种开箱即用的体验中进行许多直接的更改,以满足我们的需求。

为了提供用于在线服务的轻量级接口,我们将anOnlineTransformer添加到可以提供在线服务的转换器中,包括利用低级Spark预测方法的单个方法和小型方法列表。我们还调整了模型加载的性能,以满足我们的目标开销要求。

使用增强型转换器和估计器的管道

为了实现一个可以由Michelangelo在线训练并提供服务的Transformer或Estimator,我们构建了一个OnlineTransformer接口,它扩展了开箱即用的Spark Transformer接口,并执行两个方法:1)Transform(instance: Dataset[Any]) ;2)ScoreInstance(instance: Map[String, Any])。

Transform(instance: Dataset[Object])作为分布式批处理服务的入口,提供了开箱即用的基于数据集的执行模型。ScoreInstance(instance: Map[String, Object]): Map[String, Object]作为较为轻量级的API,用于低延迟、实时服务场景中出现的针对单一特征值映射集的单行预测请求。ScoreInstance背后的动机是提供一个更轻量级的API,它可以绕过依赖于Spark SQL Engine的Catalyst Optimizer的Dataset所带来的巨大开销,从而对每个请求进行查询规划和优化。如上所述,这对于实时服务场景(如市场营销和欺诈检测)非常重要,其中p99延迟的SLA通常是毫秒级的。

当加载Spark PipelineModel时,任何具有相似类(包含OnlineTransformer特性)的Transformer都会映射到该类。这使得现有的训练好的Spark模型(由支持的转换器组成)获得了提供在线服务的能力,而且没有任何额外的工作。注意,OnlineTransformer还实现了Spark的MLWritable和MLReadable接口,这为Spark免费提供了对序列化和反序列化的本地支持。

保持在线和离线服务一致性

转向标准的PipelineModel驱动的架构,通过消除PipelineModel之外的任何自定义预评分和后评分实现,进一步加强了在线和离线服务准确性之间的一致性。在每个管道阶段,实现自定义评分方法时的标准实践是首先实现一个公共评分函数。在离线转换中,它可以作为DataFrame输入上的一组Spark用户定义函数(UDF)运行,相同的评分函数也可以应用于在线scoreInstance和scoreInstances方法。在线和离线评分一致性将通过单元测试和端到端集成测试进一步保证。

性能优化

我们最初的度量结果显示,与我们自定义的protobuf表示的加载延迟相比,原生Spark管道加载延迟非常高,如下表所示:

这种序列化模型加载时间上的性能差异对于在线服务场景是不可接受的。模型实际上被分片到每个在线预测服务实例中,并在每个服务实例启动时、新模型部署期间或接收到针对特定模型的预测请求时加载。在我们的多租户模型服务设置中,过长的加载时间会影响服务器资源的敏捷性和健康状况监控。我们分析了加载延迟的来源,并进行了一些调优更改。

影响所有转换器加载时间的一个开销来源是Spark本地使用sc.textFile读取转换器元数据;从一个小的单行文件生成一个字符串RDD非常慢。对于本地文件,用Java I/O替换这段代码要快得多:

[loadMetadata in src/main/scala/org/apache/spark/ml/util/ReadWrite.scala]

在我们的用例中(例如,LogisticRegression、StringIndexer和LinearRegression),影响许多转换器的另一个开销来源是对与转换器相关的少量数据使用Spark分布式read/select命令。对于这些情况,我们使用ParquetUtil.read代替sparkSession.read.parquet;直接进行Parquet read/getRecord大大降低了转换器的加载时间。

树集成(Tree ensemble)转换器有一些特殊的调优机会。加载树集成模型需要将序列化到磁盘的模型元数据文件读取到磁盘,这会触发小文件的groupByKey、sortByKey以及Spark的分布式read/select/sort/collect操作,这些操作非常慢。我们直接用更快的Parquet read/getRecord代替了它们。在树集成模型保存方面,我们合并了树集成节点和元数据权重DataFrame,以避免写入大量读起来很慢的小文件。

通过这些调优工作,我们能够将基准示例的本地Spark模型加载时间从8到44倍减少到仅比从自定义protobuf加载慢2到3倍,这相当于比Spark原生模型快4到15倍。考虑到使用标准表示的好处,这种水平的开销是可以接受的。

需要注意的是,Michelangelo在线服务创建了一个本地的SparkContext来处理任何未加密的转换器的负载,因此,在线服务不需要SparkContext。我们发现,当没有模型加载处于活动状态时,让SparkContext继续运行会对性能产生负面影响并导致延迟,例如,通过SparkContext清理器的操作。为了避免这种影响,我们在没有运行负载时停止SparkContext。

可服务管道的灵活结构

使用管道模型作为Michelangelo的模型表示,用户可以灵活地组合和扩展可服务组件单元,使它们在线和离线服务时保持一致。然而,这并没有完全封装管道模型在机器学习工作流的各个阶段使用时的操作需求差异。有些操作步骤或概念本质上与机器学习工作流的特定阶段相关,而与其他阶段完全无关。例如,当用户对模型进行评估和迭代时,常常需要进行超参数优化、交叉验证以及生成模型解释及评估所需的特定元数据等操作。这些步骤允许用户帮助生成模型、与模型交互以及评估管道模型,但是一旦准备好进行产品化,就不应该将这些步骤合并到模型服务中。

同时,在机器学习工作流不同阶段的需求差异,促使我们开发了一个基于通用编排引擎的工作流和操作员框架。除了组合自定义可服务管道模型的灵活性之外,这还允许用户以有向图或工作流的形式组合和编排自定义操作的执行,以实现最终的可服务管道模型,如图6所示:

图6 Michelangelo基于Operator Framework的工作流提供了另一种程度的灵活性,通过优化的执行计划来方便地定制操作,从而生成可服务的、序列化的Michelangelo管道模型以及有用的工件。Apache Spark是Apache软件基金会在美国和/或其他国家的注册商标。使用这个标记并不意味着Apache软件基金会的认可。Docker和Docker标识是Docker公司在美国和/或其他国家的商标或注册商标。Docker公司及其他各方也可以在本协议中使用的其他条款中享有商标权。本标记的使用并不意味着Docker的认可。TensorFlow、TensorFlow标识及任何相关标识均为谷歌公司的商标。

未来展望

在这一点上,Spark原生模型表示已经在Michelangelo生产环境中运行了一年多,作为一种健壮且可扩展的方法,在我们公司范围内提供ML模型。

得益于这种演变和Michelangelo平台的其他更新,Uber的技术栈可以支持新的用例(如灵活的试验和在Uber的数据科学工作台中训练模型)、Jupyter笔记本环境以及使用TFTransformers的端到端深度学习。为了介绍我们的经验并帮助其他人扩展他们自己的ML建模解决方案,我们在2019年4月的Spark AI峰会上讨论了这些更新,并提交了SPIP和JIRA,把我们对Spark MLlib的更改开源。

原文链接:

Evolving Michelangelo Model Representation for Flexibility at Scale

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/hvbF36LKDw21pJ1F6ayk

扫码关注云+社区

领取腾讯云代金券