京东Flink优化与技术实践

导读:Flink是目前流式处理领域的热门引擎,具备高吞吐、低延迟的特点,在实时数仓、实时风控、实时推荐等多个场景有着广泛的应用。京东于2018年开始基于Flink+K8s深入打造高性能、稳定、可靠、易用的实时计算平台,支撑了京东内部多条业务线平稳度过618、双11多次大促。本次讲演将分享京东Flink计算平台在容器化实践过程中遇到的问题和方案,在性能、稳定性、易用性等方面对社区版Flink所做的深入的定制和优化,以及未来的展望和规划。

实时计算引进

1.发展历程

最初大数据的模式基本都是 T+1,但是随着业务发展,对数据实时性的要求越来越高,比如对于一个数据,希望能够在分钟级甚至秒级得到计算结果。京东是在 2014 年开始基于 Storm 打造第一代流式计算平台,并在 Storm 的基础上,做了很多优化改进,比如基于 cgroup 实现对 worker 使用资源的隔离、网络传输压缩优化、引入任务粒度 toplogy master 分担 zk 压力等。到 2016 年,Storm 已经成为京东内部流式处理的最主要的计算引擎,服务于各个业务线,可以达到比较高的实时性。

随着业务规模的不断扩大,Storm 也暴露出许多问题,特别是对于吞吐量巨大、但是对于延迟不是那么敏感的业务场景显得力不从心。于是,京东在 2017 年引入了 Spark Streaming 流式计算引擎,用于满足此类场景业务需要。

随着业务的发展,不光是对于数据的延迟有很高要求,同时对于数据的吞吐处理能力也有很高的要求,所以迫切需要一个兼具低延迟和高吞吐能力的计算框架,于是在 2018 年我们引入了 Flink。在 Flink 社区版的基础上,我们从性能、稳定性、易用性还有功能等方面,都做了一些深入的定制和优化。同时我们基于 k8s 实现了实时计算全面的容器化,因为容器化有很多的优点,它可以做到很好的资源隔离,同时它有一个很强的自愈能力,另外它很容易实现资源的弹性调度。同时我们基于 Flink 打造了全新的 SQL 平台,降低用户开发实时计算应用的门槛。

到 2020 年,基于 Flink 和 k8s 实时计算平台已经做的比较完善了。过去流式处理是我们关注的重点,今年我们也开始逐渐的支持批处理,朝着批流一体的方向演进。另外 AI 是目前比较火的一个方向,对于 AI 来说,它的实时化也是一个重要的研究方向。所以我们的实时计算平台将会朝着批流一体和 AI 的方向进行发展。

2.平台架构

上面是京东实时计算平台 JRC 的整体架构,整个架构以定制化改造后的 Flink 为核心,Flink 运行在 K8S 上,状态存储在 HDFS 集群上,通过 Zookeeper 保证集群的高可用。支持流式源 JDQ(京东基于 Kafka 深入定制实现的实时数据总线)和 Hive,数据主要写入 JimDB(京东内存数据库)、ES、Hbase 和京东 OLAP。计算平台支持 SQL 和普通 JAR 包两种方式的作业,具有配置、部署、调试、监控、和日志处理等功能。

3. 业务场景

京东 Flink 服务于京东内部非常多的业务线,有 70 多个一级部门在使用,主要应用场景包括实时数仓,实时大屏,实时推荐,实时报表,实时风控和实时监控,当然还有其他一些应用场景。对数据计算实时性有一定要求的场景,一般都会使用 Flink 进行开发。

4. 业务规模

京东 Flink 集群目前由 5000 多台物理机组成,它服务了京东内部 70 多个一级业务部门,目前线上的流计算任务大概有 3000 多个,数据的处理能力可以达到每分钟数十亿甚至更高。

Flink 容器化实践

1.容器化历程

京东从 2018 年开始进行计算引擎的容器化改造,2019 年初已经实现计算单元全部容器化,2020 年进行了容器化方案升级,使用 native k8s 实现计算资源的弹性扩容。容器化改造的好处是提升了资源使用率,提高了研发效率,增强了业务稳定性,减少了运维部署成本。

2.容器化方案

