AWS EMR在搭建大数据平台ETL中的应用实践

AWS Elastic MapReduce(EMR)是 Amazon 提供的托管集群平台,用户可以非常方便的使用 EMR 搭建起一套集群,用来支撑大数据框架的应用,如 Apache Spark、Hive、Flink、Presto 等等。因为 EMR 具有很好的可配置性和伸缩性,使用者可以灵活的根据自己的需求进行定制,在满足生产需求的同时,减低对基础设施的运维成本。

FreeWheel 大数据团队在搭建数据仓库的过程中,在 EMR 的使用上积累了大量的实践和运维经验,本文将从 EMR 实践的角度出发,讲述 FreeWheel Transformer 团队在搭建 ETL pipeline 的过程中是如何玩转 EMR 的,以期抛砖引玉。

一个典型的 Spark on EMR 上集群架构概览

我们先来了解一下一个典型的 AWS EMR 集群是什么样子的。EMR 默认使用了 Yarn 来管理集群资源。EMR 将 node 分为了 Master, Core 和 Worker 三个组(通过 Yarn label 实现)。

主节点(Master node)

EMR 的 Master node 用来管理整个集群,集群至少要有一个 Master node(从 EMR-5.23 开始,如果需要 Master node HA,可以选择实例计数为 3,此时根据不同的应用,会有不同的 HA 方案)。

以运行 Hadoop HDFS 和 Yarn 为例,Master 上会运行 Yarn ResourceManger 和 HDFS NameNode。在高可用方案下,Yarn ResourceManager 会在所有三个主节点上运行,并采用 Active/Standby 模式进行工作。如果 ResourceManager 的主节点发生故障,EMR 将启动自动故障转移过程,具有 Standby ResourceManager 的 Master node 将接管所有操作,可以通过 yarn rmadmin -getAllServiceState 来获取当前服务状态。关于 Yarn ResourceManger HA 的具体细节,可以参考ResourceManager HA

与 ResourceManager 类似,NodeManager 会在三个主节点中的两个节点上运行,分别处于 Active 和 Standby 状态。如果 Active NameNode 的主节点发生故障,EMR 会将启动 HDFS 故障转移过程。此时 Standby 状态的 NameNode 将变为 Active 并接管集群中的所有对 HDFS 的操作。我们可以通过hdfs haadmin -getAllServiceState命令来获取当前 NameNode 状态。关于 HDFS HA 的具体细节,可以参考 HDFS HA

核心节点(Core node)

Core node 为可选组,其上运行了 HDFS DataNode。当 Core node 个数少于 4 时,EMR 默认会将 HDFS 的 replica 设置为 1,Core node 少于 10 个时 replica 为 2,其余情况为 3。 如果需要进行自定义设置,可以修改启动 EMR 对 hdfs-site.xml 中 dfs.replication 的配置;这里要注意一点,如果我们在一个已经启动的 EMR 中,在对 Core node 进行伸缩的时候,会影响到 HDFS 数据的 re-balance,这是一个耗时的操作,不建议频繁做 Core node 的 scaling。 而且对于 replica 是 3 的集群,如果将 core node 的个数缩减为少于 3 个的话,也将无法成功。

工作节点(Worker node)

Worker node 为普通的工作节点。非常适合作为单纯的工作节点应对工作负载需要频繁伸缩的场景,其上运行了 NodeManager,在其启动之后,会加入到 Yarn 的集群管理之中。对于需要频繁 scale 的场景,仅仅 scale Worker node 是一个比较实际的方案,效率比较高。

任务队列(InstanceFleet)

Core node 和 Worker node 都可以配置实例队列,实例队列是一个非常实用的功能,尤其是在 Spot 实例的场景,在一些热门事件发生时,一些热门机型的中断率会变得很高,如果选择单一类型实例的话,很容易出现无法满足需求,我们可以在spot advisor上查看当前的实例中断情况。后面章节我会详细描述我们是如何使用实例队列的。

EMR 版本

在创建 EMR 时,需要额外注意 EMR 的 release 版本,不同版本支持的应用版本也不同。 从 EMR-6.1.0 版本起,已经开始支持 Spark3.0.0 + Hadoop 3.2.1 的组合了,目前最新的 EMR-6.2.0 版本,已经可以支持稳定版本的 Spark3.0.1 了。如果需要运行 Spark2.x 版本,可以选择 EMR-5.x 版本或者 EMR-6.0.0。

