首页
学习
活动
专区
工具
TVP
发布
精选内容/技术社群/优惠产品,尽在小程序
立即前往

SeaTunnel 在 oppo 的特征平台实践 | ETL 平台数据处理集成

今天的分享包含以下几点:

  • 背景 &需求
  • 为什么是 SeaTunnel
  • ETL 平台集成实践

**作者简介 **

01 业务背景和需求痛点

业务背景

推搜广场景下存在大量的数据同步和特征处理需求。推荐搜索广告业务涉及图中几个模块,以特征为基础的特征服务,上层支持了机器学习、召回引擎和预估引擎。召回引擎和预估引擎支撑着更上层的推荐引擎业务的召回、粗排、精排、重排,最终产出结果。这是推搜广的主要业务流程,其中有些细小差别,但大体相似。

对于推荐系统,物料数据是推荐系统要推荐的内容,包括视频、文章或商品等。推荐系统的主要数据包括用户行为日志、服务端日志、物料数据、实时特征快照等数据,我们首先会接入 kafka 中,分两个流,一是同步到 hdfs 作为离线数据支持离线用户画像、物品画像、离线行为特征等离线特征数据的计算;二是 Kafka 中的数据经过实时的 Flink 或 Storm 处理,进行特征正负样本拼接、日志拼接和特征计算等,生成实时用户画像、物料动态画像、用户序列特征、实时快照特征等实时流特征数据。实时和离线特征通过特征注册存储到 redis、mongodb、parker、cassandra 等存储中通过特征服务对接到上层应用。

当用户向推荐系统发起一个请求时,首先触发推荐系统召回。召回有多种类型,协同类召回是基于物物相似 itemcf,人人相似 usercf,人物矩阵分解等;向量化是把一个内容或者物品通过向量化 embedding 的方式表达出来,再计算相似度;池子召回,是热点池和精品池或者运营池等进行推荐;模型召回是基于一些模型算法挖掘出来的、对用户推荐的候选集数据,进入召回阶段。

召回阶段可能存在 5000 篇视频或文章,这些数据进入粗排。粗排是对召回的数据通过预估引擎进行一次粗粒度的物料筛选,筛选出 5000 中可能的 1000 篇。预估引擎利用了机器学习的一些模型,进行预估和打分。打分后会进行排序。进入精排后会输入更多特征数据,包括交叉特征等,进行更细粒度的筛选、排序和打分,前面的 1000 篇可能会剩下 50 篇或更少。这个结果会进一步进行重排,重排有多重手段,像将一些内容必须插入到某个位置的调整,还包括同类文章数据按规则打散减少同质化内容、提升用户体验,也会对推荐内容做去重复等操作。完成重排后输出结果。

可以看到机器学习、召回引擎以及预估引擎都是以数据和特征为基础的,这些业务场景下有大量数据处理。数据处理主要是特征计算,而计算过程中也需要将产生的数据模型同步到对应存储中,这就是我们业务场景中数据同步需求的来源。整套系统支持了 10-20 个业务,整体数据同步的需求较大。

痛点和目标

业务多,任务碎片化。一些任务部署在调度系统中,一些任务是以 Crontab 形式配置的,开发人员维护同步任务困难,且没有上线前后的串联关系。数据同步和数据处理需求量大,人力有限,同步任务开发和部署零散,有 Spark、Flink 任务也有脚本,开发人员为了维护多个同步任务,同时还需要熟悉打包、编译、上线流程,维护流程难以统一化。且数据同步任务和数据处理存在烟囱式开发的问题,难以通用化,消耗人力物力。

我们需要让数据处理和同步任务标准化、对处理和同步任务进行统一管理,希望能将数据处理和同步抽象成工具化的产品,让数据处理和同步的能力通用化,可被复用。同时让数据处理和同步工具可以有普适性,能够产出一些低学习成本、高开发效率的工具达到减少重复劳动、提升效率的效果。

流程统一

为了解决痛点、达到目标,我们首先进行了数据处理和同步任务开发部署流程的统一。这里以样本拼接为例,样本拼接是我们业务中重要的一环,分为离线和近线。样本拼接主要指取得用户当时的一些特征快照数据,给予用户对这个推荐结果的一个正负反馈,如是否点击、是否曝光、是否下载,把这些数据作为样本输入到训练模型的样本中。我们的样本拼接主要做正负样本。

