广告主和代理商通过广告投放平台来进行广告投放,由多个媒介进行广告展示 ,从而触达到潜在用户。整个过程中会产生各种各样的数据,比如展现数据、点击数据。其中非常重要的数据是计费数据,以计费日志为依据向上可统计如行业维度、客户维度的消耗数据,分析不同维度的计费数据有助于业务及时进行商业决策,但目前部门内消耗统计以离线为主,这种T+1延迟的结果已经无法满足商业分析同学的日常分析需求,所以我们的目标为:建设口径统一的实时消耗数据,结合BI工具的自动化配置和展现能力,满足业务实时多维消耗分析,提高数据运营的效率和数据准确性。
由于部门内在过去一段时间主要以离线数据分析为主,所以经过持续对数据仓库治理,现有离线数据平台能力为:
基于此,初步规范的方案为:不改变原有离线数据架构,在现有离线数据架构链路上再增加实时计算链路,该方案所形成的架构为Lambda架构,是目前业界比较主流的整体数仓架构方案。
Lambda架构分为三层:离线处理层,实时处理层,对外服务层,对应图中的左下、左上和中间部分:
Lambda架构存在如计算稳定、数据易于订正等优点,但也存在很明显的缺点:
基于以上缺点,进一步探索其他可行性方案。
对Lambda架构缺陷进一步分析:
基于Flink+数据湖实现计算统一以及存储统一的架构设计方案如下:
该方案存在如下优点:
最终选用批流一体架构实现实时消耗统计项目。
数据湖概念
数据湖特点
数据湖架构
Apache Hudi vs Iceberg
由于Delta Lake更多的功能在其商业版本(如SQL模式下ALTER 变更等操作),所以这里重点比对Hudi(Hadoop Upserts Delete Incrementals)和Iceberg:
场景 | 详情 | Iceberg | Hudi |
---|---|---|---|
(日志流水)Append | 离线数仓 | 支持 | 支持 |
(实时数仓)Upsert | - 近实时数仓 - 大任务批量化/增量化 | 只支持Binlog CDC 增量读时合并会丢数据 | 支持Binlog CDC 支持增量流读 |
分钟入库 | Flink job | 支持 | 支持 |
分钟级查询 | Presto OLAP查询 | 支持 | 支持 |
端到端分钟级 | 从数据采集到dal层(数据访问层) | 支持,近10分钟 | 支持,近分钟级 |
compaction影响 | 小文件合并 | 数平提供了DLA服务需要额外的资源 | 默认inline compaction 也支持单独部署 |
clean | 数据清理 | 数平提供了DLA服务需要额外的资源 | 默认inline clean(三种清理策略)也支持单独部署 |
copy on write | 分析场景 | 支持 | 支持 |
merge on read | 低延迟 | 支持 | 支持 |
离线数仓兼容 | 现有离线数仓平滑迁移 | 需要改造thive做适配 | 需要改造thive做适配 |
Presto查询(OLAP) | 分析 | 无索引 | 有索引,性能更优 |
Spark分析计算 | 离线数仓 | 无索引 | 有索引,性能更优 |
分析当前业务需求希望实时技术具备的能力
综合以上对比,结合当前业务所希望具备的数据能力,Hudi支持upsert、streaming read(增量流读)等功能和特性更适合实现批流一体的能力。
Hudi将一个表映射为如下文件结构
Hudi存储分为两个部分:
Hudi维护着一条对Hudi数据集所有操作的不同 Instant组成的 Timeline(时间轴),通过时间轴,用户可以轻易的进行增量查询或基于某个历史时间点的查询。如下为实践过程中产生的Timeline:
一个Instant的组成如下
如下为具体一个basePath/.hoodie/xxx.deltacommit 文件内容:
文件包含如下重要元素
数据文件/基础文件
Hudi将数据以列存格式(Parquet)存放,称为数据文件/基础文件。
增量日志文件
在 MOR 表格式中,更新被写入到增量日志文件中,该文件以 avro 格式存储。这些增量日志文件始终与基本文件相关联。假设有一个名为 data_file_1 的数据文件,对 data_file_1 中记录的任何更新都将写入到新的增量日志文件。在服务读取查询时,Hudi 将实时合并基础文件及其相应的增量日志文件中的记录。
文件组(FileGroup)
通常根据存储的数据量,可能会有很多数据文件。每个数据文件及其对应的增量日志文件形成一个文件组。在 COW表中,只有基本文件。
文件版本
比如COW表每当数据文件发生更新时,将创建数据文件的较新版本,其中包含来自较旧数据文件和较新传入记录的合并记录。
文件切片(FileSlice)
对于每个文件组,可能有不同的文件版本。因此文件切片由特定版本的数据文件及其增量日志文件组成。对于 COW表,最新的文件切片是指所有文件组的最新数据/基础文件。对于 MOR表,最新文件切片是指所有文件组的最新数据/基础文件及其关联的增量日志文件。
在每个分区内,文件被组织为文件组,由文件ID充当唯一标识。每个文件组包含多个文件切片,其中每个切片包含在某个即时时间的提交/压缩生成的基本列文件(.parquet)以及一组日志文件(.log),该文件包含自生成基本文件依赖对基本文件的插入/更新;数据构成关系:table -> partition -> FileGroup -> FileSlice -> parquet + log
Hudi支持两种表类型:Copy On Write(COW) & Merge On Read(MOR)。
COW表:在数据写入的时候,通过复制旧文件数据并且与新写入的数据进行合并,对 Hudi 的每一个新批次写入都将创建相应数据文件的新版本。
data_file1 和 data_file2 都将创建更新的版本,data file 1 V2 是数据文件 data file 1 V1 的内容与数据文件data file 1 中传入批次匹配记录的记录合并。由于在写入期间进行合并,COW 会产生一些写入延迟。但是COW 的优势在于它的简单性,不需要其他表服务(如压缩)
MOR表:对于具有要更新记录的现有数据文件,Hudi 创建增量日志文件记录更新数据。此在写入期间不会合并或创建较新的数据文件版本;在进行数据读取的时候,将本批次读取到的数据进行Merge。Hudi 使用压缩机制来将数据文件和日志文件合并在一起并创建更新版本的数据文件。
COW vs MOR
COW | MOR | 说明 |
---|---|---|
更新代价 | 高 | 低 |
读取延迟 | 低 | 一般 |
写放大问题 | 高 | 低 |
总结:COW适用于读多写少的场景;MOR适用于写多读少的场景;在本项目实践中,采用的是MOR表,在创建Hudi表中进行指定:'table.type' = 'MERGE_ON_READ'。
重点分析Hudi与Flink集成时流式数据写入过程:
分为三个模块:数据写入、数据压缩与数据清理。
1.数据写入分析
2.数据压缩
压缩( compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance:
具体策略分为4种,具体见官网说明:
compaction.trigger.strategy:
Strategy to trigger compaction, options are
1.'num_commits': trigger compaction when reach N delta commits;
2.'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;
3.'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;
4.'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits'
Default Value: num_commits (Optional)
在项目实践中需要注意参数'read.streaming.skip_compaction' 参数的配置,其表示在流式读取该表是否跳过压缩后的数据,若该表用于后续聚合操作表的输入表,则需要配置值为true,表示聚合操作表不再消费读取压缩数据。若不配置或配置为false,则该表中的数据在未被压缩之前被聚合操作表读取了一次,在压缩后数据又被读取一次,会导致聚合表的sum、count等算子结果出现双倍情况。
3.数据清理
随着用户向表中写入更多数据,对于每次更新,Hudi会生成一个新版本的数据文件用于保存更新后的记录(COPY_ON_WRITE) 或将这些增量更新写入日志文件以避免重写更新版本的数据文件 (MERGE_ON_READ)。在这种情况下,根据更新频率,文件版本数可能会无限增长,但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。具体清理策略可参考官网,一般使用的清理策略为:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。
4.Job图
如下为生产环境中flink Job图,可以看到各task和上述分析过程对应,需要注意的是可以调整并行度来提升写入速度。
Hudi支持如下三种查询类型:
Snapshot Queries:可以查询最新COMMIT的快照数据。针对Merge On Read类型的表,查询时需要在线合并列存中的Base数据和日志中的实时数据;针对Copy On Write表,可以查询最新版本的Parquet数据。Copy On Write和Merge On Read表支持该类型的查询。
Incremental Queries:支持增量查询的能力,可以查询给定COMMIT之后的最新数据。Copy On Write和Merge On Read表支持该类型的查询。
Read Optimized Queries:只能查询到给定COMMIT之前所限定范围的最新数据。Read Optimized Queries是对Merge On Read表类型快照查询的优化,仅限于 MergeOnRead 表,可以查询到列存文件的数据,其原理是通过牺牲查询数据的时效性,来减少在线合并日志数据产生的查询延迟。
如下为Hudi数据流式读取Job图:
其过程为:
简化的数据流图如下,若大家和该数据流类似,那么在开发过程中会遇到并发导致的数据一致性问题、读任务在无数据时操作类型封装不正确问题 数据流图
问题&原因分析
问题出现在第4步:flink会启动split_monitor任务,每隔N秒(可配置)监听TimeLine上变化;同时会开启split_reader task任务,split_reader task会定位到具体文件中进行数据读取,而sink算子同样会集成在同一个split_reader task任务中(flink oprator chain原理,可节省数据传输带来的序列化反序列化和网络传输开销)。split_monitor对split_reader task采取的是Rebanlance分发策略,若同一个key在并发下,提交到不同Instance中,则split_monitor可能将包含同一个key的两次Instance分发到不同split_reader task任务中,当读取到数据向外部存储sink时,由于网络速度等因素,先处理的split_reader task任务对应的结果可能会后sink,导致外部存储结果的错误,即之前更新结果覆盖了最新的更新结果。具体分析流程如下:
方案一:将split_reader并行度指定为1,此时只有一个task处理log数据文件,保证处理顺序性,具体改动是在定义Hudi-DWS表的时候指定参数'read.tasks' = '1',但该方案会影响sink处理速度;
方案二:修改源码:在分发log文件时候,按照fileId值进行keyBy,保证同一file group下数据文件都给一个Task进行处理,从而保证数据处理的有序性。
最终采用方案二,并向Hudi社区提交PR,大家需要拉取最新master代码使用,具体PR见链接:https://github.com/apache/hudi/pull/5516 修改完毕后,可以看到JOB图中数据分发变成了Hash:
由于Hudi ods表作为dwd表的输入,dwd表作为dws表的输入,dws表作为sink到外部存储的输入,所以在创建表时,需要指定流式读取,增量消费数据:
'read.streaming.enabled' = 'true',
'read.streaming.check-interval' = '2' ,
'read.start-commit' = '20210316134557' ,
'changelog.enabled' = 'true',
'read.streaming.skip_compaction' = 'true'