首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink在中原银行的实践

Flink在中原银行的实践

作者头像
数据社
发布2021-09-23 11:24:27
1.1K0
发布2021-09-23 11:24:27
举报
文章被收录于专栏:数据社数据社

大家好,我是一哥,今天分享一下,Flink在郑州本地银行的实践。

在构建实时场景的过程中,如何快速、正确的实时同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎Apache Flink和数据湖两种技术,来解决业务数据实时入湖的相关问题。两者的结合能良好的支持实时数据落地存储,借助Apache Flink出色的流批一体能力,可以为用户构建一个准实时数仓,满足用户准实时业务探索。

一、Apache Flink和数据湖介绍

1.1 Apache Flink CDC原理

CDC全称是Change Data Capture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获所有数据的变化,比如数据库捕获完整的变更日志记录增、删、改等,都可以称为CDC。该功能被广泛应用于数据同步、更新缓存、微服务间同步数据等场景,本文主要介绍基于Flink CDC在数据实时同步场景下的应用。

Flink在1.11版本开始引入了Flink CDC功能,并且同时支持Table & SQL两种形式。Flink SQL CDC是以SQL的形式编写实时任务,并对CDC数据进行实时解析同步。相比于传统的数据同步方案,该方案在实时性、易用性等方面有了极大的改善。下图是基于Flink SQL CDC的数据同步方案的示意图。

Oracle的变更日志的采集有多种方案,如上图所示,这里采用的Debezium实时同步工具作为示例,该工具能够解析Oracle的change log数据,并实时发送数据到下游Kafka。Flink SQL通过创建Kafka映射表并指定 format格式为debezium-json,然后通过Flink进行解析后直接插入到其它外部数据存储系统。

下面详细解析一下数据同步过程。首先了解一下Debezium抽取的Oracle的change log的格式,以update为例,变更日志上记录了更新之前的数据和更新以后的数据,在Kafka下游的Flink接收到这样的数据以后,一条update操作记录就转变为了先delete、后insert两条记录。日志格式如下所示,该update操作的内容的name字段从tom更新为了jerry。

  {
  "before": {       --更新之前的数据
    "id": 001,
    "name": "tom"
  },
  "after": {        --更新之后的数据
    "id": 001,
    "name": "jerry"
  },
  "source": {...},
  "op": "u",
  "ts_ms": 1589362330904
}

其次再来看一下Flink SQL内部是如何处理update记录的。Flink在1.11版本支持了完整的change log机制,对于每条数据本身只要是携带了相应增、删、改的标志,Flink就能识别这些数据,并对结果表做出相应的增、删、改的动作,如下图所示change log数据流经过Flink解析,同步到下游Sink Database。

通过以上分析,基于Flink SQL CDC的数据同步有如下优点:

a)业务解耦:无需入侵业务,和业务完全解耦,也就是业务端无感知数据同步的存在。

b)性能消耗:业务数据库性能消耗小,数据同步延迟低。

c)同步易用:使用SQL方式执行CDC同步任务,极大的降低使用维护门槛。

d)数据完整:完整的数据库变更记录,不会丢失任何记录,Flink 自身支持 Exactly Once。

1.2 数据湖介绍

数据湖(Data Lake)当前没有统一的定义,通常认为数据湖是一种支持存储多种原始数据格式、多种计算引擎、高效的元数据统一管理和海量统一数据存储。其中以Apache Hudi和Apache Iceberg为代表的表格式和Flink计算引擎组成的数据湖解决方案尤为亮眼。如图所示数据湖生态架构示意图。

提到数据湖就不得不说一下传统数据仓库,两者相比之下传统数仓的缺点有:

  • 不支持ACID 不支持Upsert场景,不支持Row-level delete,数据修改成本高
  • 时效性差 数据难以做到准实时可见,无法支持分钟级延迟的数据分析场景
  • 只能存储结构化数据 传统数仓不支持存储非结构化和半结构化数据

传统数仓有这些缺点,那么就可以使用数据湖代替数仓吗?答案是否定的,数据仓库、数据湖是数据技术不断发展的结果,是传承不是取代。理想中的数据湖或者湖仓一体,对于用户来说不需要清晰的区别湖和仓,数据有着打通的元数据的格式,它们之间可以自由的流动,也可以对接上层多样化的计算生态。

数据仓库:是一个针对OLAP优化的数据库,用于分析来自事务系统和业务线应用程序的关系型数据,因此数据仓库存储的都是经过了清洗、转换的结构化数据。数据仓库对数据提供高效地存储,便于用户通过报表、看板和分析工具来获取查询结果,从数据中获得洞察力、决策指导。

数据湖:可以存储来自业务线应用程序的关系型数据,也可以存储来自移动应用程序的日志、图片视频等非关系型数据。当不清楚某些数据存在的价值时,将数据以原生格式天然沉积在数据湖,为后续用户需要提供更好的分析探索。

