"数据智能" (Data Intelligence) 有一个必须且基础的环节,就是数据仓库的建设,同时,数据仓库也是公司数据发展到一定规模后必然会提供的一种基础服务。从智能商业的角度来讲,数据的结果代
本文主要讲述知乎的实时数仓实践以及架构的演进,这包括以下几个方面
Spark Streaming
。Flink Streaming
。1.0 版本的实时数仓主要是对流量数据做实时 ETL,并不计算实时指标,也未建立起实时数仓体系,实时场景比较单一,对实时数据流的处理主要是为了提升数据平台的服务能力。实时数据的处理向上依赖数据的收集,向下关系到数据的查询和可视化,下图是实时数仓 1.0 版本的整体数据架构图。
第一部分是数据采集,由三端SDK采集数据并通过Log Collector Server 发送到Kafka。第二部分是数据ETL,主要完成对原始数据的清洗和加工并分实时和离线导入Druid。第三部分是数据可视化,由Druid负责计算指标并通过Web Server配合前端完成数据可视化。
其中第一、三部分的相关内容请分别参考:知乎客户端埋点流程、模型和平台技术,Druid与知乎数据分析平台,此处我们详细介绍第二部分。由于实时数据流的稳定性不如离线数据流,当实时流出现问题后需要离线数据重刷历史数据,因此实时处理部分我们采用了lambda架构。
Lambda架构有高容错、低延时和可扩展的特点,为了实现这一设计,我们将ETL工作分为两部分:Streaming ETL 和Batch ETL。
这一部分我会介绍实时计算框架的选择、数据正确性的保证、以及Streaming中一些通用的ETL逻辑,最后还会介绍Spark Streaming在实时ETL中的稳定性实践。
在2016年年初,业界用的比较多的实时计算框架有Storm和Spark Streaming。Storm是纯流式框架,Spark Streaming用Micro Batch 模拟流式计算,前者比后者更实时,后者比前者吞吐量大且生态系统更完善,考虑到知乎的日志量以及初期对实时性的要求,我们选择了Spark Streaming作为实时数据的处理框架。
Spark Streaming的端到端Exactly-once需要下游支持幂等、上游支持流量重放,这里我们在Spark Streaming这一层做到了At-least-once,正常情况下数据不重不少,但在程序重启时可能会重发部分数据,为了实现全局的Exactly-once,我们在下游做了去重逻辑,关于如何去重后面我会讲到。
ETL逻辑和埋点的数据结构息息相关,我们所有的埋点共用同一套 Proto Buffer Schema,大致如下所示。
message LogEntry {
optional BaseInfo base = 1;
optional DetailInfo detail = 2;
optional ExtraInfo extra = 3;
}
针对上述三种信息我们将ETL逻辑分为通用和非通用两类,通用逻辑和各个业务相关,主要应用于Base和Detail信息,非通用逻辑则是由需求方针对某次需求提出,主要应用于Extra信息。这里我们列举3个通用逻辑进行介绍,这包括:动态配置Streaming、UTM参数解析、新老用户识别。
由于Streaming任务需要7 * 24小时运行,但有些业务逻辑,比如:存在一个元数据信息中心,当这个元数据发生变化时,需要将这种变化映射到数据流上方便下游使用数据,这种变化可能需要停止Streaming任务以更新业务逻辑,但元数据变化的频率非常高,且在元数据变化后如何及时通知程序的维护者也很难。动态配置Streaming为我们提供了一个解决方案,该方案如下图所示。
我们可以把经常变化的元数据作为Streaming Broadcast变量,该变量扮演的角色类似于只读缓存,同时针对该变量可设置TTL,缓存过期后Executor节点会重新向Driver请求最新的变量。通过这种机制可以非常自然的将元数据的变化映射到数据流上,无需重启任务也无需通知程序的维护者。
UTM的全称是Urchin Tracking Module,是用于追踪网站流量来源的利器,关于UTM背景知识介绍可以参考网上其他内容,这里不再赘述。下图是我们解析UTM信息的完整逻辑。
流量数据通过 UTM 参数解析后,我们可以很容易满足以下需求
对于互联网公司而言,增长是一个永恒的话题,实时拿到新增用户量,对于增长运营十分重要。例如:一次投放n个渠道,如果能拿到每个渠道的实时新增用户数,就可以快速判断出那些渠道更有价值。我们用下图来表达Streaming ETL中是如何识别新老用户的。
判断一个用户是不是新用户,最简单的办法就是维护一个历史用户池,对每条日志判断该用户是否存在于用户池中。由于日志量巨大,为了不影响 Streaming任务的处理速度,我们设计了两层缓存:Thread Local Cache和Redis Cache,同时用HBase做持久化存储以保存历史用户。访问速度:本地内存 > 远端内存 > 远端磁盘,对于我们这个任务来说,只有1%左右的请求会打到HBase,日志高峰期26w/s,完全不会影响任务的实时性。当然本地缓存LruCache的容量大小和Redis的性能也是影响实时性的两个因素。
Streaming ETL除了上述几个通用场景外,还有一些其他逻辑,这些逻辑的存在有的是为了满足下游更方便的使用数据的需求,有的是对某些错误埋点的修复,总之Streaming ETL在整个实时数仓中处于指标计算的上游,有着不可替代的作用。
接下来要介绍的是 Lambda 架构的第二个部分:Batch ETL,此部分我们需要解决数据落地、离线 ETL、数据批量导入 Druid 等问题。针对数据落地我们自研了 map reduce 任务 Batch Loader,针对数据修复我们自研了离线任务 Repair ETL,离线修复逻辑和实时逻辑共用一套 ETL Lib,针对批量导入 ProtoParquet 数据到 Druid,我们扩展了 Druid 的导入插件。
数据架构图中有两个 Kafka,第一个 Kafka 存放的是原始日志,第二个 Kafka 存放的是实时 ETL 后的日志,我们将两个 Kafka 的数据全部落地,这样做的目的是为了保证数据链路的稳定性。因为实时 ETL 中有大量的业务逻辑,未知需求的逻辑也许会给整个流量数据带来安全隐患,而上游的 Log Collect Server 不存在任何业务逻辑只负责收发日志,相比之下第一个 Kafka 的数据要安全和稳定的多。Repair ETL 并不是经常启用,只有当实时 ETL 丢失数据或者出现逻辑错误时,才会启用该程序用于修复日志。
前面已经介绍过,我们所有的埋点共用同一套 Proto Buffer Schema,数据传输格式全部为二进制。我们自研了落地 Kafka PB 数据到 Hdfs 的 Map Reduce 任务 BatchLoader,该任务除了落地数据外,还负责对数据去重。在 Streaming ETL 阶段我们做到了 At-least-once,通过此处的BatchLoader 去重我们实现了全局 Exactly-once。BatchLoader 除了支持落地数据、对数据去重外,还支持多目录分区(p_date/p_hour/p_plaform/p_logtype)、数据回放、自依赖管理(早期没有统一的调度器)等。截止到目前,BatchLoader 落地了 40+ 的 Kakfa Topic 数据。
采用 Tranquility 实时导入 Druid,这种方式强制需要一个时间窗口,当上游数据延迟超过窗值后会丢弃窗口之外的数据,这种情况会导致实时报表出现指标错误。为了修复这种错误,我们通过 Druid 发起一个离线 Map Reduce 任务定期重导上一个时间段的数据。通过这里的 Batch 导入和前面的实时导入,实现了实时数仓的 Lambda 架构。
到目前为止我们已经介绍完 Lambda 架构实时数仓的几个模块,1.0 版本的实时数仓有以下几个不足
随着数据量的暴涨,Druid 中的流量数据源经常查询超时同时各业务消费实时数据的需求也开始增多,如果继续沿用实时数仓 1.0 架构,需要付出大量的额外成本。于是,在实时数仓 1.0 的基础上,我们建立起了实时数仓 2.0,梳理出了新的架构设计并开始着手建立实时数仓体系,新的架构如下图所示。
实时数仓 1.0 我们只对流量数据做 ETL 处理,在 2.0 版本中我们加入了对业务库的变更日志 Binlog 的处理,Binlog 日志在原始层为库级别或者 Mysql 实例级别,即:一个库或者实例的变更日志存放在同一个 Kafka Topic 中。同时随着公司业务的发展不断有新 App 产生,在原始层不仅采集「知乎」日志,像知乎极速版以及内部孵化项目的埋点数据也需要采集,不同 App 的埋点数据仍然使用同一套 PB Schema。
明细层是我们的 ETL 层,这一层数据是由原始层经过 Streaming ETL 后得到。其中对 Binlog 日志的处理主要是完成库或者实例日志到表日志的拆分,对流量日志主要是做一些通用 ETL 处理,由于我们使用的是同一套 PB 结构,对不同 App 数据处理的逻辑代码可以完全复用,这大大降低了我们的开发成本。
明细汇总层是由明细层通过 ETL 得到,主要以宽表形式存在。业务明细汇总是由业务事实明细表和维度表 Join 得到,流量明细汇总是由流量日志按业务线拆分和流量维度 Join 得到。流量按业务拆分后可以满足各业务实时消费的需求,我们在流量拆分这一块做到了自动化,下图演示了流量数据自动切分的过程。
不同于明细层和明细汇总层,指标汇总层需要将实时计算好的指标存储起来以供应用层使用。我们根据不同的场景选用了 HBase 和 Redis 作为实时指标的存储引擎。Redis 的场景主要是满足带 Update 操作且 OPS 较高的需求,例如:实时统计全站所有内容(问题、答案、文章等)的累计 PV 数,由于浏览内容产生大量的 PV 日志,可能高达几万或者几十万每秒,需要对每一条内容的 PV 进行实时累加,这种场景下选用 Redis 更为合适。HBase 的场景主要是满足高频 Append 操作、低频随机读取且指标列较多的需求,例如:每分钟统计一次所有内容的被点赞数、被关注数、被收藏数等指标,将每分钟聚合后的结果行 Append 到 HBase 并不会带来性能和存储量的问题,但这种情况下 Redis 在存储量上可能会出现瓶颈。
指标口径管理依赖指标系统,指标可视化依赖可视化系统,我们通过下图的需求开发过程来讲解如何将三者联系起来。
应用层主要是使用汇总层数据以满足业务需求。应用层主要分三块:
相比实时数仓 1.0 以 Spark Streaming 作为主要实现技术,在实时数仓 2.0 中,我们将 Flink 作为指标汇总层的主要计算框架。Flink 相比 Spark Streaming 有更明显的优势,主要体现在:低延迟、Exactly-once 语义支持、Streaming SQL 支持、状态管理、丰富的时间类型和窗口计算、CEP 支持等。 我们在实时数仓 2.0 中主要以 Flink 的 Streaming SQL 作为实现方案。使用 Streaming SQL 有以下优点:易于平台化、开发效率高、维度成本低等。目前 Streaming SQL 使用起来也有一些缺陷:1.语法和 Hive SQL 有一定区别,初使用时需要适应;2.UDF 不如 Hive 丰富,写 UDF 的频率高于 Hive。
从实时数仓 1.0 到 2.0,不管是数据架构还是技术方案,我们在深度和广度上都有了更多的积累。随着公司业务的快速发展以及新技术的诞生,实时数仓也会不断的迭代优化。短期可预见的我们会从以下方面进一步提升实时数仓的服务能力。