前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
社区首页 >专栏 >Delta实践 | Delta Lake在Soul的应用实践

Delta实践 | Delta Lake在Soul的应用实践

作者头像
大数据技术架构
发布于 2021-03-05 02:42:56
发布于 2021-03-05 02:42:56
1.5K0
举报

作者:张宏博,Soul大数据工程师

一、背景介绍

(一)业务场景 传统离线数仓模式下,日志入库前首要阶段便是ETL,Soul的埋点日志数据量庞大且需动态分区入库,在按day分区的基础上,每天的动态分区1200+,分区数据量大小不均,数万条到数十亿条不等。下图为我们之前的ETL过程,埋点日志输入Kafka,由Flume采集到HDFS,再经由天级Spark ETL任务,落表入Hive。任务凌晨开始运行,数据处理阶段约1h,Load阶段1h+,整体执行时间为2-3h。

(二)存在的问题 在上面的架构下,我们面临如下问题: 1.天级ETL任务耗时久,影响下游依赖的产出时间。 2.凌晨占用资源庞大,任务高峰期抢占大量集群资源。 3.ETL任务稳定性不佳且出错需凌晨解决、影响范围大。

二、为什么选择Delta?

为了解决天级ETL逐渐尖锐的问题,减少资源成本、提前数据产出,我们决定将T+1级ETL任务转换成T+0实时日志入库,在保证数据一致的前提下,做到数据落地即可用。 之前我们也实现了Lambda架构下离线、实时分别维护一份数据,但在实际使用中仍存在一些棘手问题,比如:无法保证事务性,小文件过多带来的集群压力及查询性能等问题,最终没能达到理想化使用。

所以这次我们选择了近来逐渐进入大家视野的数据湖架构,数据湖的概念在此我就不过多赘述了,我理解它就是一种将元数据视为大数据的Table Format。目前主流的数据湖分别有Delta Lake(分为开源版和商业版)、Hudi、Iceberg,三者都支持了ACID语义、Upsert、Schema动态变更、Time Travel等功能,其他方面我们做些简单的总结对比: 开源版Delta 优势: 1.支持作为source流式读 2.Spark3.0支持sql操作 劣势: 1.引擎强绑定Spark 2.手动Compaction 3.Join式Merge,成本高 Hudi 优势: 1.基于主键的快速Upsert/Delete 2.Copy on Write / Merge on Read 两种merge方式,分别适配读写场景优化 3.自动Compaction 劣势: 1.写入绑定Spark/DeltaStreamer 2.API较为复杂 Iceberg 优势: 1.可插拔引擎 劣势: 1.调研时还在发展阶段,部分功能尚未完善 2.Join式Merge,成本高

调研时期,阿里云的同学提供了EMR版本的Delta,在开源版本的基础上进行了功能和性能上的优化,诸如:SparkSQL/Spark Streaming SQL的集成,自动同步Delta元数据信息到HiveMetaStore(MetaSync功能),自动Compaction,适配Tez、Hive、Presto等更多查询引擎,优化查询性能(Zorder/DataSkipping/Merge性能)等等

三、实践过程

测试阶段,我们反馈了多个EMR Delta的bug,比如:Delta表无法自动创建Hive映射表,Tez引擎无法正常读取Delta类型的Hive表,Presto和Tez读取Delta表数据不一致,均得到了阿里云同学的快速支持并一一解决。 引入Delta后,我们实时日志入库架构如下所示:

数据由各端埋点上报至Kafka,通过Spark任务分钟级以Delta的形式写入HDFS,然后在Hive中自动化创建Delta表的映射表,即可通过Hive MR、Tez、Presto等查询引擎直接进行数据查询及分析。