旧的容器化方案是基于 k8s Deployment 部署的 Standalone Session 集群,它需要事先预估出集群所需资源,比如需要的 JobManager 和 TaskManager 的资源规格和个数。然后 JRC 平台通过 K8S 客户端向 K8S Master 提出请求,创建 JobManager 的 Deployment 和 TaskManager 的 Deployment。其中使用 ZK 保证高可用,使用 Hdfs 实现状态存储,使用 Prometheus 实现监控指标的上传,结合 Grafana 实现指标的直观展示。集群使用 ES 存储日志,方便日志的查询。

3.容器化遇到的问题 &对策

容器化过程中可能遇到很多问题:

JM/TM 故障自动恢复

应用部署在容器中,当应用出现异常时,如何发现应用或者异常的情况呢?比如可以使用存活探针,编写检测脚本定期读取应用的心跳信息。当检测到 Pod 处于不健康状态时,可以采用 k8s 的重启机制来重启不健康的容器。

减少 Pod 异常对业务影响

在 k8s 中由于硬件异常、资源过载、Pod 不健康等问题会导致 Pod 被驱逐或自动重启,Pod 重启时势必会影响到该 Pod 上分布计算任务的正常运行。这个时候可以考虑采用适当的重启策略、改造内核等方案来减少对任务影响。比如京东实现了 JM Failover 优化,当 Pod 异常引起 JM Failover 时采用的是任务不恢复、重建任务状态恢复的方式,可以一定程度上减少 Pod 重启对业务带来的影响。

性能问题

在容器环境下,JVM 对 cpu 和内存的感知会有一定的问题,在 Java8 版本中,一些参数就要进行显式的设置。对于机器性能差异或热点等问题导致部分 Pod 计算慢的问题,可以考虑进行针对性优化(比如实现基于负载的数据分发)或处理(比如检测到计算慢的 Pod 将其驱逐到负载较低的机器)。此外,对于使用容器网络的情况下,可能会带来一定的网络性能损耗,此时可以根据情况选择使用主机网络避免网络虚拟化带来的开销,或者选择更高性能的网络插件。

重要业务稳定性

如何保证业务的稳定性是一个需要重点考虑的问题。除了保证系统各个环节的高可用外,还可以根据业务情况考虑使用其它合理的方案,例如业务分级管理,独立资源池,多机房互备等。

4.容器化方案升级(Native k8s)

原有容器化方案存在一定的问题:

  • 资源需要提前分配
  • 无法实现资源弹性伸缩
  • 极端场景下Pod不能正常拉起,影响任务恢复
  • 重要业务稳定性

容器化升级的解决方案是采用 Native K8s 的方式。由 JRC 平台先向 K8S Master 发出请求,创建 JobManager 的 Deployment;然后在用户通过 Rest 服务提交任务后,由 JobMaster 通过 JDResourceManager 向 JRC 平台发出请求,然后 JRC 平台向 K8s Master 动态申请资源去创建运行 TaskManager 的 Pod。

此处,通过引入 JRC 平台与 K8s 交互,屏蔽了不同容器平台的差异,解耦了镜像与平台集群配置 &逻辑变化。另外,为了兼容原有 Slot 分配策略,在提交任务时会预估出任务所需资源并一次性申请,之后采用等待一定时间后进行 slot 分配的方式达到兼容目的。

Flink 优化改进

主要做了以下四个方面的优化:

  • 性能
  • 稳定性
  • 易用性
  • 功能扩展

下边分几个重要的点进行讲解:

1.预览拓扑

预览拓扑主要是为了解决业务的一些痛点:比如任务调优繁琐、SQL 任务无法指定并行度、任务需要的额 Slot 数不清楚、并行度调整后网络 buffer 不足等。在 Flink 任务调试阶段,对任务并行度、Slot 分组、Chaining 策略的调整是个反复的过程,如果把参数写到命令行就太繁琐了。而基于预览拓扑就可以很方便地对这些参数进行配置。

预览拓扑基本的实现方案如上图:用户提交 JAR 包后可根据 JAR 包生成对应的拓扑图,之后用户根据拓扑图可以进行在线调整,最后自动将修改后的配置和原来的 JAR 包一起进行任务提交。

预览拓扑机制使得不修改程序多次提交任务调优成为可能,但是如何保证前后两次提交生成算子稳定的对应关系呢?解决方案的关键是保证算子有稳定的唯一身份标识,具体算法是:如果算子指定了 uidHash 就用 uidHash,如果算子指定了 uid 就使用 uid,否则就从 source 开始广度优先遍历,利用算子在 graph 中的位置生成一个稳定 hash 值。

