分享嘉宾|陈世治 哔哩哔哩 资深开发工程师
编辑整理|王超
内容校对|李瑶
出品社区|DataFun
01
背景与挑战
上图展示了当前B站实时数仓的一个简略架构,大致可以分为采集传输层、数据处理层,以及最终的AI和BI应用层。为保证稳定性,数据处理层是由以实时为主,以离线兜底的两条链路组成,即我们熟知的批流双链路。
在实践落地的过程中,上述上架构存在以下问题:
为了解决上述困境,我们引入了数据湖的构建。如上图,是我们构建数据湖的能力愿景,也是落地的实践路径。
首先,支持高效的数据流转,比如实时数据入湖,流量日志动态分流,以及数据模型层的湖上流式构建能力,如Join、维表等。
其次,批流融合能力,在底层内核、架构、平台工具等打通批流一体生产,支持流-批调度,多任务并发更新等。
第三,统一的数据管理,包括统一元数据服务、强大的数据湖自治,表服务自适应管理,湖上视图管理等。
最后,便捷的分析查询,各种湖查询加速能力建设,包括Clustering加速、索引加速、预计算物化加速,Alluxio缓存加速等。
02
典型场景案例
下面会针对四个典型案例进行展开:RDB一键入湖、流量日志分流、物化查询加速,以及实时数仓演进。
1. RDB一键入湖
为了提升数据集成的时效性,我们将原基于Datax+Hive的方案,成功改造为CDC+Hudi的方案。上图是一个简略的CDC+Hudi同步方案示意。RDB先通过Flink CDC Job同步至内部Kafka缓冲供下游订阅,然后1个逻辑表会对应1个Hudi Sync任务,再同步至Hudi表。
在具体落地过程中,我们解决了乱序、Schema Evolution、数据断流推进等问题,本文在此不做展开,将重点讨论批流融合的痛点。
以往基于批同步后,业务方将获得一个全量或者增量数据分片,即数仓里的一个分区。如上图右侧示例,SQL只需写log_date进行过滤就可指定对应分片。
升级至实时入湖方案之后,在切换过程中会有以下两个痛点:
一是分片的时间界限模糊导致切换有感,需用户主动过滤漂移数据,比如基于event time,且SQL上的过滤只能下推至Merge后数据,对CDC Merge前的变更流不生效;
二是由于数据实时变更,历史分区会随时被Upsert,流转批后的离线ETL任务无法获得稳定重跑链路。
对以上问题业界有些潜在方案,一种是通过脚本,从Hudi表导出到Hive表来实现快照,但会导致使用割裂和架构冗余;另一种是基于Savepoint的方案,在Commit时会触发Savepoint,但并未解决漂移问题。
我们的优化方案是基于Hudi Snapshot View快照视图,并支持在多种引擎上的适配。
如上图所示意,基于Hudi支持了带过滤谓词下推的分区快照视图,以实现具备准确切分的逻辑分区。视图实时创建之后,会rewrite生成一个Predicated File Slice,将Expression下推至Log Merge之前,对漂移数据准确过滤。
数据文件是基于Hudi Meta进行映射的,没有冗余的存储。快照视图上也支持独立的Compaction/Clustering/Clean等表服务,对视图物化、加速或过期等。
在该方案里,一张表里同时存在实时分区、增量快照分区以及全量快照分区,该如何进行管理?
如前文所述,快照视图也会有表服务,所以直接新增一个Action,无法满足需求。我们引入了一种新的视图管理手段,该方案是基于独立的Timeline来实现。如上图右下方所示,新增的Snapshot Timeline实现了类似于Git的能力,比如,支持branch或者tag的创建或者删除,快照切换等。
在分区视图场景中,通过轻量的checkout操作,就能够实现实时、全量以及增量分区的便捷切换,视图的Compaction/Clustering/Clean等表服务,也在各自Timeline上独立管理。
在写入和查询阶段,如何对引擎进行适配?
写入侧,比较重要的是Snapshot View的生成时机。我们基于分区提交来确认数据到位,同时触发快照生成。而分区提交的时机,则是基于Watermark的分区推进机制,这块在下文内核优化部分再展开介绍。
查询侧,目前已支持Flink Batch 、Spark和Hive引擎对快照视图查询,用户在原有SQL基础上,只需加上hint声明查询模式是增量或全量的,即可访问对应的分区视图。具体实现,是在Hudi里面新增了SnapshotMetaClient,用来指向TableMetaClient,基于前面实现的Timeline管理机制,动态指向到对应分区视图。
最终的收益主要是降本增效。降本方面,相当于一张Hudi表里,每个分区只存有增量的数据,但同时实现一个全量分区、增量分区以及实时分区,大幅降低了存储成本。增效方面,数据时效提升到分钟级,且hint或option的机制,使用户基本没有切换成本。
2. 流量日志分流
流量日志分流是一个常见业务场景。我们公司内部有日千亿级的埋点日志,包含1w+事件分类,产出供全站各个BU使用。如上图所示,之前已有基于Flink+Hive的生成链路,其主要痛点如下:
我们的解决方案,如上图所示,主要包含三个方面:
如上图,是新老方案的查询性能对比,在大部分场景下,新方案表现更优,整体方案达到预期要求,同时也简化了整个流量日志的分流架构。
3. 物化查询加速
通常,在数据生产的末端进行查询时,面临如下痛点:
针对上述痛点,我们通过Flink物化视图支持与Hudi增量计算,实现了指标预计算。
如上图,用户可以通过hint标记子查询或主动创建物化视图,在后台构建起托管的指标物化任务。它增量消费Hudi源表,将物化结果写入Hudi Upsert表。查询时,如果被Flink BatchPlanner命中,将直接查询物化表,提升了查询时效性。此外,该方案对稳定性也有极大提升,若物化任务异常,可降级到源表查询。
如上图,此处简要介绍下实现:
首先,对Flink支持了物化视图,并在BatchPlanner里,新增了物化解析规则和管理。
其次,Hudi表TableMeta新增物化路由的索引,并在写入端,支持commit时记录watermark在InstantMeta中,作为进度暴露给查询端。
在Hudi支持Flink Batch在OLAP场景中的查询响应上,我们也做了很多优化。比如组件缓存,通过metaclient、文件索引等复用,减少了元数据加载耗时。通过线程池并行加载、文件索引异步预加载、list合并、本地性优化等手段,实现了对Split的生成加速。基于文件索引,可对查询的并行度动态推算等。
对源表,我们已支持了Clustering和索引加速。对物化Upsert表,也支持了对历史数据Clustering。此外,基于Alluxio,可同时对物化表和源表进行缓存加速。
4. 实时数仓演进
下面介绍下实时数仓演进,上图是开篇提到的实时数仓架构。前文介绍的3个典型案例方案,重点关注了其中的RDB实时入湖、流量日志动态分流、指标查询加速等痛点,我们也尝试寻找其他痛点场景的解决方案。
如上图,融合了新方案后的架构,是个混合增量数仓,在探索的新场景主要包括三个方面:
03
基建与内核
接下来介绍在批流融合方面对基建和内核的优化,包括:TableService的优化、分区推进支持,以及数据回滚的增强。
1. TableService优化
如上图,是一个含内嵌表服务的Hudi写入作业,该架构有以下痛点:
我们的优化方案是通过表服务的外挂模式,加上Hudi Manager进行托管,上图是整体架构:
首先,对所有表服务都支持调度-执行剥离,并且支持动态规则刷新;
其次,引入Hudi Manager作为中控平台,支持Hudi表生命周期管理、表服务的策略化运营、资源与任务调度等。
目前支持了社区的所有表服务,以及自研的物化表服务。
2. 分区推进支持
当前社区的Hive Sync,主要聚焦于分区同步,而非分区推进。而在批流融合过程中,尤其是流转批时,下游调度通知尤为重要。此外,分区推进问题,也关系到如何在同一张表中,协同好用户实时分析和调度ETL两种场景。
我们的方案是基于Watermark的分区推进机制。
首先,分区推进会被分成两步提交,第一步是arrival commit,在数据第一次写入该分区时commit,第二步是ready commit,当watermark到达了预设值之后,再次进行commit。
其次,对Hive MetaStore拓展,在Partiiton中新增commit属性,arrival和ready两次commit分别对应false和true,以此标记分区是否完整提交。
为避免因任务重启等问题导致错乱,分区推进状态会以PartitionState形式存在Flink State中。每次提交的分区,将根据write status、watermark和状态来生成,确保其一致性。
下面介绍下查询端对分区推进机制的适配。
若在调度ETL场景,默认情况下,用户可查到所有ready commit分区下面的数据文件。
若用户需要实时分析,只要指定include commit=true,就可查到所有当前arrival commit的分区,即所有数据。
两者在SQL层面没有任何区别,只需要设置参数或者hint即可。
3. 数据回滚增强
回滚能力对于数据湖的生产落地保障非常重要,可以大致分成两部分,一个是业务数据回滚,另一个是元数据异常运维。
在业务数据回滚方面,以前基于Flink流式写入,都会采用Spark批量修复,流批SQL的不统一,无法做到真正的批流融合。另外,基于Kafka的实时链路,基本上不具备修复能力。
基于Hudi+Flink的方案后,我们做了以下的工作:
Hudi元数据的修复,可能会由多种原因引起。比如,因为一些未知问题,导致了从某时刻开始出现元数据状态跟数据文件不一致。又或者,若HDFS出现了坏块,导致了archived timeline、某些instant、某些数据文件损坏,若不运维就会扩大影响至整张Hudi表不可用。
我们的方案,以Instant Rollback为主要手段,以Savepoint Rollback作为兜底,通过Spark Procedure来接入。
为什么是需要两种rollback来结合呢?如上图,列举两种case来说明。
如果archived timeline文件损坏,只单纯地rollback到某个instant将无法修复,因为查询archived timeline仍不可避免,此时可rollback至最近可用的savepoint,再重新恢复写入作业。
如果仅某个instant损坏,则rollback到最近可用的instant即可。若可用instant都已被archived,则rollback至最近可用的savepoint。然后,再重新恢复写入作业。
对于savepoint,将作为一个托管的表服务,基于前文提到Hudi Manager周期性生成和过期,以确保一直存在可用版本。
04
未来工作展望
最后,我简略介绍一下对未来工作的展望。
推荐阅读
字节跳动基于 Apache Hudi 的湖仓一体方案及应用实践
Apache Hudi Timeline:支持 ACID 事务的基础