如何实现端对端的 exactly once?

001

Exactly Once的实现

如果实时作业要实现端对端的 exactly once 则需要数据源、数据处理与数据存储的三个阶段都保证 exactly once 的语义。目前基 于Kafka Direct API加上Spark RDD 算子精确一次的保证能够实现端对端的 exactly once 的语义。

在数据存储阶段一般实现 exactly once 需要保证存储的过程是幂等操作或事务操作。很多系统本身就支持了幂等操作,比如相同数据写 hdfs 同一个文件,这本身就是幂等操作,保证了多次操作最终获取的值还是相同;HBase、ElasticSearch 与 redis 等都能够实现幂等操作。对于关系型数据库的操作一般都是能够支持事务性操作。

官方在创建 DirectKafkaInputStream 时只需要输入消费 Kafka 的 From Offset,然后其自行获取本次消费的 End Offset,也就是当前最新的 Offset。保存的 Offset 是本批次的 End Offset,下次消费从上次的 End Offset 开始消费。

当程序宕机或重启任务后,这其中存在一些问题。如果在数据处理完成前存储 Offset,则可能存在作业处理数据失败与作业宕机等情况,重启后会无法追溯上次处理的数据导致数据出现丢失。如果在数据处理完成后存储 Offset,但是存储 Offset 过程中发生失败或作业宕机等情况,则在重启后会重复消费上次已经消费过的数据。

而且此时又无法保证重启后消费的数据与宕机前的数据量相同数据相当,这又会引入另外一个问题,如果是基于聚合统计指标作更新操作,这会带来无法判断上次数据是否已经更新成功。

所以在 muise spark core 中我们加入了自己的实现用以保证 Exactly once 的语义。具体的实现是我们对 Spark 源码进行了改造,保证在创建 DirectKafkaInputStream 可以同时输入 From Offset 与 End Offset,并且我们在存储 Kafka Offset 的时候保存了每个批次的起始Offset 与结束 Offset,具体格式如下:

如此做的用意在于能够确保无论是宕机还是人为重启,重启后的第一个批次与重启前的最后一个批次数据一模一样。这样的设计使得后面用户在后面对于第一个批次的数据处理非常灵活可变,如果用户直接忽略第一个批次的数据,那此时保证的是 at most once 的语义,因为我们无法获知重启前的最后一个批次数据操作是否有成功完成。

如果用户依照原有逻辑处理第一个批次的数据,不对其做去重操作,那此时保证的是 at least once 的语义,最终结果中可能存在重复数据;最后如果用户想要实现 exactly once,muise spark core 提供了根据topic、partition 与 offset 生成 UID 的功能。

只要确保两个批次消费的 Offset 相同,则最终生成的 UID 也相同,用户可以根据此 UID 作为判断上个批次数据是否有存储成功的依据。下面简单的给出了重启后第一个批次操作的行为。

002

Metrics系统

Musie spark core 基于 Spark 本身的 metrics 系统进行了改造,添加了许多定制的 metrics,并且向用户暴露了 metrics 注册接口,用户可以非常方便的注册自己的 metrics 并在程序中更新 metrics 的数值。最后所有的 metrics 会根据作业设定的批次间隔写入 Graphite,基于公司定制的预警系统进行报警,前端可以通过 Grafana 展现各项 metrics 指标。

Muise spark core本身定制的metrics包含以下三种:

Fail 批次时间内 spark task 失败次数超过4次便报警,用于监控程序的运行状态。

Ack 批次时间内 spark streaming 处理的数据量小0便报警,用于监控程序是否在正常消费数据。

Lag 批次时间内数据消费延迟大于设定值便报警。

其中由于我们大部分作业开启了 Back Pressure 功能,这就导致在Spark UI 中看到每个批次数据都能在正常时间内消费完成,然而可能此时 kafka 中已经积压了大量数据,故每个批次我们都会计算当前消费时间与数据本身时间的一个平均差值,如果这个差值大于批次时间,说明本身数据消费就已经存在了延迟。

下图展现了预警系统中,基于用户自定义注册的Metrics以及系统定制的Metrics进行预警。

003

容错

其实在上面 Exactly Once 一章中已经详细的描述了 muise spark core如何在程序宕机后能够保证数据正确的处理。但是为了能够让 Spark Sreaming 能够长时间稳定的运行在Yarn集群上,还需要添加许多配置,感兴趣的朋友可以查看:Long running Spark Streaming Jobs on Yarn Cluster。

除了上述容错保证之外,Muise Portal(后面会讲)也提供了对 Spark Streaming 作业定时检测的功能。目前每过5分钟对当前所有数据库中状态标记为 Running 的 Spark Streaming 作业进行状态检测,通过Yarn提供的REST APIs可以根据每个作业的Application Id查询作业在Yarn上的状态,如果状态处于非运行状态,则会尝试重启作业。

Tips:

下周,我会分享一下在封装完所有的Spark Streaming之后,我们就需要有一个平台能够管理配置作业,也就是 Muise Portal ,大家记得每天17:40 来看文章哈。祝大家周末愉快~

  • 发表于:
  • 原文链接https://kuaibao.qq.com/s/20180914B1FDBS00?refer=cp_1026
  • 腾讯「云+社区」是腾讯内容开放平台帐号(企鹅号)传播渠道之一,根据《腾讯内容开放平台服务协议》转载发布内容。
  • 如有侵权,请联系 yunjia_community@tencent.com 删除。

扫码关注云+社区

领取腾讯云代金券