前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >基于 XTable 的 Dremio Lakehouse分析

基于 XTable 的 Dremio Lakehouse分析

作者头像
ApacheHudi
发布2024-06-08 08:44:10
910
发布2024-06-08 08:44:10
举报
文章被收录于专栏:ApacheHudiApacheHudi
如今,数据湖仓一体架构正变得无处不在,组织越来越多地采用开放表格式,例如 Apache Hudi、Apache Iceberg 和 Delta Lake 用于其数据平台。这些格式有助于建立开放、灵活的架构基础,使组织能够为其特定工作负载选择最合适的计算引擎,而无需将数据锁定在专有数据库或数据仓库的存储格式中。

这种开放性和灵活性的方法使数据存储和使用方式发生了转变。如今,客户可以选择在云对象存储(如 Amazon S3、Microsoft Azure Blob Storage或 Google Cloud Storage)中以开放表格式存储数据。数据由数据所有者全资拥有和管理,并保存在其安全的 Virtual Private Cloud (VPC) 帐户中。用户可以为其工作负载提供正确类型的查询引擎,而无需复制数据。这创建了一个面向未来的架构,可以在需要时将新工具添加到技术栈中。

尽管有这些优点,但仍存在一个障碍:需要选择单一表格格式,这带来了重大挑战,因为每种格式都具有独特的功能和集成优势。此外对于较新的工作负载,组织要求格式完全可互操作,因此数据是普遍可查询的。如果没有互操作性,组织就会被绑定到单一格式,迫使他们处理一次性迁移策略或制作完整的数据副本(通常经常)以使用其他格式。

Apache XTable(孵化)项目[1]是去年启动的一项开源计划,通过关注这些不同湖仓一体表格式之间的互操作性来应对这一挑战。XTable 充当轻量级转换层,允许在源表和目标表格式之间无缝转换元数据,而无需重写或复制实际数据文件。因此无论写入数据的初始表格式选择如何,都可以使用选择的首选格式和计算引擎来读取数据。

在这篇博客中,我们将介绍一个假设但实际的场景,该场景在当今组织内的分析工作负载中变得越来越频繁。

场景

此方案从两个分析团队开始,该团队是组织中市场分析组的一部分。这些团队负责分析各种超市产品的市场趋势和消费者偏好。他们的大部分数据都位于 S3 数据湖中。对于这个特定的练习,我们使用了来自 Kaggle[2] 的公开数据。

团队 A:使用 Apache Hudi 作为 Spark 的表格式

团队 A 使用 Apache Hudi 来管理一些最关键的低延迟数据管道。Hudi 的优势在于它能够支持增量数据处理,在数据湖中提供更快的更新和删除。此外,Hudi 中强大的索引[3]和自动表管理功能[4]使团队 A 能够在其数据摄取过程中保持高水平的效率和性能,主要通过 Apache Spark 执行。此 Hudi 表包含特定时期内在“Tesco”中发生的销售数据。

团队 B:使用 Dremio 和 Iceberg 进行分析

另一方面,Team B 专注于临时分析、BI 和报告,利用 Dremio 强大的计算引擎和 Apache Iceberg 表的可靠性。Iceberg 的功能(如隐藏分区[5]和数据版本控制)与 Dremio 的分析工作负载查询加速功能无缝配对。这种组合使团队 B 能够执行复杂的分析,并轻松高效地生成 BI 报告。B组将超市“Aldi”的销售数据存储为Iceberg表。

挑战:统一Hudi和Iceberg表的数据

为了对组织中的特殊营销活动进行详细的比较分析,B 团队希望了解“Tesco”和“Aldi”超市的品类产品销售情况。为此团队 B 希望使用团队 A 生成的数据集(存储为 Hudi 表)并将其与他们的数据集(Iceberg 表)相结合。鉴于他们使用 Dremio 作为分析和报告的计算引擎,这在传统上会构成重大障碍,因为 Dremio 本身不支持 Hudi 表。

解决方案:Apache XTable

在诸如此类的场景中,Apache XTable 提供了一个简单的解决方案,使团队 B 能够处理这个问题。使用 XTable,团队 B 将源 Hudi 表(“Tesco”数据)公开为 Iceberg 表。这是通过将元数据从 Hudi 转换为 Iceberg 来实现的,而无需重写或复制实际数据。此转换过程非常高效,并利用相同的 S3 存储桶来存储目标表的已翻译元数据。