离线样本拼接首先经过 Spark 完成样本拼接和特征抽取后,结果存储到 HDFS,对接离线模型训练完成离线处理。近线样本拼接通过 Flink 对实时日志流数据进行处理,完成样本拼接和特征抽取后放入 Kafka,最终对接增量训练模型后完成近线处理。这里的实现是两套代码,接口和 API 不完全相同,由不同人维护,维护成本高。两套系统,分别存储的数据容易出问题,离线近线两套系统数据容易出现不一致问题,对最终模型训练和实验效果有一定影响。

在此基础上,我们统一了处理流程,实时(近线)和离线均用 Flink 处理数据,维护同一套引擎代码。通过 Flink 进行实时流样本拼接、特征抽取,得到的样本数据存储到 Iceberg 数据湖。Iceberg 对接离线和增量模型训练,进行数据处理。这套方案统一存储、减少数据冗余,避免了特征不一致问题发生。使用一套计算和存储引擎,函数复用,提升了效率。

结构统一

首先我们做了样本结构化。我们把输入到模型的前置特征数据基于不同类型做了分解。

图中第一部分是业务单元,业务单元主要指用户 ID、物料 ID、时间戳,这些是用户请求后的快照数据。第二部实时特征,是用户请求的那一时刻的状态,比如那一时刻对某个兴趣的上下文的那个实时特征。此外还有一些离线特征。

分解前,业务单元呀、实时特征呀、离线特征都是统一通过引擎去输入,dump 到 Kafka,然后再去做特征的样本拼接和数据处理的。但离线特征这一部分的特征很多是静态的,不会频繁变更。如果每次都走流式计算这些离线特征,数据量会特别大。而且数据重复传输,可能一些数据在模型里面根本不会用到。我们做分解后,实时特征实时地请求,离线特征进行填补,减少了数据冗余,整个样本的数据也更结构化。

除样本的结构化外,我们对于样本内的特征也做了标准化,即存储格式的标准化,底层存储用 pb 格式序列化。数据中的 byte、string、Int64List、FeatureValueList 特征数据等。经过统一后,数据可以跨业务地复用、屏蔽底层细节。

功能模块化

在结构统一的基础上,我们进一步做了特征的生产-存储-服务全流程标准化。并统一了 API、对接上下游屏蔽底层细节,让跨业务特征数据共享挖掘更大价值。

我们的特征中心对接了实时和离线数据源,进行特征生产后,将特征注册到特征存储,然后由特征服务统一对接预估模型服务或用作召回数据。

我们的特征服务支持在线计算,在线计算主要支持用户行为序列,近期特征和一些画像特征中需要统计一些基础数据的场景。如最近 20 条的某个类目的峰值、占比、CTR,可以通过在线的方式实时计算,这样能够增加业务的灵活性。

在这个结构上,我们的特征生产是基于 SeaTunnel 做的。

02 为什么用 SeaTunnel

SeaTunnel 构建在 Spark 和 Flink 上,借助 Flink、Spark 能够满足大数据量实时离线, 高性能的同步和处理能力。用户不需要关注细节,通过配置化、插件的方式,配合 SQL,就可以快速部署数据同步应用到生产中。

SeaTunnel 处理流程高度抽象,逻辑清晰。SeaTunnel 对接 Hadoop、Kafka、ES、Clickhouse 等数据源,经过 Source 输入,Source 对接 Transform,Transform 中进行数据逻辑处理,包括数据过滤等。数据处理完成后对接 Sink 将数据输出到目标数据源中。

图中是一个 SeaTunne 任务的开发配置,env 部分配置可配置任务的并行度和任务的优化参数、checkpoint 路径和频率等。Source 部分可以配置 FakeSourceStream 数据源,做测试使用。FakeSourceStream 可以配置数据表和字段名称。Transform 部分从 source 读取数据,可配置多个数据处理的 SQL 和临时表名。这里配置的 Sink 是 ConsoleSink,是输出到控制台,方便观察和调试。

SeaTunnel 基于 java SPI 技术,非常便于扩展。下图是整个顶层接口的设计,实线表示了继承关系,白色的虚线是依赖关系。顶层是基于 Plugin,在 Plugin 的基础上涉及了 BaseSource、BaseSink 和 BaseTransform。下面还有 BaseFlinkSource、BaseFlinkSink 和 BaseFlinkTransform 三个高度抽象的处理流程。继承这些接口,实现自己的逻辑即可。

除了 Source、Sink、Transform 外还有 Runtime,Runtime 是封装了整个运行环境。Flink 封装了 BatchEnv、StreamEnv。除此之外还有 Execution,Execution 串联了整个 Job 的执行流程。

整体结构清晰明了,插件实现起来很容易。

SeaTunnel 已经支持多种数据源,在此基础上减少了造轮子的情况发生。SeaTunnel 社区已经支持了 Doris、Redis、MongoDB、Hive、MySQL、TiDB、ElasticSearch、Clickhouse 等数据源。

