前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >使用 Apache Hudi + Daft + Streamlit 构建 Lakehouse 分析应用

使用 Apache Hudi + Daft + Streamlit 构建 Lakehouse 分析应用

作者头像
ApacheHudi
发布2024-05-20 16:12:20
790
发布2024-05-20 16:12:20
举报
文章被收录于专栏:ApacheHudiApacheHudi
构建面向用户的分析应用和仪表板对于希望使决策具有可操作性的组织至关重要。虽然传统的 BI 工具主要用于此类任务,但它们通常缺乏与 Python 生态系统的集成,需要专业技能。此外在分析生命周期的不同阶段(不仅仅是最后一公里),快速原型化和构建数据应用程序的需求变得至关重要,包括探索性数据分析、ML 模型评估、数据管道监控和LLM基于应用程序。

为了应对这些挑战,像 Streamlit[1] 这样的低代码工具作为 Python 生态系统的包装器,允许将 API、模型和业务逻辑变为现实。Streamlit 支持从数据库、API 和文件系统等各种来源轻松使用数据,从而轻松集成到应用程序中。在这篇博客中,我们将重点介绍如何使用直接来自开放湖仓一体平台的数据来构建数据应用。

开放湖仓一体平台

随着越来越多的组织过渡到使用开放表格式在数据湖上进行事务,湖仓一体架构越来越受欢迎。Apache Hudi 等开放式湖仓一体平台允许组织构建灵活的架构,使他们能够为其工作负载选择最佳计算引擎,而无需将数据锁定在专有存储格式中。

湖仓一体的核心是将传统数据库(如OLAP)的事务能力与数据湖的可扩展性和成本效益相结合。数据文件以可访问的开放表格式存储在基于云的对象存储(如 Amazon S3、Azure Blob 或 Google Cloud Storage)中,元数据由“表格式”组件管理。这种模块化方法创建了一个面向未来的架构,可以根据需要将新的计算引擎添加到堆栈中。例如可能会将 Hudi 与 Apache Flink 一起使用来构建低延迟管道,然后添加 Presto 或 Trino 或其他任何用于临时分析的内容。

Hudi + Daft 集成

需要注意的一件重要事情是,如今的湖仓一体平台主要是分布式的,以有效处理大规模、复杂和多样化的数据工作负载。这意味着您可能需要使用 Spark、JVM 和其他必要的配置来启动集群,以便与底层存储系统中存储的数据进行交互。然而,在单节点架构中直接使用来自湖仓一体的数据的需求正变得至关重要,尤其是在进行临时分析和构建分析应用程序时,这加快了洞察过程的时间。对于此类用例并不总是需要经历设置基础架构的繁琐过程。这就是基于 Python 的DataFrame(如 Daft[2])的用武之地。Daft 是一个分布式查询引擎,专为大规模 ETL、分析和 ML/AI 而设计。它提供了一个熟悉的 Python DataFrame API,旨在在性能和易用性方面超越 Spark。Daft 使用轻量级的多线程后端在本地运行。因此在本地开发环境中运行良好,但是当超出本地计算机的容量时,它可以转换为在分布式群集上运行。

最近发布的 Daft 引入了对读取 Apache Hudi Copy-on-Write (CoW) 表的支持。这意味着,用户现在可以使用纯 Python 直接从对象存储中使用 Hudi 表。Daft 的查询优化器还支持分区修剪和文件修剪(通过文件级统计信息)来跳过不相关的数据文件以返回更快的结果。

动手仪表板

这个动手示例的目的是展示如何使用 Daft 作为查询引擎来读取 Hudi 表,然后在 Python 中构建面向用户的分析应用程序。具体的数据集和用例不是本博客的主要关注点。相反,目标是展示 Hudi-Daft 集成的实际工作原理。在开始编写代码之前概述一个简约的湖仓一体架构,作为仪表板的基础。这也将介绍我们在本练习中使用的工具。这里使用的所有工具都是开源的。Amazon S3 采用即用即付模式,其成本基于存储和 API 使用情况。