2.背压量化

第二个重要的优化是背压量化。

在 Flink 开发的时候,主要有两种方式:

通过 Flink UI 背压面板观察是否背压。使用这种方式在某些场景比较方便,但是它存在几个问题:

  • 在有些场景下采集不到背压
  • 对于历史背压情况无法跟踪
  • 背压影响不直观
  • 大并行度时背压采集压力

通过任务背压相关指标进行观察和分析,通过将指标定期采集并存储起来,可以进行实时或历史的背压分析。但是它也有一些不足的地方:

  • 不同Flink版本中指标含义有一定差异
  • 分析背压有一定门槛,需要对于指标含义有深入理解,联合进行分析
  • 背压未量化,对业务影响程度不够直观

京东的解决方案是采集背压发生的位置、时间和次数指标,并对这些指标进行上报存储。同时对量化的背压指标结合运行时拓扑,可以精确反映发生背压现场的情况。

3.HDFS 优化

随着业务数量的增多,HDFS 集群的压力就会变得很大。这会直接导致 RPC 响应时间变慢,造成请求堆积,同时大量小文件也会对 NN 内存造成很大压力。对此京东尝试的解决方案有 4 方面:限制 checkpoint 最小间隔,时间最小设置在 1min 左右可以满足大部分业务需求;进行小文件合并;降低 cp 创建和删除时的 hdfs rpc 请求;HDFS 集群多 ns 分散均衡压力。

4.网络分发优化

在实践过程中我们发现,即使业务使用了 rebalance 并且对任务进行了打散分布,但是由于机器处理能力和负载的差异,会导致任务各个并行度不同程序的背压表现,严重影响了任务的性能。为此,我们开发了基于负载的动态 rebalance,在数据进行分发时优先选择下游负载最小的 channel 进行分发。

经测试,在特定场景下性能能够提升近一倍。

5.ZK 防抖

目前一般都是使用 ZK 集群实现 Flink 集群的高可用,但是当网络抖动、机器繁忙、ZK 集群暂时无响应或运维机器的时候,都可能会导致任务重启。

任务重启的原因是由于在这些场景发生时,Curator 会将状态设置为 suspended,并且 Curator 认为 suspended 为 Error 状态,从而会释放 leader,Flink 发现 notleader 后会 revokeLeadership,从而造成任务重启。

一个可行的解决办法是升级 Curator 的版本,同时将 connectionStateErrorPolicy 设置为 SessionConnetionStateErrorPolicy。

6.日志分离

目前我们一个集群是支持跑多个任务的,这时日志会出现的问题是:任务的日志和集群 Framework 日志混在一起,同时集群的多个任务日志也是混在一起的,不太方便用户查看日志,快速定位问题。

为了解决这个问题,首先要弄清楚目前 Flink 加载日志框架的基本机制:为了避免跟业务 Job 中可能包含的日志框架的依赖、配置文件产生冲突,Flink 日志相关类的加载都代理给 TaskManager 框架的类加载器,也就是 Parent Classloader,而框架加载的这些类都是从 Flink 安装包的 lib 目录下加载的。对于日志配置文件,Flink 通过 JVM 启动参数来指定配置日志配置文件路径。

日志分离的解决方案是:将日志相关 jar 包加入到各个 task 自己 classloader(user classloader)的类路径中;同时确保使用 user classloader 加载日志类和加载自己的日志配置;

另外对于使用了 Flink 框架的类(比如 PrintSinkFunction),日志不能做到很好的分离,可以考虑使用 logback MDC 机制。

未来规划

未来规划主要包括四个方面:

统一计算引擎

引擎 Storm 全部升级为 Flink,这样可以减少平台的运维成本,同时可以提高作业性能(目前已经接近完成)。

更多 SQL 作业

持续完善 SQL 平台,降低用户的使用门槛,推动用户更多使用 SQL 开发作业。

智能运维

使用智能诊断,自适应调整运行参数,提升任务的鲁棒性

批流一体

深度打造批流一体实时计算平台,兼具低延迟的流处理和高性能的批处理能力。另外统一架构,实现代码复用,降低用户的使用成本。

本文转载自: [DataFunTalk](ID:datafuntalk)

原文链接:京东Flink优化与技术实践

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/1NVLdUu82iHMUsXXQRUq
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码关注腾讯云开发者

领取腾讯云代金券