03 ETL 平台集成

ETL 特征生产处理平台是基于 SeaTunnel 进行的二次开发,构建在 flink 之上。

  • ETL 集成平台同 SeaTunnel 一样插件化,非常便于扩展与集成;
  • SeaTunnel 已经支持了好多数据源,不需要从头开始造轮子;
  • ETL 平台提供了配置化的方式,便于上手,用户不需要编写代码和了解底层细节和 API,就可以完成一个流任务或批任务的开发;
  • 借助 Flink 的状态机制和实时处理的特性,非常适合窗口统计类实时运算的操作,非常切合推荐业务场景;

图中是 ETL 特征生产处理平台的架构,有相对独立的监控管理模块,Flink 引擎之上有数据输入层、数据处理层和数据输出层。

监控包括配置管理和任务管理,质量监控是负责配置质质量监控、告警。任务编排针对离线任务的依赖关系的管理。元数据是管理如 Kafka 输入的数据源的信息。此外还有血缘依赖。数据输入层接入了 Hdfs、MQ、Kafka、HBase 等数据源。数据处理层可以对数据进行标准化处理,支持 SQL、DataSet/DataStream、数据格式转换和数据压缩等处理。数据输出层支持了 Scylladb、Hdfs、Redis、Kafka 等。平台的应用场景包含数据同步、数据处理、特征计算、样本拼接,都是基于底层的模块系统。

下面会详细介绍 ETL 的任务是怎么跑起来,又是怎么开发的。

组件单元分为三块:plugin、SQL 和 UDF。Plugin 是 Source、Transformer 插件,SQL 可做逻辑处理,我们也封装了很多具有通用性的 UDF,和根据业务场景定制化的 UDF。结合几个模块即可生成一个 Job 的配置文件。Job 既可以是批任务也可以是流处理。整个 Job 的配置文件生成后会进行任务的管理编排,调度系统 oflow 负责编排批任务、ostream 负责编排流处理任务。oflow 会管理任务的上下游依赖关系,重试次数、报警等。ostream 是 Flink 运行的环境,管理任务参数配置等。对任务进行编排后会对任务进行提交,支持编排任务在 Yarn 和 k8s 平台进行运行。

ETL 平台 Job 开发流程是怎样的?

首先我们采用了插件参数化管理的模式。以 Kafka Source 为例,如下图所示,可以配置 KafkaTableSchema 的实例名、schema、消费者信息和字段信息等。

建好 Source 后,对于不了解的新手,可以通过平台提供的托拉拽方式完成一个任务的编辑。同时借助 DAG 图的能力,能够非常容易地理解整个作业的流程。\

对整个流程比较熟悉的用户则可以直接编辑配置,或者通过复制编辑、修改部分配置,就可以快速完成一个 job 构建。

配置与插件互通,用户可以根据情况灵活选择和编辑。DAG 图可以辅助用户检查是否存在依赖错误的问题。

如何确保一个任务正确并且稳定的开发和上线运行呢?我们通过三个环节进行把控:配置检验,检查是否配置中已存在错误;逻辑调试,判断是否存在逻辑错误,如 SQL 中的字段错误或命名问题,都可以在这一步被发现;线上监控是在任务上线后保障任务稳定运行。

如何将错误控制在提交之前?首先我们会对配置进行校验。保证组成配置是合理的,SQL 是没有语法的错误的。如果肉眼观察能难发现错误,我们会对复杂 SQL 通过 SQL 解析快速定位到具体的编写错误。

第二是逻辑校验,对任务进行调试。任务调试需要数据,我们提供了两种方式采集数据,一种是线上采样,另一种是样例数据上传。线上采样有时过于随机,不满足需求,这时用户可以上传样例数据造出需要的样本,观察下游数据产出是否通过测试。

逻辑校验对全流程进行模拟,打印日志,将逻辑错误预先排除。启动调试流程后,经过样本上传或线上采样获得测试数据,进入模拟流程。最后输出结果。调试任务启动后,会对原有的任务配置做更改,在任务中插入埋点、将中间表数据处理进行输出。

图中是逻辑校验过程中打印的日志信息。可以直观地在任务上线前看到错误,防止任务不经过测试就上线污染线上数据。

除了任务上线前校验,我们还对任务进行了监控。任务监控包括 Flink metric 和自定义的指标上报,数据存储到时序数据库中,对接规则告警系统。图中展示了 Grafana 的监控项。任务上线后,基于监控大盘可以非常方便快速地定位和排查问题。