除应用版本区别,从 EMR-6.0.0 起,系统基于 Amazon Linux 2 和 Corretto JDK 8 构建,相对于以往的 Amazon Linux 1 最大的区别是提供了新的系统工具,如 Systemctl,以及优化的 Amazon linux LTS 内核。另外 Amazon Corretto JDK 8 提供经过 Java SE 认证的兼容 JDK,包括长期支持、性能增强和安全修复程序。 关于 emr-6.x 的 release note,可以参考EMR release note

另外,AWS 最新已经支持 EMR on EKS, 我们可以更灵活的将 EMR 创建在 EKS 中,以期更灵活使用和更低的费用,目前这一块我们团队正在调研和 adoption,我会在和后续的文章中专门来聊这一块。

EMR 在 FreeWheel 批处理 ETL pipeline 中的实践

在批处理 ETL pipeline 中有两个重要的组件分别叫 Optimus 和 JetFire,就是大家耳熟能详的擎天柱和天火,是由我司 Transformer team 建立的一套基于 Spark 的数据建模和处理框架(所以 team 的产品都以 Transformer 中的角色来命名)。Optimus 主要针对的是数据仓库层的建设,主要功能是将广告系统产生的 log 经前端模块收集起来后,根据商业逻辑的需求进行抽取转换,统一建模,并做了大量业务的 enrichment,将原始数据转换成方便下游应用端进行分析使用的 Context Data,最终由 Spark SQL 生成宽表落盘。JetFire 更偏向于一个灵活通用的基于 Spark 的 ETL Framework,能让用户更简单方便的将基于宽表之上的数据加工需求进行快速实现。这些 pipelines 都是由 Airflow 进行任务的编排和调度。

任务特点

基于 Optimus 的批处理 ETL pipeline

  • ETL pipeline 每小时一个 batch,由于客户分布在全球各个时区,所以数据的发布需求是按照客户所在时区的零点之后发布当前时区客户前 24 小时的数据;另外有些产品也需要支持每个小时的数据都不能延迟,每个小时的数据处理需要控制在 30min 以内;
  • 数据量不稳定;客户在不同时区分布的密度,各个小时的流量也不同,以及区域性事件的发生,以及内部上游模块可能会发生 delay 等都会造成每小时数据量不均匀。虽然每天 24 个小时的数据量分布虽然大致趋势相同,但是不能准确预测,只有将要开始处理这个 batch 的时候,才能从上游拿到这个 batch 的数据量信息;
  • 数据在 ETL 的中间过程中在 HDFS 上没有持久化需求;对于 HDFS 的需求是支撑 Spark 任务以及 EMR 集群的访问,保证一次批任务内部的事务性即可。需要持久化的数据会由后续的模块 load 到 clickhouse,以及同步发布到 S3 上交由 hive 来管理;
  • Spark 任务对于集群资源的需求:Optimus 中由于存在大量的计算(如数据序列化反序列化,metric 的计算,数据的排序聚合,Hyperloglog 计算等)和缓存(分层建模中,在 DAG 中被反复用到的数据),Spark 任务在不同阶段对集群资源的需求点是不同的:从数据 load 进内存到对数据进行 transform 进行建模的过程,是计算密集型的,需要消耗大量的 CPU,同时由于有些 dataframe 需要被更上层的模型复用,需要 cache 起来,这里需要大量的 memory; 而最终在支撑大量并发 SparkSQL 的数据抽取和聚合的运算中,网络和 CPU 都是很大性能瓶颈。我们针对 Spark 应用做了非常精细的调优工作,具体可以参考文章《Apache Spark 3.0 新特性在 FreeWheel 核心业务数据团队的应用与实战》。在处理一个 batch 的过程中,集群资源使用情况可以对应比较下面三个图。