架构:

  • • 数据湖存储:Amazon S3
  • • 文件格式 — CSV、Parquet
  • • 表格式 — Apache Hudi
  • • 计算引擎 — Apache Spark(写入)、Daft(读取)
  • • 用户界面 — Streamlit

要安装的库:Streamlit、Plotly、Daft、Pandas、boto3 我们将使用 Amazon S3 作为数据湖存储,在摄取作业完成后,所有数据文件都将安全地存储在其中。源数据将是一个 CSV 文件,在创建湖仓一体表时,我们将记录写入 Parquet。Apache Hudi 将用作表格式,Hudi 的湖仓一体平台(包括表服务(聚类、索引、文件大小等)将用于优化存储布局。

对于我们的计算需求,Apache Spark 将在引入阶段处理写入任务,而 Daft 将成为读取和分析的主要引擎,为这些操作提供优化的性能。最后我们将使用 Streamlit 使用直接来自湖仓一体的数据创建一个交互式仪表板。

本文档中的示例在 GitHub库[3]。

创建 Hudi 表和摄取记录

第一步是使用 Spark 创建 Hudi 表。以下是将 PySpark 与 Apache Hudi 一起使用所需的所有配置。如果数据湖中已有 Hudi 表,则这是一个可选步骤。请注意在实际环境中,如果是在分析层工作的数据分析师/科学家,则这些任务通常由数据工程团队处理。

代码语言:javascript
复制
from typing import *

from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
.appName("Hudi Table") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.0,org.apache.hadoop:hadoop-aws:3.2.4,com.amazonaws:aws-java-sdk:1.12.262") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()

print("Spark Running")

s3_path = "s3a://my-bucket/sandbox/daft_hudi/"

# Access SparkContext
sc = spark.sparkContext

现在导入记录

代码语言:javascript
复制
# Read Source Data for Ingestion
TABLE_NAME = 'aldi_data'
INPUT = 's3a://my-bucket/input/retail/All_Data_Aldi.csv'
df_cow = spark.read.csv(INPUT, header=True, inferSchema=True)

# Minor Transformation
df_cow = df_cow.withColumnRenamed('prices_(£)', 'prices')
df_cow = df_cow.withColumnRenamed('prices_unit_(£)', 'prices_unit')

# Write the Records 
PATH = 's3a://my-bucket/sandbox/daft_hudi/'

hudi_options = {
        'hoodie.table.name': TABLE_NAME,
        'hoodie.table.keygenerator.class' : "org.apache.hudi.keygen.SimpleKeyGenerator",
        'hoodie.datasource.write.hive_style_partitioning' : "false",
  'hoodie.datasource.write.partitionpath.field' : "category" 
    }

spark.sql("DROP TABLE IF EXISTS " + TABLE_NAME)
df_cow.write.format("hudi").mode("overwrite").options(**hudi_options).mode("overwrite").save(PATH)

这将创建一个在 S3 数据湖中命名 aldi_data 的 Hudi 表,并将按 category 字段进行分区。

使用 Daft 读取 Hudi 表

现在我们已经将记录写入了 Hudi 表,我们应该可以开始使用 Daft 读取数据来构建我们的下游分析应用程序。如前所述,Daft 提供来自云数据湖的高性能 I/O 读取。

下面是代码片段展示了如何使用 Daft 的查询引擎读取 Hudi 表。

代码语言:javascript
复制
 df = daft.read_hudi("s3://my-bucket/sandbox/daft_hudi")
 df_analysis = df.select("supermarket", "prices", "names", "date", "own_brand", "category")
 df_analysis.collect()

让我们来了解一下 Daft API 中的方法。

  • • read_hudi() — 用于读取 Hudi 表。您可以在此处指定表位置 URI
  • • select() — 这将从提供的表达式创建一个新的数据帧(类似于 SQL SELECT)
  • • collect() — 此方法执行整个数据帧并将结果具体化

我们首先从之前引入记录的 S3 存储桶中读取 Hudi 表。接下来,我们使用该 select() 方法来选择分析所需的字段。由于 Daft DataFrame是惰性的,这意味着它们在明确指示之前不会计算结果,因此在这些操作之后不会立即显示结果。在此示例中,我们仅使用 Daft 来延迟读取数据和选择列的任务。实际上这种懒惰的方法允许 Daft 在执行查询之前更有效地优化查询。最后,我们可以告诉 Daft 执行 DataFrame 并使用 df_analysis.collect() 来缓存结果。需要注意的重要一点是,任何后续 df_analysis 操作都将避免重新计算,而只是利用这个具体化的结果。所有这些查询计划都可以通过调用该 explain() 方法进行检查。要查看 Daft 应用其优化后的计划,我们可以使用 explain(show_all=True) .这是我们的案例。

我们可以看到DataFrame的未优化、优化和物理计划。优化逻辑计划(突出显示)根据我们的查询显示投影下推。当这些查询引擎优化与 Hudi 的存储优化功能(如聚类、索引、文件大小等)相结合时,它们可以为处理大型数据集提供出色的性能。

构建 Streamlit 仪表板

截至目前,我们将 Hudi 表存储为 Daft 数据帧 df_analysis 。为了构建仪表板,我们将使用基于 Python 的库的组合,包括 Pandas 和 Plotly Charts,以及 Daft。虽然现在的可视化库主要与 Pandas 一起工作——因此它被用于这个特定目的——但我们将在受益于其性能优化功能的场景中使用 Daft。

例如,仪表板中的某些图表需要聚合值(例如每个类别的产品品种)。在这些情况下,我们不是在 Pandas 中执行聚合,而是利用 Daft 的功能先聚合数据,然后将结果传递到可视化库。事实证明,此方法在处理非常大的数据集时特别有效,这在湖仓一体工作负载中很常见。下面是一个显示相同内容的片段。

代码语言:javascript
复制
distinct_names_per_category = df_analysis.select("category", "names").distinct()
category_diversity_daft = distinct_names_per_category.groupby("category").agg(
    daft.col('names').count()
).to_pandas()
category_diversity_daft.columns = ['Category', 'Number of Unique Products']

我们首先从数据框中选择不同的名称和类别,然后按类别分组,并计算每个类别中的唯一产品名称。然后将结果转换为 Pandas 数据帧,以便与可视化图表一起使用。从仪表板的设计角度来看,我们将有四个图表来回答一些业务问题,以及一个过滤器来分析 category 数据。

代码语言:javascript
复制
# Charts 1 & 2
col1, col2 = st.columns(2, gap="large")
with col1:
    st.subheader('Price Distribution by Category')
    fig1 = px.box(df_filtered, x='category', y='prices', title='Price Distribution by Category', color_discrete_sequence=modern_mint)
    st.plotly_chart(fig1, use_container_width=True)

with col2:
    st.subheader('Product Variety per Category')
    filtered_category_diversity = category_diversity_pandas[category_diversity_pandas['Category'].isin(df_filtered['category'].unique())]
    fig_category_diversity = px.bar(filtered_category_diversity, x='Category', y='Number of Unique Products', title='Product Variety per Category', color_discrete_sequence=modern_mint)
    st.plotly_chart(fig_category_diversity, use_container_width=True)

# Charts 3 & 4
col3, col4 = st.columns(2, gap="large")
with col3:
    st.subheader('Share of Own Brand Products')
    own_brand_count = df_filtered['own_brand'].value_counts(normalize=True).reset_index(name='proportion')
    fig4 = px.pie(own_brand_count, values='proportion', names='own_brand', title='Share of Own Brand Products', labels={'own_brand': 'Brand Type'}, color_discrete_sequence=modern_mint)
    st.plotly_chart(fig4, use_container_width=True)

with col4:
    st.subheader('Average Price by Brand Type and Category')
    # Group by both 'own_brand' and 'category' for a more detailed breakdown
    brand_category_price_comparison = df_filtered.groupby(['own_brand', 'category'])['prices'].mean().unstack().fillna(0)
    fig5 = px.bar(brand_category_price_comparison, title='Average Price by Brand Type and Category', color_discrete_sequence=modern_mint, labels={'value':'Average Price', 'variable':'Category'})
    fig5.update_layout(barmode='stack')
    st.plotly_chart(fig5, use_container_width=True)

将所有这些放在一起,这是准备进行分析的最终应用程序。

仪表板还允许根据 category 筛选器筛选数据集并呈现相关可视化效果。这允许用户进行更精细的分析。

结论和未来工作

直接在开放式湖仓一体上构建仪表板具有多项优势。

  • • 更快的洞察:直接访问湖仓一体可加快洞察过程,确保分析及时且相关。
  • • 减少数据冗余:传统报告通常涉及跨多个系统(BI 的湖泊到仓库)移动数据,这可能会导致数据的大量副本和版本。通过支持直接访问数据的开放数据架构可以避免这种情况。
  • • 成本效益:使用开放式湖仓一体架构可消除对复杂 ETL 管道和频繁数据提取的需求,从而降低成本,而 Amazon S3 等云存储允许根据需要进行扩展。

在这篇博客中,我们介绍了如何使用 Daft 等高性能查询引擎在 Apache Hudi 等开放湖仓一体平台上快速无缝地构建面向用户的分析应用程序。这标志着我们第一次使用纯 Python 处理 Hudi 表,而无需在基于 Java 的环境中设置 Spark。Daft 的集成提供了熟悉的 Python API,同时提供了卓越的性能,为在 Hudi 上运行分析工作负载开辟了有趣的途径,而无需像 Spark 这样的分布式计算。我们在不久的将来正在研究的一些项目是:

  • • 支持写入时复制表的增量查询[4]
  • • 对 v1.0[5] 表格式的读取支持
  • • 读时合并表[6]的读取支持(快照)
  • • Hudi 写支持[7]
引用链接

[1] Streamlit: [https://streamlit.io/](https://streamlit.io/) [2] Daft: [https://www.getdaft.io/](https://www.getdaft.io/) [3] GitHub库: [https://github.com/dipankarmazumdar/DaftHudi](https://github.com/dipankarmazumdar/DaftHudi) [4] 增量查询: [https://github.com/Eventual-Inc/Daft/issues/2153](https://github.com/Eventual-Inc/Daft/issues/2153) [5] v1.0: [https://github.com/Eventual-Inc/Daft/issues/2152](https://github.com/Eventual-Inc/Daft/issues/2152) [6] 读时合并表: [https://github.com/Eventual-Inc/Daft/issues/2154](https://github.com/Eventual-Inc/Daft/issues/2154) [7] Hudi 写支持: [https://github.com/Eventual-Inc/Daft/issues/2155](https://github.com/Eventual-Inc/Daft/issues/2155)

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-05-12,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 ApacheHudi 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 开放湖仓一体平台
  • Hudi + Daft 集成
  • 动手仪表板
    • 创建 Hudi 表和摄取记录
      • 使用 Daft 读取 Hudi 表
        • 构建 Streamlit 仪表板
        • 结论和未来工作
        相关产品与服务
        对象存储
        对象存储(Cloud Object Storage,COS)是由腾讯云推出的无目录层次结构、无数据格式限制,可容纳海量数据且支持 HTTP/HTTPS 协议访问的分布式存储服务。腾讯云 COS 的存储桶空间无容量上限,无需分区管理,适用于 CDN 数据分发、数据万象处理或大数据计算与分析的数据湖等多种场景。
        领券
        问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档