我们基于Spark,封装了通用化ETL工具,实现了配置化接入,用户无需写代码即可实现源数据到Hive的整体流程接入。并且,为了更加适配业务场景,我们在封装层实现了多种实用功能: 1. 实现了类似Iceberg的hidden partition功能,用户可选择某些列做适当变化形成一个新的列,此列可作为分区列,也可作为新增列,使用SparkSql操作。如:有日期列date,那么可以通过 'substr(date,1,4) as year' 生成新列,并可以作为分区。 2. 为避免脏数据导致分区出错,实现了对动态分区的正则检测功能,比如:Hive中不支持中文分区,用户可以对动态分区加上'\w+'的正则检测,分区字段不符合的脏数据则会被过滤。 3. 实现自定义事件时间字段功能,用户可选数据中的任意时间字段作为事件时间落入对应分区,避免数据漂移问题。 4. 嵌套Json自定义层数解析,我们的日志数据大都为Json格式,其中难免有很多嵌套Json,此功能支持用户选择对嵌套Json的解析层数,嵌套字段也会被以单列的形式落入表中。 5. 实现SQL化自定义配置动态分区的功能,解决埋点数据倾斜导致的实时任务性能问题,优化资源使用,此场景后面会详细介绍。

平台化建设:我们已经把日志接入Hive的整体流程嵌入了Soul的数据平台中,用户可通过此平台申请日志接入,由审批人员审批后进行相应参数配置,即可将日志实时接入Hive表中,简单易用,降低操作成本。

为了解决小文件过多的问题,EMR Delta实现了Optimize/Vacuum语法,可以定期对Delta表执行Optimize语法进行小文件的合并,执行Vacuum语法对过期文件进行清理,使HDFS上的文件保持合适的大小及数量。值得一提的是,EMR Delta目前也实现了一些auto-compaction的策略,可以通过配置来自动触发compaction,比如:小文件数量达到一定值时,在流式作业阶段启动minor compaction任务,在对实时任务影响较小的情况下,达到合并小文件的目的。

四、问题 & 方案

接下来介绍一下我们在落地Delta的过程中遇到过的问题

(一)埋点数据动态分区数据量分布不均导致的数据倾斜问题 Soul的埋点数据是落入分区宽表中的,按埋点类型分区,不同类型的埋点数据量分布不均,例如:通过Spark写入Delta的过程中,5min为一个Batch,大部分类型的埋点,5min的数据量很小(10M以下),但少量埋点数据量却在5min能达到1G或更多。数据落地时,我们假设DataFrame有M个partition,表有N个动态分区,每个partition中的数据都是均匀且混乱的,那么每个partition中都会生成N个文件分别对应N个动态分区,那么每个Batch就会生成M*N个小文件。

为了解决上述问题,数据落地前对DataFrame按动态分区字段repartition,这样就能保证每个partition中分别有不同分区的数据,这样每个Batch就只会生成N个文件,即每个动态分区一个文件,这样解决了小文件膨胀的问题。但与此同时,有几个数据量过大的分区的数据也会只分布在一个partition中,就导致了某几个partition数据倾斜,且这些分区每个Batch产生的文件过大等问题。

解决方案:如下图,我们实现了用户通过SQL自定义配置repartition列的功能,简单来说,用户可以使用SQL,把数据量过大的几个埋点,通过加盐方式打散到多个partition,对于数据量正常的埋点则无需操作。通过此方案,我们把Spark任务中每个Batch执行最慢的partition的执行时间从3min提升到了40s,解决了文件过小或过大的问题,以及数据倾斜导致的性能问题。

(二)应用层基于元数据的动态schema变更 数据湖支持了动态schema变更,但在Spark写入之前,构造DataFrame时,是需要获取数据schema的,如果此时无法动态变更,那么便无法把新字段写入Delta表,Delta的动态schena便也成了摆设。埋点数据由于类型不同,每条埋点数据的字段并不完全相同,那么在落表时,必须取所有数据的字段并集,作为Delta表的schema,这就需要我们在构建DataFrame时便能感知是否有新增字段。

解决方案:我们额外设计了一套元数据,在Spark构建DataFrame时,首先根据此元数据判断是否有新增字段,如有,就把新增字段更新至元数据,以此元数据为schema构建DataFrame,就能保证我们在应用层动态感知schema变更,配合Delta的动态schema变更,新字段自动写入Delta表,并把变化同步到对应的Hive表中。