下图是任务监控中作业概况的 Grafana 大盘,可以看到任务运行情况、重启次数和延迟等信息。

业务指标监控会对指标进行收集,服务暴露 metrics 函数和 sink 插件上报到时序数据库。配合规则告警系统对数据报警。

ETL 平台支持了数据同步、样本拼接、时序特征、物料和用户画像、窗口统计等场景。这些基础上,我们提供了模板,为数据同步、时序特征等较为固定和可以通用配置的场景提供了模板。我们基于模板功能可以把通用的能力沉淀下来。当用户需要新增数据同步任务时,可以基于模板对主要参数进行修改,快速创建任务完成数据同步任务的开发。

我们的 ETL 平台对 SeaTunne l 也进行了优化与改进。我们对所有插件都加入了并发控制,例如一个 Kafka 有 20 个 partition,但下游数据处理逻辑比较复杂,需要 40 并发处理数据。此时如果在代码中统一配置了 40 并发,这些并发可能产生很多大而无用的开销。还可以控制写入 HDFS 的文件数量和文件大小等参数。此外我们对 Sink,Source 插件支持了更多扩展,支持了很多内部数据源。我们的配置支持参数动态配置,这个功能在进行数据回溯时非常有用。我们还开发了大量聚合函数和窗口函数,对状态保存和 Sum、Distinct 等聚合函数进行了优化。有一些函数是定制化的,有的业务有统计一些用户最近实时行为特征序列的需求,统计最近 APP 下载的一个窗口最新的 20 条,且同时对 20 条信息去重,同时还要支持分类计数等。这样类场景,如果用 SQL 实现会非常复杂,还需要进行复杂的优化和调优,此时我们会开发比较通用、在性能上经过优化的函数交给用户。

ETL 平台规模方面,当前上线的任务量在 1400 多个,日均处理条目数在 100 亿以上,日均数据量 40T 以上。

我们未来规划是引进 Alink,Flink Ml 等机器学习框架,支持回归以及分类算法、特征工程、窗口计算等与业务契合的能力。还有一个原因的话,就是我们通过这个引进机器学习框架,可以把现在在 Spark 上运行的任务也迁移到 Flink 上做统一管控。其次,我们会在批流一体化落地方面持续探索,解决 Flink 处理批任务的性能问题和起调机制问题。流批一体降低了维护成本,防止数据不一致问题出现或数据丢失,同时让数据回溯更加稳定便捷。此外,我们还计划支持纯 SQL 模块,Spark SQL 和其他 SQL 迁移来的用户,面对平台中的配置文件的严格的形式会认为使用不够方便,因此我们将对纯 SQL 进行更好的支持。

- END

Apache SeaTunnel(Incubating) 是一个分布式、高性能、易扩展、用于海量数据(离线 &实时)同步和转化的数据集成平台。

仓库地址: 

https://github.com/apache/incubator-seatunnel

网址:

https://seatunnel.apache.org/

Proposal:

https://cwiki.apache.org/confluence/display/INCUBATOR/SeaTunnelProposal

Apache SeaTunnel(Incubating) 2.1.0 下载地址:

https://seatunnel.apache.org/download

衷心欢迎更多人加入!

能够进入 Apache 孵化器,SeaTunnel(原 Waterdrop) 新的路程才刚刚开始,但社区的发展壮大需要更多人的加入。我们相信,在「Community Over Code」(社区大于代码)、「Open and Cooperation」(开放协作)、「Meritocracy」(精英管理)、以及「多样性与共识决策」等 The Apache Way 的指引下,我们将迎来更加多元化和包容的社区生态,共建开源精神带来的技术进步!

我们诚邀各位有志于让本土开源立足全球的伙伴加入 SeaTunnel 贡献者大家庭,一起共建开源!

提交问题和建议:

https://github.com/apache/incubator-seatunnel/issues

贡献代码:

https://github.com/apache/incubator-seatunnel/pulls

订阅社区开发邮件列表 : 

dev-subscribe@seatunnel.apache.org

开发邮件列表:

dev@seatunnel.apache.org

加入 Slack:

https://join.slack.com/t/apacheseatunnel/shared_invite/zt-10u1eujlc-g4E~ppbinD0oKpGeoo_dAw

关注 Twitter: 

https://twitter.com/ASFSeaTunnel

  • 发表于:
  • 本文为 InfoQ 中文站特供稿件
  • 首发地址https://www.infoq.cn/article/9603355a26b045ac7e30465b9
  • 如有侵权,请联系 cloudcommunity@tencent.com 删除。

扫码

添加站长 进交流群

领取专属 10元无门槛券

私享最新 技术干货

扫码加入开发者社群
领券