前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >腾讯云 AI 视觉产品基于流计算 Oceanus(Flink) 计费数据去重尝试

腾讯云 AI 视觉产品基于流计算 Oceanus(Flink) 计费数据去重尝试

作者头像
吴云涛
发布2022-01-28 14:54:37
9950
发布2022-01-28 14:54:37
举报

| 导语: 介绍下最近使用 Flink 来对计费数据进行去重的具体做法

一. 背景

AI 视觉产品在我们腾讯云-人工智能的产品目录下,包括人脸识别、人脸特效、人脸核身、图像识别、文字识别等。 流计算 Oceanus 在腾讯云-大数据的产品目录下,是基于 Apache Flink 构建的企业级实时大数据分析平台。 AI 视觉产品是按调用量计费,毕竟涉及到钱,用户对计量数据准确是非常敏感的; 另外调用量本身也比较大,如何保证数据的准确一致也是一个比较大的挑战。 数据不准: 主要包括数据丢失和数据重复(当然可能有其他问题比如上报的数据本身错误等,暂不属于本次讨论范围)。 数据丢失: 相当于调用量少算,会影响我们的收入。一方面我们通常重试、持久化等方式尽量减少数据的丢失,目标当然是完全不丢,但很难做到100%不丢。另一方面很少量的数据丢失对于实际收入影响很小,对用户基本没有影响。 数据重复: 相当于调用量多算就会多收用户钱,用户一旦发现肯定会投诉过来。所以是必须要去解决的,但是数据量很大,要做到精确去重比较难。

整体的背景和处理逻辑可以参考如下业务流程图, 本次主要介绍下我们在数据去重方面的一些尝试。

业务流程图
业务流程图

系统架构图:

架构图
架构图

二. 思路与调研

去重的触发时机: 数据重复的原因主要是各种重试:包括上游传输环节的超时重试和下游计算环节的系统重启导致的数据重算。因为我们通常使用的是最终的数据,只要保证最终数据不重复即可,所以只要在最后的计算环节进行一次去重就可以,前面的环节不用处理。

去重的技术手段: 保证数据处理中不重复、不丢失(数据一致),通常有 2 个技术手段:事务和幂等可重入(幂等重入可能出现部分数据插入了的时间段,没有事务还能保证过程中的精确,但如上所述我们只要最终数据一致,所以幂等也是可以的)。

事务的实现难度高,尤其在分布式或多个组件要用到 2PC 之类的事务,更加复杂;所以通常事务都是组件本身成熟的实现,很少从头开发的; 而幂等通常是使用数据的唯一键来保证去重,但是在我们数据累计这里不适用,因为聚合时的数据的顺序和数量在每次计算时不是固定的,所以如果出现重启要重新计算时并不能生成和上次一样的唯一键,就难以使用键去重。

经过调研发现 Flink 本身是支持 2PC 事务和内部的状态存储,可以做到 exactly-once,当然使用起来会有成本(包括学习成本、问题排查等,Flink 的开发入门和资料可以参见 Flink 入门 1-零基础用户实现简单 Flink 任务[2] 和 Flink 入门9-Jar 作业开发[3])。 考虑到后续我们数据量增加后的数据处理能力以及其他一些流处理的场景都还是会用到 Flink,所以与其自己 DIY 不如使用成熟开源的组件,也符合当前开源协同的趋势,所以决定直接使用 Flink 里面的去重,下面是 Flink 实现 2PC 的流程:

2PC 事务有 2 个角色:协调者(发起者、控制者)和参与者(要支持本地事务),如上图所示。 Flink 的 JobManager 是协调者;Flink 内部的状态、流程属于内部参与者;Kafka 作为 Source 和 Sink 是外部参与者,尤其是作为 Sink 的 Kafka 要选择支持事务的版本(>=0.11)。

Flink 介绍事务的可以参考 官方文档[4]。

Spark vs Flink 在 excatly-once 上的对比讨论,也说到很多基本概念的理解:https://zhuanlan.zhihu.com/p/77677075

三. 实现

基本的流程和上图一致,基本代码如下:

代码
代码

简单介绍下每个步骤,同时讨论下可能的问题:

0. 前提:

Flink 打开 Checkpoint,相当于 Flink 打开了 redo、undo log 等持久化的机制,是事务的基础。

1. 事务开始:

从上一个处理完成 offset 消费 Kafka 数据,当然这里 Kafka 里面的数据格式需要自行去解析,可以做些简单的处理。

2. 事务处理:

2.1 按照用户 KeyBy 分流,提高并发

按照用户分流可以保证同一个用户在同一个处理流中,从而保证数据去重(不同用户的数据认为不会重复的)。当然这里也有一个数据倾斜的问题:如果某个用户调用量特别大就会导致部分流负载很高,拖累这个处理的速度,目前我们的数据分布根据测试还好。通过学习了解到如果数据倾斜严重可以再次选择更好的 Key 分流:比如可以按照用户 RequestId 的前缀进行分流更均匀,另外 Flink 也提供了 rebalance 的接口强制将数据打散,当然要符合逻辑数据分布要求。

2.2 声明 map 状态存储处理过的数据,用于去重