基于 JetFire 的 DataFeed pipeline

  • Datafeed 中的任务具有不同的 schedule,不同的 input 数据量,提交到 EMR 集群的任务负载无法提前预知。这种场景下,EMR 更像是一个共享的计算平台,我们很难从单独一个应用的角度对整体资源进行规划。
  • 考虑到基于 Optimus 的 ETL pipeline 是众多下游应用的共同依赖,需要保证很高的稳定性,我们期望能够在独占一个 EMR 集群的前提下尽量降低开销。而 Datafeed pipeline 任务零碎且多,绝大部分任务轻量需要较快完成。

我们来各个击破上述需求

EMR 集群配置

基于前文对 EMR 集群的介绍以及实际应用需求,总的来说,我们使用了在 Long term 的 EMR 集群,运行单点 Ondemand 机型的 Master node + 单点 Ondemand 机型的 Core node + 动态伸缩的由 Spot&Ondemand 机型的 InstanceFleet 组成的 Worker。

Long term EMR VS Ondemand 创建 EMR

我们不选择在每次需要 EMR 集群的时候去重新创建一个集群的原因是,除了机器实例 provision 的时间外,EMR 还需要额外运行较长时间的 bootstraping 脚本去启动很多服务才能让整个集群 ready。而 Master node 和 Core node 我们在上文介绍过,相较于 Worker node,他们不仅需要支撑 Hadoop 服务,还需要下载并配置 Spark, Ganglia 等环境,以及更多服务(根据用户勾选的 application 决定),这样频繁创建和销毁带来的时间开销相对于 hourly schedule 的任务而言,是不能忽略的。所以我们选择创建维护一个 long term EMR 集群,通过 scale worker node 来控制负载的方案。而对于使用间隔较长,如 daily 甚至 weekly job 来讲,在每次需要使用 EMR 的时候进行临时创建是一个更合理的方案。

选择 Long term 之后我们就需要关注集群的稳定性,然而我们仍然选择了没有 HA 的方案,由于 Master 和 Core 均为 Ondemand 机型,这样就可以保证集群在非极端特殊情况下不会出现 crash 的情况。而对于极端特殊情况,考虑到两个 pipeline 对数据持久化并无需求,如果能在分钟级别的时间内重建 EMR 集群并恢复任务,相对于长期存在的 HA 方案的 master + core 方案来说,在都能满足生产需求的情况下 ROI 更高。所以综合考虑,我们选择了单点 master 和单点 core node 的方案,配合上 Terraform 脚本和 Airflow 的调度,我们可以在发生小概率集群事故得到情况下快速重建 EMR 集群并重跑任务,从而恢复 pipeline。

额外值得一提的是,在 EMR 使用初期,Terraform 上并不支持创建具有 InstanceFleet 的 EMR,当时仅能够使用命令行或者使用 boto3 的库创建一个 EMR 集群,由于没有 state 文件进行 track,所以也无法通过 Terraform 进行统一管理(如销毁和修改等),针对 EMR 这部分只能自己实现一套 stateful 的管理脚本。目前最新版本得到 Terraform 已经加入了对带有 InstanceFleet 的 EMR 的支持。但是实际使用当中,由于 EMR 中的很多配置只能在创建时一次性修改,如 Master&Core&Task 的机型配置,InstanceFleet 的机型选择,Hadoop 和 Spark 的一些 customize 的配置(这个依然可以通过启动之后修改对应的 xml 或者 conf 文件,然后重启对应服务进行修改,但是这种做法对于产品环境并不友好),Security group 等,如果需要修改这些,仍然需要销毁集群,用新的配置重新创建。

关于 Spot 机型和 AZ 的选择

考虑到成本,我们会优先使用 Spot 机型作为 Worker,使用 Ondemand 机型做补充。Spot 机型在资源紧俏的情况下有可能申请不到,即使申请到也有可能会出现中断被收回的情况,我们会在创建 EMR 时将 AZ 设置为多个,以增加成功申请到机器的概率。不过多 AZ 的配置仅在创建 EMR 时生效,EMR 会选择 创建时 资源较为宽裕的 AZ 申请 instances,而在 EMR 创建好之后,就只能在一个固定的 AZ 中使用了,后续即使这个 AZ 机器资源紧俏而其他可选的 AZ 资源充裕,由于集群 Master 和 Core 已经在当前 AZ 存在,不能再到其他 AZ 申请机器。考虑到 Spark 集群通信开销,跨 AZ 带来对性能的影响不能忽略,这种设计也是很合理的。因此,多 AZ 的设置对于每次需要重新创建 EMR 的场景是一个非常有用的功能,而对于 long term 集群的来说就有限了。在创建 long term 的集群之后,为了降低 Spot 机型带来的影响,就需要使用 InstanceFleet 配合 Spot 和 ondemand 混合使用来降低机器了,这一部分我会在 Worker node 伸缩策略里面详细描述。