(三)Spark Kafka偏移量提交机制导致的数据重复 我们在使用Spark Streaming时,会在数据处理完成后将消费者偏移量提交至Kafka,调用的是spark-streaming-kafka-0-10中的commitAsync API。我一直处于一个误区,以为数据在处理完成后便会提交当前Batch消费偏移量。但后来遇到Delta表有数据重复现象,排查发现偏移量提交时机为下一个Batch开始时,并不是当前Batch数据处理完成后就提交。那么问题来了:假如一个批次5min,在3min时数据处理完成,此时成功将数据写入Delta表,但偏移量却在5min后(第二个批次开始时)才成功提交,如果在3min-5min这个时间段中,重启任务,那么就会重复消费当前批次的数据,造成数据重复。

解决方案: 1.StructStreaming支持了对Delta的exactly-once,可以使用StructStreaming适配解决。 2.可以通过其他方式维护消费偏移量解决。

(四)查询时解析元数据耗时较多 因为Delta单独维护了自己的元数据,在使用外部查询引擎查询时,需要先解析元数据以获取数据文件信息。随着Delta表的数据增长,元数据也逐渐增大,此操作耗时也逐渐变长。 解决方案:阿里云同学也在不断优化查询方案,通过缓存等方式尽量减少对元数据的解析成本。

(五)关于CDC场景 目前我们基于Delta实现的是日志的Append场景,还有另外一种经典业务场景CDC场景。Delta本身是支持Update/Delete的,是可以应用在CDC场景中的。但是基于我们的业务考量,暂时没有将Delta使用在CDC场景下,原因是Delta表的Update/Delete方式是Join式的Merge方式,我们的业务表数据量比较大,更新频繁,并且更新数据涉及的分区较广泛,在Merge上可能存在性能问题。 阿里云的同学也在持续在做Merge的性能优化,比如Join的分区裁剪、Bloomfilter等,能有效减少Join时的文件数量,尤其对于分区集中的数据更新,性能更有大幅提升,后续我们也会尝试将Delta应用在CDC场景。

五、后续计划