二、实时数据入湖实践

当前使用Flink最新版本1.12,支持CDC功能和更好的流批一体。Apache Iceberg最新版本0.12和Apache Hudi 0.9两者均已经适配Flink CDC功能。其中比较重点的是数据湖的更新删除功能,先来了解一下什么是Row-Level Delete。

Row-Level Delete功能是指根据从一个数据集里面删除指定行。那么为什么这个功能那么重要呢?众所周知,大数据中的行级删除不同于传统数据库的更新和删除功能,在基于HDFS架构的文件系统上数据存储只支持数据的追加,为了在该构架下支持更新删除功能,删除操作演变成了一种标记删除,更新操作则是转变为先标记删除、后插入一条新数据。具体实现方式可以分为Copy on Write(COW)模式和Merge on Read(MOR)模式,其中Copy on Write模式可以保证下游的数据读具有最大的性能,而Merge on Read模式保证上游数据插入、更新、和删除的性能,减少传统Copy on Write模式下写放大问题。

Flink SQL CDC和数据湖的架构设计和整合如何巧妙,不能局限于纸上谈兵,下面就实际操作一下,体验其功能的强大和带来的便捷。

2.1 数据入湖环境准备

以Flink SQL CDC方式将实时数据导入数据湖的环境准备非常简单直观,因为Flink支持流批一体功能,所以实时导入数据湖的数据,也可以使用Flink SQL离线或实时进行查询。如下测试是使用Flink SQL完成实时同步的示意步骤,以Iceberg为例,其中有一部分开发和适配工作。

step1:新建Kafka映射表,用于实时接收Topic中的change log数据

CREATE TABLE KafkaTable (
  id STRING,
  name STRING
) WITH (
  'connector' = 'kafka',
  'xxx' = 'xxx'
);

step2:新建iceberg结果表,用于存储实时入湖的数据

CREATE TABLE IcebergTable (
  id STRING, 
name STRING 
)WITH (
  'connector'='iceberg',
  'xxx' = 'xxx'
);

step3:启动实时入湖的Flink SQL任务

INSERT INTO IcebergTable 
SELECT * FROM KafkaTable;

step4:离线或者实时查询统计IcebergTable表中的数据行数,在Flink的sql-client中执行

----/ *a.离线方式* /----
SET execution.type=batch;
SELECT COUNT(*) FROM IcebergTable;
----/ *b.实时方式* /----
SET execution.type=streaming;
SELECT COUNT(*) FROM IcebergTable;

2.2 数据入湖速度测试

数据入湖速度测试会根据环境配置、参数配置、数据格式等不同有所不同,下面是列出主要配置和测试出的数据作为参考。

  • 资源配置情况

TaskManager

内存4G,slot:1

Checkpoint

1分钟

测试数据列数

10列

测试数据行数

1000万

iceberg存储格式

parquet

  • 测试数据情况

数据入湖分为append和upsert两种方式。append方式只能新增数据,不能对结果数据进行更新操作;upsert方式即能够对结果数据更新。

append方式使用场景是导入数据之前已经明确该表数据不需要更新,如离线数据导入数据湖的场景,append方式下导入数据速度如下:

SQL

INSERT INTO IcebergTable SELECT * FROM KafkaTable;

并行度1

12.2万/秒

并行度2

19.6万/秒

并行度4

28.3万/秒

update方式使用场景是既有插入的数据又有对之前插入数据的更新的场景,如数据库实时同步,upsert方式下导入数据速度,该方式需要指定在更新时以那个字段查找,类似于update语句中的where条件,一般设置为表的主键即可,如下:

SQL

INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/SELECT * FROM KafkaTable;

导入的数据

只有数据插入

只有数据更新

并行度1

3.2万/秒

2.9万/秒

并行度2

4.9万/秒

4.2万/秒

并行度4

6.1万/秒

5.1万/秒

  • 结论

(1)append方式导入速度远大于upsert导入数据速度。在使用的时候,如没有更新数据的场景时,则不需要upsert方式导入数据;

(2)导入速度随着并行度的增加而增加;

(3)upsert方式数据的插入和更新速度相差不大,主要得益于MOR原因;

三、实时数据入湖经验

3.1 实时性

数据湖的实时性是什么级别的呢?

如图所示,实时计算或者流计算处理的是时延要求比较高的场景,可以实现端到端秒级实时分析,但是查询端的能力欠缺,无法长时间存储历史数据。批处理的数仓能力丰富但是数据时延比较大,用户可以实现小时级别的数据注入 HDFS/OSS,并且不支持更新和删除操作。然而在秒级到小时级时的分钟级场景还存在大量的用例,通常称之为准实时或者近实时(NEAR-REAL-TIME),数据湖的出现,恰巧解决了准实时场景的用例。如下图所示实时、准实时、批量处理时延。