Master 和 Core node 机型选择

在 Master 和 Core node 的机型选择上,由于 Core node 需要运行 HDFS Datanode,我们期望在运行 Spark 任务时在存储这里有更高的吞吐和 IOPS,所以我们选择了存储优化型的 i 系列。另外由于 Spark 的 driver executor 默认会运行在 core node 上,对于一些需要 Spark 中需要使用到并发的代码块,driver 的 CPU core 的数量决定了并发的上限,所以这里需要按应用需求选择合适的 CPU 机型。这里需要提及的一点,从 EMR 6.x 版本开始,默认情况下禁用的 Yarn label 的功能,但是考虑到 core node 为 ondemand 机型,而 worker node 为 spot 机型并且会频繁伸缩,运行在 ondemand 的 core node 上会更加稳定, 所以我们仍然选择开启 Yarn label,并让 Driver executor 运行在 Core node 之上,可以通过

yarn.node-labels.enabled: trueyarn.node-labels.am.default-node-label-expression: 'CORE'

复制代码

来开启这项配置。

对于 Master node,除了 NameNode 的内存开销外,就是 Spark historyServer 和 ResourceManager 的内存开销,相对 worker node 来讲,资源并不是很紧张,所以适当选择就好。

Worker node 伸缩策略

为了满足对不同 hourly 数据量的处理能够在限定时间内完成,需要根据当前小时 input 的数据量进行集群 capacity 的调整,在对 Worker node 的伸缩过程中,我们考虑了以下方面。

InstanceFleet 的使用

出于减低成本的考虑,我们会优先选择 Spot 机型来代替 onDemand 机型,然而在遇到机型紧俏,有大型活动的时候,热门机型就会供不应求,甚至会出现频繁的被收回的情况,对于集群中有节点被回收的情况,Spark 任务虽然可以 handle,并在 resource 可行的情况下将那一部分数据 shuffle 到其余可用的 worker 上进行调度处理,但是数据的搬移以及 DAG 后续 task 的数据倾斜带来的性能的下降,甚至是资源不足导致的任务最终失败,可能会导致 spark 任务运行更长的时间,从而可能引发更大的 spot 收回的机会。因此,我们采用了 InstanceFleet 来降低对单一机型的依赖,InstanceFleet 可以针对 EMR 的 Master,Core 和 Worker node 分别设置混合机型和 lifeCycle 类型,以期能够在实例资源紧张的情况下满足目标容量。

一个 InstanceFleet 目前可以最大支持配置 15 种机型,目前常用的几类可以用来运行 Spark 的机型有 C 型,R 型和 M 型,C 型更倾向于计算密集型的任务。除了 C 型的 CPU 主频更高以外,几类机型的主要区别在于 vCPU/memory 的比例,R 型更适合内存使用比较高的任务,C 型比较适合计算类型的任务,而 M 型比较折中。

不同类型的 instance capacity 不同,在 InstanceFleet 中可以根据容量设置不同的 unit 值,unit 值默认等于 vCore 的数量。实际使用中,我们有两种做法:

  • 一种是时候用默认,如 R5.12xlarge 是 48 个 unit,那么 c5.24xlarge 就是 96 个,这样在预估好了集群所需资源之后,可以直接根据所需 CPU 资源转换成 unit 个数就可以申请了,这种情况适合于配置到 InstanceFleet 的机型都具备一样的 CPU/memory 比例,或者是部分机型的 MEMORY 比例大于目标值,且愿意承担一部分的 memory 资源浪费。如果选入了 memory 比例小的机型就可能会由于该种机型 memory 不足,无法无法按照预期申请到足够的 executor,导致 spark application 由于资源不够停留在 accept 阶段;
  • 另一种方案是根据 Spark 应用的 executor 配置资源,根据预期所选的机型上可以启动的 executor 最大个数的最大公约数作为一个 unit,这种情况的限制在于集群的申请和 spark 应用的配置绑定了起来,虽然可以更高效的使用集群资源,但是一旦应用配置变化,就需要重新部署集群,定制化程度较高,有利有弊。