一旦团队 A 数据 (Hudi) 呈现为 Iceberg 表,团队 B 就可以处理数据,就像它最初是以 Iceberg 格式编写的一样。他们可以利用 Dremio 计算的联接和联合等操作,使用来自两个团队的数据创建一个新数据集。通过 XTable,无需进行成本高昂的数据重写或繁琐的迁移工作,从而可以进行快速分析。借助 XTable,数据更加普遍可用,使组织能够无缝地使用多种表格格式。现在我们已经对 Apache XTable 提供的问题陈述和解决方案有了深入的了解,现在让我们深入了解实际方面,看看互操作性在上述场景中是如何工作的。

动手实践用例

团队A

团队 A 使用 Apache Spark 将“Tesco”超市的销售数据摄取到存储在 S3 数据湖中的 Hudi 表中。让我们从创建 Hudi 表开始。以下是将 PySpark 与 Apache Hudi 一起使用所需的所有配置。

代码语言:javascript
复制
from typing import *

from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
.appName("Hudi Table") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.2.4,com.amazonaws:aws-java-sdk:1.12.262") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()

print("Spark Running")

s3_path = "s3a://diplakehouse/hudi_tables/"

# Access SparkContext
sc = spark.sparkContext

spark.sql(
    """CREATE TABLE retail_data
        (supermarket STRING, prices STRING, prices_unit STRING, unit STRING, names STRING, date STRING,
        category STRING, own_brand STRING)
       USING HUDI 
       LOCATION 's3a://diplakehouse/hudi_tables/'"""
);

现在我们将销售记录引入 Hudi 表。

代码语言:javascript
复制
spark.sql(
    """CREATE OR REPLACE TEMPORARY VIEW retail_temp USING csv
            OPTIONS (path "Dataset/All_Data_Tesco.csv", header true)"""
)

spark.sql("INSERT INTO retail_data SELECT * FROM retail_temp")

让我们快速检查一下 S3 文件系统中的 Hudi 表文件。

下面是数据(使用 Spark SQL 查询)。

团队B

接下来,使用 Spark 执行“Aldi”超市的摄取,数据集作为 Iceberg 表 (retail_ice) 存储在 S3 数据湖中。此步骤模拟数据工程团队负责数据准备和引入的典型工作流。

如果要使用本地 Spark 和 Dremio 环境来试用此用例,请按照此存储库中的说明创建本地湖仓一体环境。

我们首先使用 PySpark 和 Hadoop 目录配置 Apache Iceberg,并创建 Iceberg 表。

