在大数据生态中,数据分析系统在数据创造价值过程中起着非常关键的作用,直接影响业务决策效率以及决策质量。Apache Doris作为一款支持对海量大数据进行快速分析的MPP数据库,在数据分析领域有着简单易用、高性能等优点。近日,Apache Doris组织了一场线上Meetup,作业帮带来了《Doris在作业帮实时数仓中的应用实践》的主题分享。
大家下午好。很感谢大家参加全球100案例峰会预热沙龙关于Doris的线上MeetUp。
下面我来介绍下Doris在作业帮实时数仓中的应用与实践。
这次的分享主要分三个主题:
1.首先是所在团队的业务与背景介绍 2.其次会介绍下基于Doris,作业帮的查询系统是如何构建的,以及主要解决的问题 3.未来的规划
我所在团队是作业帮大数据团队,主要负责建设公司级数仓,向各个产品线提供面向业务的数据信息,如到课时长、答题情况等业务数据以及如pv、uv、活跃等流量类数据,服务于拉新、教学、BI等多个重要业务线。
在数仓体系中,大数据团队主要负责到ODS-DWS的建设,从DWS到ADS一般是数仓系统和业务线系统的边界。
在过去,由于缺失有效、统一的查询系统,我们探索了很多模式来支持各个业务线发展。
随着需求越来越多,系统也越来越难以维护,交付效率也特别低,需求排队非常严重。因此,提供有效而统一的查询系统,对于实时数仓建设在提高业务支持效率、降低维护成本上都具有非常重大的意义。
经过过去数月的探索与实践,我们确立了以Doris为基础的实时查询系统。同时也对整个实时数仓的数据计算系统做了一次大的重构,最终整体的架构图如下:
如图所示(从下到上),原始业务层日志经数据摄入系统进入数仓,在数据清洗计算层,我们将原来基Spark系统升级到了Flink,并且基于Flink-Sql提供了统一的数据开发框架,从原有的代码开发升级到Sql开发来提升数据的研发效率。
其后查询系统将Kafka的数据实时同步到查询引擎内,并通过OpenAPI的统一接口对外提供查询服务。
基于Doris的查询系统上线后,我们面对一个需求,不用像过去一样做方案调研、开发接口、联调测试,现在只要把数据写入,业务层就可以基于sql自己完成数据查询、业务开发,交付效率(数据计算好到提供可读服务)从过去的数人周加快到小时级。
在性能方面,过去基于ES或者mysql来做,当查询的数据量较大时,我们只能忍受数十个小时到数分钟的延迟,基于Doris的方案,加快到分钟级甚至秒级。
Doris的整体架构非常简单,不依赖任何第三方组件,社区支持度也非常好,从上线到今,我们只需做一些轻量级的运维规范,即可保证高稳定性。
所以说,通过引入Doris,解决了作业帮内实时数仓查询交付慢、查询慢的痛点问题,对于后续数仓的系统发展起到了非常关键的作用。
接下来,重点讲下查询系统的工作,分两部分:查询系统的架构选型以及原理,以及应用&实践.
在讲查询引擎之前,先讲下业务场景。
作业帮内,业务场景主要分两种:一种是传统的流量类,比如算pv、uv、活跃……,作业帮内很多时候还需要看进一步的明细,比如作业帮主App 在每天各个小时的活跃用户数,还要看 作业帮主App每个小时内各个版本的活跃用户数。
第二种是面向我们业务线的工作台,比如教学的老师。比如我们的老师上完课后,会看下自己班内的同学们的出勤数据、课堂测验数据等。
这两种场景下,考虑到调研成本、团队技术生态、维护成本等多种因素,我们最后选择了Doris 作为我们的查询引擎。主要是Doris可在上述两种场景下都可以统一的满足业务的需求。
首先介绍下Doris。
Doris是 mpp架构的查询引擎。
整体架构非常简单,只有FE、BE两个服务,FE负责Sql解析、规划以及元数据存储,BE负责Sql-Plan的执行以及数据的存储,整体运行不依赖任何第三方系统,功能也非常丰富如支持丰富的数据更新模型、Mysql协议、智能路由等。对于业务线部署运维到使用都非常友好。
接下来讲下用Doris如何解决我们前面提到的业务场景下的问题。
Doris有多种数据模型,流量类场景常用的是聚合模型。比如对于前面提到的场景,我们会吧作业帮主App各个版本的明细数据存到base表中,如果直接从base表中读取跨天级的聚合数据,由于数据行比较多,可能会出现查询延迟的问题,因此我们会对常用的天级数据做一次rollup,这样通过预聚合,来减少查询的数据量,可以加快查询的延迟。
要高效的使用Doris的聚合模型,前提都是基于key列做数据行筛选,如果使用value列,Doris需要把相关的行全部聚合计算后方可决策是否属于结果集,因此效率比较低。
而对于教研工作台,前面提到的都是基于value的筛选,因此使用了Doris on ES的模型。主要是考虑到 可以发挥ES的任意列检索的能力,来加快查询速度。
在我们的实践中,发现Doris on ES相比直接裸用ES或社区的其他方案如Presto on ES在性能上有很大的提升,接下来介绍下Doris on ES高性能的设计原理。
Doris on ES整体的架构如图,FE负责查询ES的元数据信息如location、shard等,BE负责从ES数据节点扫描数据。
Doris on ES高性能,相比裸用ES,有几个优化点: 裸用ES时,ES采用的是Query then Fetch的模式,比如请求1000条文档,ES有10个分片,这时候每个分片都会给协调返回1000个doc id,然后 协调节点其实拿到了10 * 1000个doc id,然后选择1000个。这样其实每个分片多返回了900个.
Doris on ES则绕过了协调节点直接去操作datanode。它会在每个datanode上查询符合预期的docid,这样不会有过多的docid返回。
其次,Doris从ES扫描数据时,也做了很多优化。比如在扫描速度上,采用了顺序扫描、列存优化、谓词下推等,在数据从ES传输到Doris时,采用就近原则如BE会优先访问本机的datanode、source filter来过滤不用的字段等来加速传输速度。
在我们的调研中,Doris on ES的性能,比Presto on ES快了有数十倍。
在作业帮内,除了上面介绍的基于Doris的数据模型做的基础应用,要完整的支持业务、保证稳定性、提高效率,还需要其他周边的系统建设。
接下来介绍下基于Doris,作业帮查询系统架构的整体设计以及工作模式。
这是作业帮查询系统的总体架构。
从上往下,首先是我们平台,包括各个报表平台、元数据管理平台等,主要来提高各个场景的人效。
其下红色部分为我们统一的api接口层,这里我们主要是制定了api的规范比如请求响应方式、返回码等,来减少系统之间对接的成本。
基于api除了提供了主要的读写接口外,也包含了周边的服务建设,比如元数据管理、调度系统等。
接下来就基于一个完整的流程来介绍下各部分系统。
首先是元数据。Doris基于mysql语法建表,已经有元数据,我们这里做元数据,有几个额外的考虑:
要统一元数据,统一数据模型,就得抽象整个数据表的结构,来管理好不同存储上的表,我们基于env、db、table为基本单位来管理表,database、table大家相对熟悉,env是我们引入的新namespace,主要用于提供不同集群/业务线的定义,如百度云的数仓集群、腾讯云的数仓集群,表单元下主要包含field(列类型、值域)、index(如rollup、bitmap索引等)、storage(存储属性)。
关于列属性,主要是规范化类型系统,考虑到json-schema由于其校验规则丰富、描述能力强,因此对于列值的约束统一使用json-schema来做。
对于数据类型,我们设计了公共数据类型以及私有数据类型。公共类如varchar、int等,这些在不同的存储系统都有对应的实现,也支持私有类型如Doris::bitmap,方便私有系统的兼容和扩展。通过这个模式可以将基于各个存储系统的表做了统一的管理。
这是我们线上的真实的一张表。里面包含了列信息以及对应的存储配置。
左图中的纵向红框是json-schema的描述,来规范化值域。横向红框为ES表的一些meta字段,比如docid、数据更新时间。这些字段可以方便追查数据问题、以及用作数据筛选。
因为我们统一了数据模型,因此可以很方便的对所有表统一设置要增加这些meta字段。
通过元数据的统一管理,构建的表质量都非常高。所有的表都在最大化性能的提供查询服务,且由于数据导致的查询不可用case为0。且对于任何业务线的同学,不管是否了解Doris,都可以分钟级构建出这样一张高质量的表。
建好表后,就是数据的写以及读。统一基于openapi来做。
做api接口其实本质上也是为了在提供系统能力的前提下,进一步保障系统的稳定性和易用性。
比如要控制业务线的误用(如连接数打满),提供统一的入口方便写es、Doris,且控制数据质量……
首先介绍下数据写接口。
由于统一了表模型,因此可以很方便的提供统一的写入接口协议。用户也无须关注实际表的存储是es还是Doris以及处理异构系统的系统。
第二,统一了写接口,就可以统一的对写入的数据会做校验检查,如数据的大小、类型等,这样可以保证数据写入的质量与准确性。这样对于数据的二次加工非常重要。
第三,接入协议中还增加了关键词,如数据的版本。可以解决数据的乱序问题,以及建立统一的写入监控。如下图是我们整个写入数据流的qps以及端到端(数据写入存储时间以及数据生产时间)的延迟分位值,这样可以让系统提高可观测性、白盒化。
接下来讲一个具体的场景,写入端是如何解决乱序问题的。
常态下我们的实时数据流是经过flink或spark计算后写入kafka,然后由查询系统同步到Doris/es中。
当需要修数时,如果直接写入,会导致同一个key的数据被互相覆盖,因此为了避免数据被乱序覆盖,就得必须停掉实时流,这个会导致数据时效性式受损。
因此我们基于写入端做了改进,实时数据流、离线修复数据流各自写入不同的topic,同步服务对每个topic做限速消费,如实时流时效性要求高,可以配额调的大些,保证配额,离线时效性则允许配额小点,或者在业务低峰期将配额调大,并基于数据key&列版本存储做了过滤。这样可以保证时效性的前提下,修数也可以按照预期进行。
最后是读的部分。
在提供sql能力的前提下,我们也做了一些额外的方案,比如缓存、统一的系统配置。对于系统延迟、稳定性提升都有很大的改进。并且由于统一了读接口,上述的这些改造,对于业务线来说都是透明的。
除了常规下面向低延迟的读,还有一类场景面向吞吐的读。 介绍下场景,比如 要统计统计某个学部下(各个老师)的学生上课情况:上课人数、上课时长等。
在过去,我们是基于spark/flink来处理这类问题,如spark消费kafka中的课中数据,对于每一条数据,会去redis中查询教师信息来补全维度。
常态下,当课中数据到达的时候,教师信息是就绪的,因此没有什么问题。可是在异常下,如维度流迟到、存储查询失败等,会导致课中流到达时,无法获取对应的教师信息,也就无法计算相关维度如学部的统计。
过去面临这种情况时,只能遇到这种异常,如重试如果无法解决,只能丢弃或者紧急人工干预,比如在尾标就绪后再重新回刷课中表,一旦遇到上游kafka数据过期就只能从ods层或者离线修复,效率特别低,用户体验也非常差。
基于Doris模式下,我们使用微批调度的模式。
调度系统会定期(分钟级)执行一个调度任务,基于sql join完成数据的选取。这样哪怕在异常下,课中流查不到教师数据,这样join的结果只是包含了可以查到教师数据的信息, 待教师数据就绪后,即可自动补全这部分课中数据的维度。整个过程全部自动化来容错。效率非常高。
因此这个模式的主要好处:
最后,讲下其他方面的建议实践,这些相对简单,但是在实际的应用中非常容易忽视。
最后,讲下规划。
Doris 在作业帮实时数仓的建设中发挥了很关键的作用。
在实际的应用中,我们也发现了一些当前的一些不足。
如Doris on ES在面对大表的join查询时,目前延迟还比较大,因此需要进一步的优化解决; Doris自身的olap表可以做动态分区,对于ES表目前可控性还不足; 其次,当ES修改表后,如增加字段,只能删除Doris表重建,可能会有短暂的表不可用,需要自动化同步或者支持在线热修改; 最后Doris on ES可以支持更多的谓词下推,如count等。
我们也希望可以和社区一起,把Doris建设的越来越好。
好的。我的分享到此结束。谢谢大家。
问题1:Doris on ES V.S. sparksql on ES,在功能上和性能上咱们调研过吗?对于使用那个您这边有什么建议吗?
问题2:Doris 支持Hive Metastore,和Flink SQL是什么关系?刚才讲的太快,有点没听懂
问题3:_version字段是一个内部字段?需要用户端写入的时候指定,还是系统自动创建?和HBase的version的应用场景有区别吗?
领取专属 10元无门槛券
私享最新 技术干货