InstanceFleet 目前还存在一些 limitation,如:

  • 我们无法主动设置不同 instance type 的 provision 优先级。在一些场景下,我们期望加入更多的机型备选,然而其中的某些实例类型我们仅希望在主力机型不足的情况下作为补充机型加入,这种诉求目前是无法实现的;官方给出的解释是,其内部会有算法来计算我们需要的 units,会结合总体开销和实例资源情况综合考虑进行机器的 provision。
  • 第二个 limitation 是无法在集群创建之后进行机型的增删改,也不能对 fleet 中机型的配置进行修改,如 EBS 存储大小。这对于 long term 运行的集群需要进行调整的时候并不是一个友好的行为。
  • 另外就是在 AWS console 上查看 InstanceFleet 的状态时,无法高效的找到当前处于某种状态的机器,因为 fleet 会保留历史上 1000 个机器的记录,当超过 1000 个历史机器时,就会丢弃掉最早的记录,始终保持但是页面上需要不断刷新才能获取完整的 list,此时对于在 WEB 上进行直观调试的用户而言不太友好,而对于使用 aws cli 的用户来说,我们可以通过 list fleet 并 filter 某种状态的 instance 来获取自己想要的结果。
  • 下面是使用的 InstanceFleet 的配置情况:
集群伸缩策略

对于不同的应用场景,我们目前使用了两种伸缩策略,一种是由任务调度端根据任务情况进行主动 scale,一种是通过监控集群状态由 EMR 进行被动的 scale。

主动伸缩的方式

对于基于 Optimus 的 ETL pipeline 来说,对于每个 batch,Spark 需要多少 resource 进行计算,我们可以通过历史数据进行拟合,找出数据量和所需资源的关系,并通过不断反馈,可以越来越准确的估计出对于给定量的数据集所需的集群资源,由于 Optimus 运行的 EMR 集群是 dedicated 的,所以我们就可以在提交 Spark 任务之前就去精准的 scale 集群达到目的 capacity。下图是 Worker node 伸缩在 Airflow 调度下的 workflow。

被动伸缩的方式

Datafeed pipeline,实际上是多条不同 schedule 间隔,不同资源需求,以及不同任务 SLA 需求的任务集合,由于每个任务之间相互独立,此时 EMR 相当于一个共享的计算资源池,很难从每个任务 schedule 的维度去管理 EMR 集群的 capacity。所以我们采用了EMR managed scaling。启动了此项功能之后,EMR 会持续评估集群指标,做出扩展决策,动态的进行扩容。此功能适用于由实例组或实例队列组成的集群。另外我们在 Yarn 上设置了不同的 queue,以期能够将不同资源需求和优先级的任务做粗粒度的隔离,结合上 Yarn 的 capacity scheduler,让整体集群资源尽量合理使用。在实际使用中,我们多次遇到了无法自动伸缩的情况,此时需要手动触发一次伸缩,之后就可以恢复自动了,目前原因不详。

以上采用的两种方案各有利弊,对于第一种方案,更适用于一个 dedicated 集群用于提交完全可预知 capcity 的场景,这种情况下我们在提交任务之前就可以主动的将集群 size 设置成我们想要的 capcity,快速而准确; 而对于第二种场景,非常适合用于 EMR 作为一个共享的计算平台,应用端作为单一的任务提交者无法获取当前及未来提交的任务全貌,也就无法计算 EMR 所需扩充的 capacity,这种情况下 EMR 集群需要在任务提交之后根据一些集群 metrics 才能进行动态调整伸缩,在时效性上会有延迟,对于一些 DAG 较为复杂,中间步骤较多且对 shuffle 和数据倾斜敏感的 Spark 应用来讲也不友好。