实时数据发送到数据湖采用的是mini-batch增量写入方案,实时数据周期内可见,一般根据业务需求和数据量的大小设置为分钟级别。该方案既能节省计算成本,又能提高数据时延,实时的数据写入可供实时OLAP分析场景等。

3.2 一致性

当程序BUG或者任务重启等导致数据传输中断,如何保证数据的一致性呢?

数据一致性保证通过两个方面实现

a)借助Flink实现的exactly once语义和故障恢复能力,实现数据严格一致性。在运行过程中,checkpoint周期内任务异常重启时,会从上一个checkpoint点恢复,重新消费数据写入下游的数据湖。

b)借助Hudi/Iceberg ACID能力来隔离写入对分析任务的不利影响。在checkpoint周期内写入的数据,下游数据湖对这部分未commit的数据是不可见的,从而隔离这部分未提交数据对分析任务的影响,周期性的commit也是数据湖分钟级延迟的主要原因。

3.3 顺序性

数据入湖否可保证全局顺序性插入和更新?

这个问题类似于Kafka是否可以保证全局顺序性,答案是否定的,也就是不可以全局保证数据生产和数据消费的顺序性,但是可以保证同一条数据的插入和更新的顺序性。首先数据抽取的时候是单线程的,然后分发到Kafka的各个partition中,此时同一个key的变更数据打入到同一个Kafka的分区里面,Flink读取的时候也能保证顺序性消费每个分区中的数据,进而保证同一个key的插入和更新的顺序性。

3.4 初始化

数据湖中的历史数据如何初始化,才能够和实时增量的数据无缝对接呢?

为了构建实时同步链路,首先需要通过各种方式,将历史数据从数仓或者源库等导入到数据湖中,离线批量同步数据这块就不再阐述啦。然后将实时增量数据对接到历史数据上,先使用同步工具把数据的变更写到Kafka消息队列,然后通过Flink消费Kafka的数据进行实时的分析计算,最后将结果数据实时的写到数据湖中,在数据湖中完成历史数据和实时数据的无缝对接。如何将历史数据和实时数据正好对接上呢?主要有以下几种情况。

a)数据有主键,也就是数据写入到下游能够保证幂等

首先实时同步工具把变更数据写入Kafka,Kafka默认保存7天数据。

然后开始同步历史数据,保证历史数据和Kafka中的数据有交集。

最后启动Flink任务实时写入数据湖,且从Kafka中指定消费时间要早于批量同步的数据,因为存在主键,数据库提供upsert的能力,对相同主键的数据进行更新覆盖。

b)数据有时间字段

可以通过业务属性中的时间戳区别离线数据和实时数据,比如导入离线数据为某日凌晨之前,实时数据则是该日凌晨之后的数据,这就能保证数据无缝对接。

c)如果不满足a、b两种情况

这种情况无法完全保证数据正好对接上,退化为数据是否允许少量的重复或者丢失,实际的业务中这种表也几乎不存在的。

四、未来规划

高新的技术最终是要落地才能发挥其内在价值的,针对行内纷繁复杂的数据,结合流计算Flink和数据湖技术,在未来的落地规划集中在两个方面,一是数据湖集成到本行的实时计算平台中,解决易用性的问题;二是构建行内准实时数仓,提供近实时的场景探索。

4.1 整合数据湖数据源

中原银行实时计算平台是一个基于SQL的高性能实时大数据处理平台,该平台彻底规避繁重的底层流计算处理逻辑、繁琐的提交过程等,为用户打造一个只需关注实时计算逻辑的平台,助力企业向实时化、智能化大数据转型。

实时计算平台未来将会整合Apache Hudi和Apache Iceberg数据源,用户可以在界面配置Flink SQL任务,该任务既可以以upsert方式实时解析change log并导入到数据湖中,也可以分析数据湖中的实时或历史数据。并增加小文件监控、定时任务压缩小文件、清理过期数据等功能。

4.2 准实时数仓探索

本文对数据实时入湖原理做了比较多的阐述,入湖后的数据有哪些场景的使用呢?下一个目标当然是入湖的数据分析实时化。比较多的讨论是关于准实时数据湖的探索,结合行内数据特点探索适合落地的实时数据分析场景成为当务之急。

随着数据量的持续增大,和业务对时效性的严苛要求,基于Apache Flink和Apache Hudi/Iceberg构建准实时数仓愈发重要和迫切,作为实时数仓的两大核心组件,可以缩短数据导入、方便数据行级变更、支持数据流式读取等。并且有助于流批一体相关的探索,最终达到数据同源,存储一体,流批计算一致等终极诉求。

END

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2021-09-09,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 数据社 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
相关产品与服务
大数据
全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档