Hudi 不依赖任何外部第三方服务(如 Zookeeper),因此易于操作。一切都是独立的,并且不存在必须长期运行的服务器组件。启动一个 Spark 集群,摄取一批数据,一切都完全关闭(如果摄取模式是批处理)。但有时,拥有中央服务可能有助于提高表操作效率。因此 Hudi 有一个中央时间线服务器,它与 Driver 程序节点中的主线程一起运行,以协助定期写入和表服务。本文介绍时间线服务器的内容、它解决什么问题以及它如何使一些核心 Hudi 操作受益。
如简介中所示,Hudi 有一个中央时间线服务器,在驱动程序节点中运行并作为 Rest 服务。它有多种好处,第一个用例是提供 FileSystemView api。
Hudi 的核心是维护一个 TableFileSystemView,它暴露 API 来获取给定数据集的文件状态,驱动程序和执行程序将在写入和表服务生命周期的不同时间点查询该状态。中央时间线服务器维护一个缓存的 FSView,每个 Spark 任务都可以轮询该 FSView,从而避免每个 Spark 任务自己加载 FSView,这些 API 响应延迟非常低。如果没有这些API,每个执行器或 Spark 任务可能必须自己构建 FSview,这将导致过多的重复工作,从而影响延迟。
第二个用例是标记(Marker) 实现。Hudi 维护标记来区分 Spark 任务写入的最终数据文件集与由于 Spark 重试而创建的文件。第一个实现是直接标记,实现简单,但在非常大的范围内,我们发现删除标记花费了太多时间。因此我们引入了基于时间线服务器的标记来解决延迟问题。使用基于时间线服务器的标记,删除延迟仅为几秒钟,而在某些情况下使用直接标记需要 30 多分钟。
Hudi 的核心是维护一个 TableFileSystemView,它暴露 API 来获取给定数据集的文件状态,驱动程序和执行程序将在写入和表服务生命周期的不同时间点查询该状态。一些众所周知的 API 包括:获取所有文件组的最新基本文件、获取给定分区的最新文件切片、获取最新的合并文件切片(在压缩正在进行时有用)、获取最新的挂起压缩操作、获取替换的文件组 (Clustering和其他替换提交操作)等。
TableFileSystemview 有不同的实现,例如内存中的 FSView、基于元数据的 FSview、RockDBBased FSView、基于可溢出映射的 FSView 等。最常见的是内存和元数据 TableFileSystemView。内存表文件系统视图使用文件系统列表查询 Hudi 时间线和数据文件,并填充服务这些 api 所需的所有内部数据结构。基于元数据的文件系统视图使用元数据表而不是直接文件系统列表。所有这些 FSview 都有内置缓存,这意味着一旦为给定分区加载文件组,后续调用就可以从内存数据结构本身提供服务,而不会产生额外的 I/O。但是所有填充的数据结构(缓存)都必须在时间线发生新更改时(新提交完成时)重新加载,这不可避免。因此来自中央时间线服务器的缓存 FSView 通过减少延迟为我们提供了相当高的价值。
先看看在没有时间轴服务器的情况下事情会如何发展。假设我们正在操作一个由 1000 个分区组成的表,每个分区有 100 个文件组。会调用如下API
getLatestBaseFile(String partition, String fileID)
5000 个随机文件组。因此 5000 个随机文件组可以分布在不同的分区上。最简单的选择是在驱动程序本身中执行所有内容。但是整个执行将是串行的,这可能会非常慢。换句话说我们可以在 for 循环中以单线程方式获取 5000 个文件组的最新基本文件,而不利用集群资源。让我们利用 Spark 并行执行来实现,因此驱动程序中的典型调用可能如下所示
engineContext.parallelelize(partitionFileIdPairs)
.map(partitionFileIdPair -> {
Build FileSystemView for partition of interest
Fetch latest base file for the fileId of interest and return
}).collect(Collectors.toList())
由于我们利用 Spark 的并行执行,因此与在驱动程序中执行所有操作相比,这应该会加快速度。但我们确实还有进一步优化的空间。由于我们对分布在 1000 个分区中的 5000 个文件组感兴趣,因此大致对每个分区中的 50 个文件组感兴趣。整个调用中最昂贵的操作是构建 FileSystemView。根据上面的 DAG,我们正在 50 个 Spark 任务(与 50 个文件组相关)中为给定分区构建 FileSystemView。
位于中心时间轴服务器在消除实例化 FileSystemView 过程中不必要的延迟方面发挥着至关重要的作用。时间线服务器是一个Rest服务,它在同一节点中运行,并在单独的线程中与驱动程序一起处理。所有 FileSystemView 调用都将由该时间线服务器通过 Rest 调用提供服务。执行器会将 FSview 调用路由到位于中心的时间线服务器并返回结果。由于我们还内置了一个缓存层,因此它们往往非常高效,并且避免了 FSview 的重复实例化以及不必要的 I/O。
继续获取 500 个文件组的最新基本文件的示例。当时间线服务器运行并将存储布局设置为 RemoteFileSystemView 时,它可能如下所示。
engineContext.parallelelize(partitionFileIdPairs)
.map(partitionFileIdPair -> {
Make remote request to Timeline server for latest base file for fileId and partition of interest
Return the value obtained as the response
}).collect(Collectors.toList())
在并行线程中,在时间线服务器(REST SERVICE)内:
Receives the request from 5000 spark tasks.
Serves FSview calls for partition of interest if already loaded and cached.
If not, loads the FS view for the given partition of interest.
Serves the response based on the populated data-structures.
正如我们所看到的通过将调用路由到中央时间线服务器来优化对 FSView api 的调用,该服务器提供来自缓存视图的响应。
getLatestBaseFile()
只是一个说明。与此类似大多数 FS 视图调用都会路由到中央时间线服务器,并由缓存的 FS 视图提供服务。
Hudi为每个数据表都有一个元数据表,用于缓存表中的文件列表。如果启用的话,FSview 也可以从元数据表构建。在这种情况下 FS 视图的实例化基于元数据表的 FILES 分区中的数据。这里我们讨论的是时间轴服务器中使用的 FSview 实现。
我们已经确定了一些调用(例如清理器),其中每个分区都将跨所有 Spark 任务加载,因此我们添加了优化以尽可能使用对元数据表的单个调用来预加载所有分区。当表有 1000 个或更多分区时,这会极大地加快 FS 视图调用延迟。
时间线服务器也用作另一个标记实现,在此之前我们有执行器直接操作的直接标记,我们将在其他博客中讨论这个主题。如果感兴趣可以点击此链接讨论基于时间线服务器的标记。
通常操作 Hudi 不需要任何像 Zookeeper 这样需要单独维护的集中运行服务器。在某种程度上时间线服务器是驱动程序节点中长时间运行的服务,用于避免不必要的 I/O,并通过缓存层为 FSview 调用提供服务。