代码语言:javascript
复制
import pyspark
from pyspark.sql import SparkSession
import os
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        .set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.4.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
        .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        .set('spark.sql.catalog.hdfs_catalog', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.hdfs_catalog.type', 'hadoop')
        .set('spark.sql.catalog.hdfs_catalog.warehouse', 's3a://diplakehouse/iceberg_new/')
        .set('spark.sql.catalog.hdfs_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
)
## Start Spark Session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

spark.sql(
    """CREATE TABLE IF NOT EXISTS hdfs_catalog.retail_ice
            (supermarket STRING, prices STRING, prices_unit STRING, unit STRING, names STRING, date STRING, category STRING, own_brand STRING) USING iceberg"""
)

然后摄取“Aldi”的销售数据。

代码语言:javascript
复制
spark.sql(
    """CREATE OR REPLACE TEMPORARY VIEW salesview USING csv
            OPTIONS (path "file:/home/docker/All_Data_Aldi.csv", header true)"""
)
spark.sql("INSERT INTO hdfs_catalog.retail_ice SELECT * FROM salesview")

在S3数据湖中将数据写入Iceberg表后,数据分析师可以使用Dremio的湖仓一体平台连接到湖并开始查询数据。

下面是一个简单的查询

将Hudi dataset ('Tesco')转化为Iceberg

因此,由于两个团队的数据都以两种不同的表格式存储,我们现在引入 Apache XTable 来解决互操作性挑战。

XTable 将用于将元数据从 Hudi 表(“Tesco”)转换为 Iceberg 格式,从而使数据能够使用 B 团队端的 Dremio 以 Iceberg 格式访问和查询。这不会修改或复制原始数据集的 Parquet 基础文件。

从 Apache XTable 开始,我们将首先将 GitHub[6] 存储库克隆到本地环境,并使用 Maven 编译必要的 jar。以下命令启动生成:

代码语言:javascript
复制
mvn clean package

有关安装的更多详细信息,请遵循官方文档[7]。

构建完成成功后,我们将使用utilities-0.1.0-SNAPSHOT-bundled.jar启动元数据转换过程。

下一步是在我们克隆的 XTable 目录中设置一个配置文件 my_config.yaml,以定义翻译详细信息。配置应如下所示:

代码语言:javascript
复制
sourceFormat: HUDI
targetFormats:
  - ICEBERG
datasets:
  - tableBasePath: s3://diplakehouse/hudi_tables/
    tableName: retail_data

该配置概述了源格式 (Hudi)、目标格式 (Iceberg) 和表特定的详细信息:S3 中的基本路径和表名称。为了启动翻译过程,我们将执行以下命令。

代码语言:javascript
复制

java -jar utilities/target/utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml

同步过程成功完成后,我们将看到输出,如下面的代码片段所示。

如果我们现在检查 S3 位置路径,我们将看到 Iceberg 元数据文件,其中包括架构定义、提交历史记录、分区信息和列统计信息等详细信息。这是 S3 中的元数据文件夹。正如我们所看到的,Iceberg 元数据是同一个 /hudi_tables 目录的一部分。

现在原始的 Hudi 表(“Tesco”数据集)已转换为 S3 数据湖中的 Iceberg 表,我们可以无缝地使用 Dremio 的计算引擎来查询数据并执行进一步的操作。

如果没有像 Apache XTable 这样的轻量级翻译层,从 Dremio 访问 Hudi 表将不简单。替代方案将涉及繁琐的迁移过程、昂贵的数据重写以及历史数据版本的潜在丢失。

让我们继续从 Dremio 查询这个新数据集。

现在在下一部分中,团队 B 希望将两个数据集(“Tesco”和“Aldi”)组合到一个视图中,并使用这些数据构建 BI 报告。我们将在这两个表上使用一个简单的 UNION,如下所示,以实现此目的。

Dremio 还允许将其保存为环境中特定空间(图层)中的视图,以便特定团队可以使用。我们会将合并后的数据集另存为 Universal_dataset_superstore。

因此这个组合数据集(Hudi翻译和Iceberg原生表)现在将由B团队用于对“Tesco”和“Aldi”超市进行类别产品销售分析。为此分析师可以使用 Dremio 中的“分析方式”按钮,使用这个新的组合数据集在 Tableau 中构建 BI 报表。

下面是 Tableau 中的最终报告,它集成了来自两种不同表格格式的数据集,以执行按类别的产品销售分析。“Tesco”数据(绿色)最初以 Hudi 格式存储,现在使用 XTable 转换为 Iceberg。“Aldi”数据(黄色)原生存储为 Iceberg 表。

这个用例强调了 XTable 的转换功能带来的好处。B 团队的分析师能够像处理Iceberg表一样处理 Tesco 的数据,而无需在分析过程中进行任何更改。XTable 提供的灵活性使 Dremio 能够读取和执行 Tesco 数据集的分析,而与原生 Iceberg 格式没有任何区别。开放格式之间的互操作能力可以节省资金,提高性能,简化分析工作流程,并确保数据普遍可访问。

引用链接

[1] Apache XTable(孵化)项目: [https://github.com/apache/incubator-xtable](https://github.com/apache/incubator-xtable) [2] Kaggle: [https://www.kaggle.com/datasets/willianoliveiragibin/retail-analytics-trends/data](https://www.kaggle.com/datasets/willianoliveiragibin/retail-analytics-trends/data) [3] 索引: [https://hudi.apache.org/docs/next/indexing](https://hudi.apache.org/docs/next/indexing) [4] 自动表管理功能: [https://hudi.apache.org/docs/next/clustering](https://hudi.apache.org/docs/next/clustering) [5] 隐藏分区: [https://www.dremio.com/blog/fewer-accidental-full-table-scans-brought-to-you-by-apache-icebergs-hidden-partitioning/](https://www.dremio.com/blog/fewer-accidental-full-table-scans-brought-to-you-by-apache-icebergs-hidden-partitioning/) [6] GitHub: [https://github.com/apache/incubator-xtable](https://github.com/apache/incubator-xtable) [7] 官方文档: [https://xtable.apache.org/docs/setup/](https://xtable.apache.org/docs/setup/)

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-06-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ApacheHudi 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 场景
    • 团队 A:使用 Apache Hudi 作为 Spark 的表格式
      • 团队 B:使用 Dremio 和 Iceberg 进行分析
        • 挑战:统一Hudi和Iceberg表的数据
          • 解决方案:Apache XTable
          • 动手实践用例
            • 团队A
              • 团队B
                • 将Hudi dataset ('Tesco')转化为Iceberg
                相关产品与服务
                对象存储
                对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档