针对以上 case,Transformer 团队正在开发一套运行与 EMR 和 Yarn 之上的基于应用和策略角度运维的 Framework - Cybertron。我们期望能通过这个服务可以站在更全局的角度去合理的管理多个 EMR 集群资源,能够让应用端不去关注 EMR 集群的资源情况,综合 Yarn 的 scheduler 和 Queue 等的配置,对集群资源和任务调度能有一个更高视角的控制。

Spot 和 Ondemand 机型的混用

实际应用中,即使我们选择了 InstanceFleet,也会由于极端的情况导致无法配齐资源,此时我们不得不 Ondemand 机型来补足缺口,如上面的流程图所示,当等待集群 scale 一定时间之后,如果依然无法配齐需求目标,我们就会申请 ondemand 机型用来补足,在 InstanceFleet 的场景下,ondemand 类型的机型是和 Spot 机型范围是一样的。值得注意的一点是,虽然 EMR 中可以配置 Provisioning timeout 的 action 是After xx minutes, Switch to On-Demand instances,但实际情况是不会起作用,仍然需要我们根据实际 InstanceFleet 的情况去主动申请 Ondemand 机型。

对于更极端的情景,如黑五到来或者美国大选期间,我们会主动直接将所以申请机型直接替换成 ondemand,以防止频繁的无法配齐 Spot 带来的额外时间开销。

另外,在 Spark job 运行期间,如果遇到了因为 Spot 机型回收导致的任务中断的情况,我们会在 Airflow 的调度下,根据当前集群状态加大 ondemand 机型进行配比,以期能够快速恢复。

效果

下图是使用了主动 scale 方案的集群,在一天内 24 个小时集群根据数据量的情况进行伸缩的情况,红色柱子是集群的 Memory capcity,蓝色+绿色的累加部分为实际使用的内存量。从实际使用来看,我们可以做到在任务提交前主动 scale out 到期望的 capacity,每个 batch 的任务可以充分利用集群资源,在 SLA 时间内完成数据处理,并在不需要集群之后马上 scale in 进来。在降低开销的同时满足任务需求。

下图是使用了 EMR autoScale 方案的集群。我们可以看到随着更多的 Spark 任务提交,集群负载变高,集群根据负载情况 scale out 了两次,最终在任务全部做完,并空闲一段时间后将 Worker node 缩回了 0。

HDFS 的依赖

在我们的使用场景中,EMR 仅作为计算资源,其上的 HDFS 仅需要支撑 Spark 应用即可。在一次批处理的过程中,生成的数据会先存储在 HDFS 上,然后由 publisher 将数据搬移并持久化到 S3 上。关于为什么不直接写入 S3,一方面是考虑到数据的特点需要在发布的时候进行一次重新组织,而 S3 的最终一致性模型会带来第二次 copy 的时候发生数据丢失(针对这个 case,我们仍然可以由 producer 端在写出数据的同时产生一份 file list,作为上下游数据的接口来解决;另外也可以通过开启一致性视图,不过这个带来额外的组件依赖和开销;根据最新的 AWS 文档,S3 已经解决的 read-after-write 的问题read-after-write-consistency,但是对于先读后写再读的 case,仍然会在 list 的情况下存在一致性问题)。另外,Spark 直接写 S3 文件也存在着一定的性能问题,而且由 Spark 应用直接针对不同的数据发布特点组织数据形式,也会造成逻辑耦合太紧,不利于维护,还会加大 Spark 应用的运行时间,变相增加了成本同时,对于使用 Spot 竞价机器的场景,更长的运行时间也就意味着更大的被中断机会。

总结

随着我们对 EMR 使用的越来越深入,在满足产品需求的同时,我们也在不断“榨干”EMR 开销,本文期望能通过我们的 EMR 使用经验给读者启发。也感谢在这个过程中 AWS EMR 团队的支持。

另外,AWS EMR 团队也在根据客户的实际需求不断完善和推出新的功能,目前我们正在调研试用 AWS 最新推出的 EMR on EKS,期望能够有更灵活的使用方式、更快的 scale 速度和更小的开销。我们会在后续的文章中继续更新进展。

作者介绍

彭康,Lead Software Engineer FreeWheel,毕业于中科院计算所,目前就职于 Comcast FreeWheel 数据产品团队,主要负责广告数据平台数据仓库的建设。

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/LAY1Uag3LGLq9OCDFc6W
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券