快手的传统离线链路和很多公司是一致的,基于 Hive做离线分层数仓的建设。在入仓环节和层与层之间是基于 Spark 或者 Hive做清洗加工和计算。这个链路有以下四个痛点:
更新成本高:Hive 表最细的更新粒度是分区级,需要先扫出分区的全量数据,关联这次更新的增量数据得到这次的全量数据并覆盖原来的分区。这个过程导致计算开销比较大,且降低时效性;
缺少索引:不仅影响更新,也影响读取。因为查询大部分会以扫描为主,由此会导致查询效率低;
缺少事务:多个写入任务之间,写入任务和读取任务之间缺少事务机制,需要读写锁来避免数据的不一致;
启动调度晚:目前离线任务调度最细粒度是小时级别,会影响下游各层的数据可见性;
针对传统离线链路的缺点,我们决定引入数据湖来解决上述的痛点。快手是在21年开始探索数据湖方向,我们进行了技术选型,考虑到HUDI 对更新能力的支持,以及活跃的社区生态,由此便选择了HUDI。HUDI 具备如下几个特点:
写入:由于 HUDI提供多种内置的索引,基于这些索引可以提供高效的更新能力;写入支持流式入湖,也支持离线入湖;支持多种的写入操作,比如插入、更新、删除、覆盖;支持多种输入源,比如更新流,日志流。
查询:支持多种的查询方式,比如读优化查询、快照查询和增量查询;提供时间旅行的特点解锁查询历史版本的能力;社区做了很多优化提高查询的效率。
并发控制:HUDI 引入 MVCC 来控制写入任务和查询任务之间的并发。引入 OCC 来控制多个写入任务之间的并发。同时社区也有一些关于无锁的并发控制。
丰富的表服务:Compaction、Clustering、Clean 等。
开放性:适配多种计算引擎和查询引擎,比如 Spark,Flink,Presto,Trino,Starrokcs,Doris 等
Schema Evolution:提供Schema 演进的能力。
下面通过快手在数据湖上的几个典型业务场景介绍如何用 HUDI重塑离线链路产生。分为三个方向:数据同步、数据更新、宽表拼接。每个方向都会介绍两类最有代表性的场景。
首先是数据同步里日志流入湖。快手内部的数据同步工具有一个限制:只支持日期和小时两级分区。所以一个日志流从 Kafka 到入仓整个链路需要多个离线任务加工,这就导致了链路长,重复计算和冗余存储的问题。
基于 HUDI 改进后的方案,整个链路得到极大的简化。直接用 Flink 任务做日志流数据入湖。最后一层将 HUDI 表落到 DWD 层数据主要是做兼容性,这样下游业务依然可以访问原来的 Hive 表,同时获得时效性的提升,在资源持平情况下,时效性从之前1h40min缩减到40min,也降低了了链路的复杂度。
第二个场景是更新场景入湖。历史上 Mysql to Hive的方案有两个链路,一个全量初始化任务,一个是增量同步任务。初始化任务把全量数据落到一个HIVE 全量快照表,完成后启动增量同步任务把增量binlog 数据落到一个 HIVE增量表,每天合并前一天的全量和今天的增量生成一个新的全量快照表。
Mysql to Hive 方案的痛点是时效低。时效低有两方面原因:第一个是离线任务调度周期是T+1级别,第二个是任务调度以后才做全量和增量的合并。
改造后的Mysql to HUDI,链路得到了简化,直接把 CDC 更新数据落到一个 HUDI 表里,这个 HUDI 表是没有日期或者小时分区的。内部的 MySQL to HUDI 和其他公司的 CDC 更新流入湖比较起来有一些差异化的需求,因此我们在设计上也是有所不同。
避免在全量同步完成后再启动增量同步任务:因为采用传统的串行调度,如果全量同步任务执行很久才结束,增量同步启动后可能发现最开始的一些 Kafka 数据已经被清理了,导致数据丢失。因此,支持全量初始化任务和增量同步任务的并行,不需要等全量初始化任务完成后再去调度增量同步任务。
按照事件时间来查询某个版本:HUDI 的版本是一个 processing time 的语义,但是用户需要能按照 event time 语义来访问某个 HUDI 版本。为了支持按照事件时间方案,在元数据里维护 Processing time 到 Event time 的映射关系。收到按照事件时间的快照查询请求,先做一下映射得到 processing time,再基于time travel能力查询对应的版本。
数据就绪后尽快发布对应版本:如果完全依赖周期性的 checkpoint 来做分区发布会导致数据就绪后不能立刻发布对应的版本。这里修改了 Flink 引擎的逻辑,除了周期性的 checkpoint 以外,又增加一种非周期性的checkpoint 用于监听到整点数据就绪以后立刻发布分区。
兼容当前 HIVE 表的使用方式:1. Mysql to HUDI 链路里的HUDI 表是没有日期分区,如何能按照日期分区查询。2.长生命周期管理,用户可能需要访问很久以前的数据。为了支持这两个需求,Mysql to HUDI 的链路会输出两个表,一个是无时间分区的 HUDI 表,一个是HIVE 表。在发布分区时,会在HIVE 表里添加一个新分区,这个时候分区 location下是没有数据,分区元数据里维护了它对应哪个 HUDI 表的哪个版本。无时间分区的HUDI 表是没有办法直接做长生命周期的,所以定期把HUDI 数据同步到Hive 表中去。归档后的 HIVE 表分区就是一个普通的 HIVE 分区,它的 location 下有对应的分区数据。因此,这个HIVE 表是一个异构的HIVE 表。异构性体现在两个方面,第一个元数据是异构的,第二个是数据是异构的。这个异构设计对用户是透明的。当用户查询HIVE分区的时候,引擎通过 Hive 元数据判断这个日期是否被归档,如果还没有被归档,会通过分区元数据里的HUDI 表和版本把请求路有到HUDI 表上。如果是归档后的分区,直接走正常的HIVE查询流程把分区数据返回给用户。
Mysql to HUDI的整个链路如上图。分为左右两部分。左边是必选的,做CDC 入湖;右边是可选的,为了支持兼容HIVE 的需求。
数据更新的第一个业务场景是人群包圈选。每次活动DAU 是一个非常重要的指标,人群圈选业务是根据用户的历史行为来圈选出一些潜在的目标用户。历史方案是基于天级离线数据和小时级离线数据组合计算生成。这种方式存在的最大痛点就是时效性问题,某些场景下的小时级产出的数据延迟在3-4 小时左右,对于除夕活动来说,这种延迟是不能忍受的。基于 HUDI 改造后的链路是用一个实时的 Flink 任务,在入湖过程中完成更新。这使得整条链得到简化,不仅时效性从3h ~ 4h左右缩短到15min左右,而且资源也有节约。
第二个业务场景是基于HUDI 自定义的payload能力的N天留存标签更新。历史的留存链路加工流程需要大规模Join 并且需要与行为数据进行整合,并且需要大规模数据回刷。具体过程是用当天的日活数据和历史N天的日活数据算出当天日活用户在过去 180 天的留存标签,存一个中间表。然后分别用过去N天的行为数据关联这个中间表得到最新的标签覆盖回对应的分区。这个方案的缺点是时效低,重复计算和重复存储。
基于HUDI 改造后的链路从刚才的多层关联升级为单表生产,时效性也是有了很大的提升,从2.5h缩短到1.5h。资源开销也是有收益的。这里最重要的就是基于 HUDI 的 MOR 表能力和自定义payload 的特点。写入流程非常轻量,将当天的日活数据产生的增量数据写到历史N 天的分区里。合并流程做在分区内部做局部关联只更新对应的留存标签。
第三个方向是宽表拼接,也介绍两个典型的业务场景,一个是离线宽表模型,一个是准实时的多流拼接。
宽表模型是指把业务主题相关的指标、维度、属性关联在一起的一张大宽表。宽表模型因为结构简单,模型可复用度高,数据访问效率等优势,广泛地使用在 BI 和 AI 场景。
基于 HUDI 的宽表拼接之前有很多公司也有分享,我们内部的宽表拼接有一些差异化的需求。
支持多个写入任务并行:允许多个写入任务并行加工一张宽表,每个写入任务加工这个宽表中的部分列。
支持 Schema Evolution:在业务演进过程中可能随时需要有更多的列加进来。用户希望在创建表的时候,只需要定义必要的列,比如主键列、分区列、排序列。后续可以很灵活地添加新的列。
支持 Implicit Schema Evolution:显式的 Schema Evolution 是指通过类似于 Alter table add column 这种DDL 语句来修改表。Implicit Schema Evolution,是指在写入任务的 Schema里包含了表里不存在的列,会在写入任务提交时追加到这个表的最后。
支持 Partial Insert:写入任务不需要指定表里的所有列,允许只插入表里的部分。
支持不同分区设置不同的桶个数:有一些业务分区存在非常大的数据量差异,所以需要能支持不同子分区设置不同的桶个数。
支持快照隔离:读取任务和写入任务之间支持快照隔离,上游加工好部分列以后,下游就可以先读这些加工好的部分列。
上图是一个简单的宽表拼接的例子。两个写入任务加工一个宽表,第一个写入任务加工 id, ts 和name。第二个写入任务加工 id, ts 和 price。每个写入任务只需要写入部分列,这个是 partial insert 的能力。最后合并流程做拼接。另外,这个图也可以说明 schema evolution。建表时,只定义了主键、排序键和分区键。第一个写入任务提交的时候追加了name 列,第二个写入任务提交的时候追加 price 列。
写入阶段分为两个阶段,第一个阶段写入数据,第二个阶段提交数据。第一个阶段是无锁方案的设计,第二个阶段是有锁的设计。第一个阶段,写入任务是在加工同一个文件组的同一个数据版本下不同的增量文件来避免多个任务把一个文件写花。在提交阶段引入一种特殊的冲突检查机制,允许在不同分区或者是相同分区的不同列上的并发写入,另外这个阶段按需更新 schema,发现有新增的列需要更新schema 。
这个方案也可以用在实时宽表拼接场景,这里因为时间关系,不再做赘述。最后说一下在目前的宽表拼接实现里有一个限制,即写入任务正在进行时不可以生成合并计划,可能存在丢数据的风险。在用户角度这个限制有三点影响:第一个是离线宽表拼接场景需要依赖任务以来关系来避免写入任务和 schedule compaction 的并行。第二个是对实时宽表拼接场景,只能在同一个 Flink 作业的多个 pipeline 里共同加工一个宽表,不能多个 Flink 作业同时加工一个宽表。第三个是不能满足实时和离线任务共同加工一张宽表的需求。
近期的工作有四点:
(1)Schedule Compaction 和 Writer 的并发。
(2)可扩展的 Bucket index,实现根据数据量自动适配 bucket number 个数。
(3)加速写入流程:这里涉及到多个优化点,一个是优化写入链路,一个是减少序列化和反序列化开销
(4)服务化建设。包括 MetaStore Service 和 Table Service。
中长期的工作围绕两个方向,第一个是建设实时数据湖。对于实时数据湖也会有很多挑战,需要把它补充齐才可以把实时化做起来,这块会引入流计算领域领域通用的概念,比如事件时间和watermark。第二个是基于HUDI的分析查询场景。我们会参与到社区的建设中,通过构建物化视图减少重复计算加速查询,后续也会引入缓存加速分析查询的场景。这两个方向都有很多地方需要探索和完善。
推荐阅读
图加速数据湖分析-GeaFlow和Apache Hudi集成