1.基于Delta Lake,进一步打造优化实时数仓结构,提升部分业务指标实时性,满足更多更实时的业务需求。 2.打通我们内部的元数据平台,实现日志接入->实时入库->元数据+血缘关系一体化、规范化管理。 3.持续观察优化Delta表查询计算性能,尝试使用Delta的更多功能,比如Z-Ordering,提升在即席查询及数据分析场景下的性能。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2021-03-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据技术架构 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
认识 Delta Lake
17,18是计算引擎火热的两年,19年已然是红海了。计算引擎中的王者是Spark,综合指标最好,生态也好,当其他引擎还在ETL,交互查询,流上厮杀时,Spark已经在AI领域越走越远。
用户2936994
2022/04/25
7430
Hudi、Iceberg 和 Delta Lake:数据湖表格式比较
在构建数据湖时,可能没有比存储数据格式更重要的决定了。结果将直接影响其性能、可用性和兼容性。
大数据杂货铺
2022/12/02
4.2K0
深度对比delta、iceberg和hudi三大开源数据湖方案
目前市面上流行的三大开源数据湖方案分别为:delta、Apache Iceberg和Apache Hudi。其中,由于Apache Spark在商业化上取得巨大成功,所以由其背后商业公司Databricks推出的delta也显得格外亮眼。Apache Hudi是由Uber的工程师为满足其内部数据分析的需求而设计的数据湖项目,它提供的fast upsert/delete以及compaction等功能可以说是精准命中广大人民群众的痛点,加上项目各成员积极地社区建设,包括技术细节分享、国内社区推广等等,也在逐步地吸引潜在用户的目光。Apache Iceberg目前看则会显得相对平庸一些,简单说社区关注度暂时比不上delta,功能也不如Hudi丰富,但却是一个野心勃勃的项目,因为它具有高度抽象和非常优雅的设计,为成为一个通用的数据湖方案奠定了良好基础。
大数据技术架构
2020/03/25
4.3K0
深度对比delta、iceberg和hudi三大开源数据湖方案
Data Lake 三剑客—Delta、Hudi、Iceberg 对比分析
定性上讲,三者均为 Data Lake 的数据存储中间层,其数据管理的功能均是基于一系列的 meta 文件。meta 文件的角色类似于数据库的 catalog/wal,起到 schema 管理、事务管理和数据管理的功能。与数据库不同的是,这些 meta 文件是与数据文件一起存放在存储引擎中的,用户可以直接看到。这种做法直接继承了大数据分析中数据对用户可见的传统,但是无形中也增加了数据被不小心破坏的风险。一旦某个用户不小心删了 meta 目录,表就被破坏了,想要恢复难度非常大。
王知无-import_bigdata
2020/02/19
4.3K0
B站基于Hudi+Flink打造流式数据湖的落地实践
上图展示了当前B站实时数仓的一个简略架构,大致可以分为采集传输层、数据处理层,以及最终的AI和BI应用层。为保证稳定性,数据处理层是由以实时为主,以离线兜底的两条链路组成,即我们熟知的批流双链路。
ApacheHudi
2023/09/04
1.2K0
B站基于Hudi+Flink打造流式数据湖的落地实践
Hive面试题持续更新【2023-07-07】
Hive是一个在Hadoop上构建的数据仓库基础架构,它提供了一种类似于SQL的查询语言,称为HiveQL,用于处理和分析大规模的结构化数据。Hive的体系架构主要包括以下几个组件:
火之高兴
2024/07/25
1430
数据湖在大数据典型场景下应用调研个人笔记
数据湖是一种不断演进中、可扩展的大数据存储、处理、分析的基础设施;以数据为导向,实现任意来源、任意速度、任意规模、任意类型数据的全量获取、全量存储、多模式处理与全生命周期管理;并通过与各类外部异构数据源的交互集成,支持各类企业级应用。
王知无-import_bigdata
2021/03/26
1.3K0
数据湖在大数据典型场景下应用调研个人笔记
「Hudi系列」Hudi查询&写入&常见问题汇总
2. 「Hudi系列」Apache Hudi入门指南 | SparkSQL+Hive+Presto集成
王知无-import_bigdata
2022/06/05
6.8K0
「Hudi系列」Hudi查询&写入&常见问题汇总
聊聊流式数据湖Paimon(一)
Apache Paimon (incubating) 是一项流式数据湖存储技术,可以为用户提供高吞吐、低延迟的数据摄入、流式订阅以及实时查询能力。
Ryan_OVO
2023/12/26
2K0
聊聊流式数据湖Paimon(一)
17张图带你彻底理解Hudi Upsert原理
如果要深入了解Apache Hudi技术的应用或是性能调优,那么明白源码中的原理对我们会有很大的帮助。Upsert是Apache Hudi的核心功能之一,主要完成增量数据在HDFS/对象存储上的修改,并可以支持事务。而在Hive中修改数据需要重新分区或重新整个表,但是对于Hudi而言,更新可以是文件级别的重写或是数据先进行追加后续再重写,对比Hive大大提高了更新性能。upsert支持两种模式的写入Copy On Write和Merge On Read ,下面本文将介绍Apache Hudi 在Spark中Upsert的内核原理。
ApacheHudi
2021/05/24
6.7K0
17张图带你彻底理解Hudi Upsert原理
为 Delta 新增 Upsert(Merge)功能
今天花了一早上以及午休时间,终于把delta的Upsert功能做完了。加上上周周四做的Delta Compaction支持,我想要的功能基本就都有了。
用户2936994
2019/06/11
9550
实时湖仓一体规模化实践:腾讯广告日志平台
1. 背景 1.1 整体架构 腾讯广告系统中的日志数据流,按照时效性可划分为实时和离线,实时日志通过消息队列供下游消费使用,离线日志需要保存下来,供下游准实时(分钟级)计算任务,离线(小时级/天级/Adhoc)分析处理和问题排查等基于日志的业务场景。因此,我们开发了一系列的日志落地处理模块,包括消息队列订阅 Subscriber,日志合并,自研 dragon 格式日志等,如下图所示: Subscriber:Spark Streaming 任务,消费实时数据,落地到 HDFS,每分钟一个目录,供下游准实时
腾讯大数据
2022/09/20
1.2K0
实时湖仓一体规模化实践:腾讯广告日志平台
基于Apache Hudi 的CDC数据入湖
首先我们介绍什么是CDC?CDC的全称是Change data Capture,即变更数据捕获,它是数据库领域非常常见的技术,主要用于捕获数据库的一些变更,然后可以把变更数据发送到下游。它的应用比较广,可以做一些数据同步、数据分发和数据采集,还可以做ETL,今天主要分享的也是把DB数据通过CDC的方式ETL到数据湖。
ApacheHudi
2021/10/11
1.2K0
基于Apache Hudi 的CDC数据入湖
[LakeHouse] Delta Lake全部开源,聊聊Delta的实现架构
刚刚结束的Data + AI summit上,Databricks宣布将Delta Lake全部开源。
Tim在路上
2022/09/01
1.2K0
[LakeHouse] Delta Lake全部开源,聊聊Delta的实现架构
腾讯广告业务基于Apache Flink + Hudi的批流一体实践
广告主和代理商通过广告投放平台来进行广告投放,由多个媒介进行广告展示 ,从而触达到潜在用户。整个过程中会产生各种各样的数据,比如展现数据、点击数据。其中非常重要的数据是计费数据,以计费日志为依据向上可统计如行业维度、客户维度的消耗数据,分析不同维度的计费数据有助于业务及时进行商业决策,但目前部门内消耗统计以离线为主,这种T+1延迟的结果已经无法满足商业分析同学的日常分析需求,所以我们的目标为:建设口径统一的实时消耗数据,结合BI工具的自动化配置和展现能力,满足业务实时多维消耗分析,提高数据运营的效率和数据准确性。
ApacheHudi
2022/07/11
1.2K0
腾讯广告业务基于Apache Flink + Hudi的批流一体实践
OnZoom基于Apache Hudi的流批一体架构实践
OnZoom是Zoom新产品,是基于Zoom Meeting的一个独一无二的在线活动平台和市场。作为Zoom统一通信平台的延伸,OnZoom是一个综合性解决方案,为付费的Zoom用户提供创建、主持和盈利的活动,如健身课、音乐会、站立表演或即兴表演,以及Zoom会议平台上的音乐课程。
ApacheHudi
2021/12/21
1.5K0
OnZoom基于Apache Hudi的流批一体架构实践
最新大厂数据湖面试题,知识点总结(上万字建议收藏)
本文目录: 一、什么是数据湖 二、数据湖的发展 三、数据湖有哪些优势 四、数据湖应该具备哪些能力 五、数据湖的实现遇到了哪些问题 六、数据湖与数据仓库的区别 七、为什么要做数据湖?区别在于? 八、数据湖挑战 九、湖仓一体 十、目前有哪些开源数据湖组件 十一、三大数据湖组件对比
五分钟学大数据
2022/04/07
1.2K0
最新大厂数据湖面试题,知识点总结(上万字建议收藏)
实战|使用Spark Streaming写入Hudi
传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导入数据方式为采用sqoop或spark定时作业逐批将业务库数据导入数仓。随着数据分析对实时性要求的不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
ApacheHudi
2021/04/13
2.3K0
基于 Iceberg 拓展 Doris 数据湖能力的实践
6月 26 号,由示说网主办,上海白玉兰开源开放研究院、云启资本、开源社联合主办的上海开源大数据技术 Meetup 如期举行。Apache Doris 社区受邀参与本次 Meetup ,来自百度的资深研发工程师 张文歆 为大家带来了题为“ 基于 Iceberg 拓展 Doris 数据湖能力的实践 ”的主题分享,以下是分享内容。
从大数据到人工智能
2022/05/18
1.5K1
基于 Iceberg 拓展 Doris 数据湖能力的实践
每年节约3千万!微信实验平台Iceberg湖仓一体架构改造
微信实验平台主要提供微信内部各个业务场景(视频号、直播、搜一搜、公众号等)下的各类实验场景的支持,有 AB 实验、MAB 实验、BO 实验、Interleaving 实验、客户端实验、社交网络实验、双边实验等。
腾讯云开发者
2023/08/25
1.4K0
每年节约3千万!微信实验平台Iceberg湖仓一体架构改造
相关推荐
认识 Delta Lake
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档