测试时可以选 memory,但是线上还是要使用 Rocksdb 应对数据量大的场景,同时要开启 TTL 机制避免状态太大对内存和 Checkpoint 保存和恢复时产生太大的 IO 压力,开始时建议先选择比较短的 TTL,观察内存和负载再逐步调大,目前我们 TTL 可以到 15 分,希望可以逐步调大到 1 小时 - 10 小时。 当然当这里如果数据量特别大时,用到的存储也就很,很容易磁盘、内存产生大的压力,所以这里要进行实际的测试和调整:比如增多机器提高并发,或者使用 Rocksdb 增量式的 Checkpoint 等。 这里存储数据的时间长短决定了去重的数据的范围,如果太大如上所述对存储压力很大,造成 Flink 运行不稳定;但如果太小只能小局部去重,对于跨度比较大的数据重复不能应对,比如跨天的数据也可能重复,在离线上报的链路中就可能跨天重试的,通常在实时上报的链路不会出现,对于这种长时间还有重复的,目前想到有 2 个处理的方向(还没具体落地):

  1. 使用 Redis 存储处理过的数据(不要求很及时),上报时先去这里去重;问题首先是对存储压力增大不少,同时要增加一次查重的耗时
  2. 要求上报方记录下上报的结果不要重复上报,即使重复上报时间间隔也不能太长这里虽然对业务不友好,但是可以理解,毕竟极端情况下也有现在 30 号要从新上报 1 号的数据也可能出现,那如果用方案 1,就要 1 个月的数据完全存储下来成本太大。
2.2.1 去重 Key 的选择

通常来说直接选 RequestId 就可以,当然保险起见,加上用户维度也是可以的(可以应对下 RequestId 少量重复的情况)。但测试中发现几个问题:

  1. 用户一次的请求,到后台业务对应多次处理都上报给计费了,组合结果后返回给用户——这样就会导致只统计了其中一个操作,其他操作被去重过滤。解决的方法就是去重 Key 也加上 action。
  2. 接入层重试的情况,第一次请求处理失败上报了失败,然后接入层重试成功了上报了成——这样就会导致只统计了失败的,成功的上报被过滤了。解决的方法也是加上错误码。

3. 数据聚合:

3.1 目前 window 选择1分钟粒度聚合汇总
3.2 聚合 Key 根据业务需要进行选择
3.3 出库到 Kafka 时生成一个 uuid

uuid 是 java 自带的函数生成,相当于一个全局唯一随机数,好处就是有了唯一键,后面数据处理、入库时就很方便。

4. 输出到 Kafka:

目前 Flink 内置的支持事务的 Sink 只有 Kafka>0.11。当然可以根据 Flink 的 2PC (两阶段提交) 接口自行去实现需要后端的 Sink,比如 MySQL、PG。这里我们使用 Kafka 作为输出除了简单成熟,另外就是考虑到如果数据量增大,Kafka 这里的大数据能力就是天生的,数据库就需要扩容或替换——当然这里增加了一个 Kafka 到 PG 的同步的流程,流程变的更长了;但是考虑到后续数据量大和解耦的考虑,还是推荐出库到 Kafka。

下面是 Flink 2PC 的 Sink 要继承实现的接口:

代码语言:javascript
复制
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT> extends RichSinkFunction<IN>
        implements CheckpointedFunction, CheckpointListener {
  
    // 开始一个事务,返回事务信息的句柄
    protected abstract TXN beginTransaction() throws Exception;
  
    // 预提交(即提交请求)阶段
    protected abstract void preCommit(TXN transaction) throws Exception;
​
    // 正式提交阶段
    protected abstract void commit(TXN transaction);
​
    // 取消事务
    protected abstract void abort(TXN transaction);
}
4.1 打开 Kafka 事务出库

如上所述,Flink 的 Kafka 连接器在流计算 Oceanus (Flink) 平台已经支持,可以直接使用。这里是事务出库到 Kafka 的,那么后续读取 Kafka 这里的数据也要配置 read_commited 的级别的读,整个链路数据一致。

四. 问题

在上面的流程说明中已经就每步可能的问题进行了说明和讨论,但是肯定还有新的问题,就需要后续运营过程中发现进行修复。这里我们预料比较麻烦的问题是: 如果 2PC 事务过程中出现异常问题时,是否可以比较快、完美的恢复回来;否则可能出现死锁或启动不起来的情况。 以上是最近 2-3 个月的实现的情况,后面还会继续验证、继续发现问题,所以还是要进一步的学习和理解 Flink 的底层机制,甚至可以进行代码级的贡献——这一步肯定非常难,短时间不可能完成,初期投入很多但是产出不多,但是可以肯定的是值得长期投入。 本文作为 Flink 应用的一次尝试,如发现有错误请直接指出,同时欢迎有相同需求的同学一起讨论。

五. 参考链接

[1] Flink 实践教程:入门 1-零基础用户实现简单 Flink 任务:https://cloud.tencent.com/developer/article/1895677

[2] Flink 实践教程:入门9-Jar 作业开发:https://cloud.tencent.com/developer/article/1907822

[3] Flink 事务介绍:https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html

转自:李凯斌,腾讯专家工程师

本文系外文翻译,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文系外文翻译前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 一. 背景
  • 二. 思路与调研
  • 三. 实现
    • 0. 前提:
      • 1. 事务开始:
        • 2. 事务处理:
          • 2.1 按照用户 KeyBy 分流,提高并发
          • 2.2 声明 map 状态存储处理过的数据,用于去重
          • 2.2.1 去重 Key 的选择
        • 3. 数据聚合:
          • 3.1 目前 window 选择1分钟粒度聚合汇总
          • 3.2 聚合 Key 根据业务需要进行选择
          • 3.3 出库到 Kafka 时生成一个 uuid
        • 4. 输出到 Kafka:
          • 4.1 打开 Kafka 事务出库
      • 四. 问题
      • 五. 参考链接
      相关产品与服务
      流计算 Oceanus
      流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
      领券
      问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档