前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >如何在 CDP 的湖仓一体中使用Iceberg

如何在 CDP 的湖仓一体中使用Iceberg

作者头像
大数据杂货铺
发布2022-12-02 21:09:39
1.2K0
发布2022-12-02 21:09:39
举报
文章被收录于专栏:大数据杂货铺大数据杂货铺

2022 年 6 月,Cloudera宣布在 Cloudera 数据平台 (CDP) 中全面推出 Apache Iceberg。Iceberg 是一种 100% 开放表格式,由Apache Software Foundation开发,可帮助用户避免供应商锁定并实现开放式 Lakehouse。

普遍可用性涵盖了在 CDP 中的一些关键数据服务中运行的 Iceberg,包括 Cloudera 数据仓库 ( CDW )、Cloudera 数据工程 ( CDE ) 和 Cloudera 机器学习 ( CML )。这些连接使分析师和数据科学家能够通过他们选择的工具和引擎轻松地就相同的数据进行协作。不再有锁定、不必要的数据转换或跨工具和云的数据移动,只是为了从数据中提取洞察力。

使用 CDP 中的 Iceberg,您可以从以下主要功能中受益:

  • CDE 和 CDW 支持 Apache Iceberg:分别按照 Spark ETL 和 Impala 商业智能模式在 CDE 和 CDW 中运行查询。
  • 探索性数据科学和可视化: 通过 CML 项目中自动发现的 CDW 连接访问 Iceberg 表。
  • 丰富的 SQL(查询、DDL、DML)命令集:使用为 CDW 和 CDE 开发的 SQL 命令创建或操作数据库对象、运行查询、加载和修改数据、执行时间旅行操作以及将 Hive 外部表转换为 Iceberg 表。
  • 时间旅行:重现给定时间或快照ID的查询,例如可用于历史审计和错误操作的回滚。
  • 就地表(架构、分区)演进:演进 Iceberg 表架构和分区布局,而不会造成代价高昂的干扰,例如重写表数据或迁移到新表。
  • SDX 集成 (Ranger):通过 Apache Ranger 管理对 Iceberg 表的访问。

在这篇由两部分组成的博客文章中,我们将向您展示如何在 CDP 中使用 Iceberg 来构建一个开放的湖仓,并利用从数据工程到数据仓库再到机器学习的 CDP 计算服务。

在第一部分中,我们将重点介绍如何在 CDP 中使用 Apache Iceberg 构建开放式湖屋;使用 CDE 摄取和转换数据;并利用时间旅行、分区演变和对 Cloudera 数据仓库上的 SQL 和 BI 工作负载的访问控制。

解决方案概述

先决条件:

应提供以下 CDP 公共云 (AWS) 数据服务:

  • Cloudera 数据仓库 Impala 虚拟仓库
  • 启用 Airflow 的 Cloudera 数据工程 (Spark 3)
  • Cloudera 机器学习

使用 CDE 将数据加载到 Iceberg 表中

我们首先在 CDE 中创建 Spark 3虚拟集群(VC)。为了控制成本,我们可以调整虚拟集群的配额并使用 Spot 实例。此外,选择启用 Iceberg 分析表的选项可确保 VC 具有与 Iceberg 表交互所需的库。

几分钟后,VC 将启动并运行,准备好部署新的 Spark 作业。

由于我们将使用 Spark 执行一系列表操作,因此我们将使用 Airflow 来编排这些操作的管道。

第一步是加载我们的 Iceberg 表。除了直接使用新数据创建和加载 Iceberg 表之外,CDP 还提供了一些其他选项。您可以导入或迁移现有的外部 Hive 表。

  • 导入使源和目标保持完整和独立。
  • 迁移会将表转换为 Iceberg 表。

在这里,我们只是将现有的航班表导入到我们航空公司的 Iceberg 数据库表中。

代码语言:javascript
复制
from pyspark.sql import SparkSession
import sys

spark = SparkSession \
    .builder \
    .appName("Iceberg prepare tables") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")\
    .config("spark.sql.catalog.spark_catalog.type", "hive")\
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")\
    .getOrCreate()

spark.sql("""CALL spark_catalog.system.snapshot('airlines_csv.flights_external', \
          'airlines_iceberg.flights_v3')""")

我们导入的航班表现在包含与现有外部 hive 表相同的数据,我们可以快速检查按年份的行数以确认:

代码语言:javascript
复制
year _c1
1 2008 7009728
2 2007 7453215
3 2006 7141922
4 2005 7140596
5 2004 7129270
6 2003 6488540
7 2002 5271359
8 2001 5967780
9 2000 5683047
…
…

就地分区演变

接下来,最常见的数据管理任务之一是修改表的模式。如果它是非分区列,通常这很容易执行。但是如果分区方案需要更改,您通常必须从头开始重新创建表。在 Iceberg 中,这些表管理操作可以以最少的返工来应用,从而减轻数据从业人员在改进表以更好地满足业务需求时的负担。

在管道的第二阶段,我们使用一行代码更改分区方案以包含年份列!

代码语言:javascript
复制
print(f"Alter partition scheme using year \n")
spark.sql("""ALTER TABLE airlines_iceberg.flights_v3 \
ADD PARTITION FIELD year""")

When describing the table we can see “year” is now a partition column:

…
# Partition Transform Information
# col_name transform_type
year IDENTITY

…

在 ETL 管道的最后阶段,我们将新数据加载到此分区中。让我们看一下如何使用 Impala 来利用这个 Iceberg 表来运行交互式 BI 查询。

将 CDW 与 Iceberg 一起使用

时间旅行

现在我们已经将数据加载到 Iceberg 表中,让我们使用 Impala 来查询表。首先,我们将在 CDW 中打开 Hue 并访问我们刚刚在 CDE 中使用 Spark 创建的表。转到 CDW 并在 Impala 虚拟仓库中打开 Hue。

首先我们检查表的历史并查看:

代码语言:javascript
复制
DESCRIBE HISTORY flight_v3 ;

示例结果:

creation_time

snapshot_id

parent_id

is_current_ancestor

2022-07-20 09:38:27.421000000

7445571238522489274

NULL

TRUE

2022-07-20 09:41:24.610000000

1177059607967180436

7445571238522489274

TRUE

2022-07-20 09:50:16.592000000

2140091152014174701

1177059607967180436

TRUE

现在我们可以使用时间戳和snapshot_id在不同的时间点查询表来查看结果,如下所示。

代码语言:javascript
复制
select year, count(*) from flights_v3
FOR SYSTEM_VERSION AS OF 7445571238522489274
group by year
order by year desc;

数数(*)

2005年

7140596

2004年

7129270

2003年

6488540

2002年

5271359

2001年

5967780

2000

5683047

1999

5527884

1998

5384721

1997

5411843

1996

5351983

1995

5327435

我们看到,截至第一个快照 ( 7445571238522489274),我们在表中拥有 1995 年至 2005 年的数据。让我们看看第二个快照的数据:

代码语言:javascript
复制
select year, count(*) from flights_v3
FOR SYSTEM_VERSION AS OF 1177059607967180436
group by year
order by year desc;

数数(*)

2006年

7141922

2005年

7140596

2004年

7129270

2003年

6488540

2002年

5271359

2001年

5967780

2000

5683047

1999

5527884

1998

5384721

1997

5411843

1996

5351983

1995

5327435

现在我们在表中也有截至 2006 年的数据。使用“FOR SYSTEM_VERSION AS OF <snapshot id>”,您可以查询旧数据。您还可以使用“FOR SYSTEM_TIME AS OF <timestamp>”来使用时间戳。

就地分区演变

除了 CDE (Spark) 的就地分区演化功能外,您还可以使用 CDW (Impala) 执行就地分区演化。首先,我们将使用show create table命令检查表的当前分区,如下所示:

代码语言:javascript
复制
SHOW CREATE TABLE flights_v3;

我们看到该表是按年份列分区的。我们可以将表的分区方案从按年分区更改为按年和月列分区。将新数据加载到表中后,所有后续查询都将受益于月列和年列的分区修剪。

代码语言:javascript
复制
ALTER TABLE flights_v3 SET PARTITION spec (year, month);
SHOW CREATE TABLE flights_v3;


CREATE EXTERNAL TABLE flights_v3 (   month INT NULL,   dayofmonth INT NULL,   dayofweek INT NULL,   deptime INT NULL,   crsdeptime INT NULL,   arrtime INT NULL,   crsarrtime INT NULL,   uniquecarrier STRING NULL,   flightnum INT NULL,   tailnum STRING NULL,   actualelapsedtime INT NULL,   crselapsedtime INT NULL,   airtime INT NULL,   arrdelay INT NULL,   depdelay INT NULL,   origin STRING NULL,   dest STRING NULL,   distance INT NULL,   taxiin INT NULL,   taxiout INT NULL,   cancelled INT NULL,   cancellationcode STRING NULL,   diverted STRING NULL,   carrierdelay INT NULL,   weatherdelay INT NULL,   nasdelay INT NULL,   securitydelay INT NULL,   lateaircraftdelay INT NULL,   year INT NULL ) PARTITIONED BY SPEC (   year,   month ) STORED AS ICEBERG LOCATION 's3a://xxxxxx/warehouse/tablespace/external/hive/airlines.db/flights_v3' TBLPROPERTIES ('OBJCAPABILITIES'='EXTREAD,EXTWRITE', 'engine.hive.enabled'='true', 'external.table.purge'='TRUE', 'iceberg.catalog'='hadoop.tables', 'numFiles'='2', 'numFilesErasureCoded'='0', 'totalSize'='6958', 'write.format.default'='parquet')

通过 SDX 集成进行细粒度访问控制 (Ranger)

为了保护 Iceberg 表,我们支持基于 Ranger 的行和列安全规则,如下所示。

taxout列的列掩码:

早于 2000 年的行掩码:

代码语言:javascript
复制
SELECT taxiout FROM flights_v3 limit 10;

SELECT distinct (year) FROM flights_v3;

BI 查询

查询查找所有国际航班,定义为目的地机场国家与始发机场国家不同的航班:

代码语言:javascript
复制
SELECT DISTINCT 
   flightnum,    uniquecarrier,    origin,    dest,    month,    dayofmonth,    `dayofweek`
FROM flights_v3, airports_iceberg oa, airports_iceberg da  
WHERE 
   f.origin = oa.iata     and f.dest = da.iata and oa.country <> da.country 
ORDER BY    month ASC,    dayofmonth ASC 

LIMIT 4  ;

flightnum

uniquecarrier

origin

dest

month

dayofmonth

dayofweek

2280

XE

BTR

IAH

1

1

4

1673

DL

ATL

BTR

1

1

7

916

DL

BTR

ATL

1

1

2

3470

MQ

BTR

DFW

1

1

1

查询以探索乘客清单数据。例如,我们有国际转机航班吗?

代码语言:javascript
复制
SELECT * FROM unique_tickets a,    flights_v3 o,    flights_v3 d,   airports oa,    airports da   WHERE    a.leg1flightnum = o.flightnum    AND a.leg1uniquecarrier = o.uniquecarrier     AND a.leg1origin = o.origin     AND a.leg1dest = o.dest     AND a.leg1month = o.month     AND a.leg1dayofmonth = o.dayofmonth    AND a.leg1dayofweek = o.`dayofweek`     AND a.leg2flightnum = d.flightnum    AND a.leg2uniquecarrier = d.uniquecarrier     AND a.leg2origin = d.origin     AND a.leg2dest = d.dest     AND a.leg2month = d.month     AND a.leg2dayofmonth = d.dayofmonth    AND a.leg2dayofweek = d.`dayofweek`     AND d.origin = oa.iata     AND d.dest = da.iata     AND oa.country <> da.country  ;

总结

在第一篇博客中,我们与您分享了如何使用 Cloudera 数据平台中的 Apache Iceberg 来构建一个开放的 Lakehouse。在示例工作流中,我们向您展示了如何使用 Cloudera 数据工程 (CDE) 将数据集摄取到Iceberg表中,执行时间旅行和就地分区演化,以及使用 Cloudera 数据仓库应用细粒度访问控制 (FGAC) ( CDW)。请继续关注第二部分!

要自行构建开放式 Lakehouse,请注册 60 天试用版或试驾 CDP,尝试 Cloudera Data Warehouse (CDW)、Cloudera Data Engineering (CDE) 和 Cloudera Machine Learning ( CML )。如果您有兴趣在 CDP 中讨论 Apache Iceberg,请让您的客户团队知道。

原文作者:Bill Zhang、Peter Ableda、Shaun Ahmadian和Manish Maheshwari

原文链接:https://blog.cloudera.com/how-to-use-apache-iceberg-in-cdps-open-lakehouse/

本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2022-08-26,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 大数据杂货铺 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 解决方案概述
  • 先决条件:
  • 使用 CDE 将数据加载到 Iceberg 表中
  • 就地分区演变
  • 将 CDW 与 Iceberg 一起使用
  • 就地分区演变
  • 通过 SDX 集成进行细粒度访问控制 (Ranger)
  • BI 查询
  • 总结
相关产品与服务
数据库
云数据库为企业提供了完善的关系型数据库、非关系型数据库、分析型数据库和数据库生态工具。您可以通过产品选择和组合搭建,轻松实现高可靠、高可用性、高性能等数据库需求。云数据库服务也可大幅减少您的运维工作量,更专注于业务发展,让企业一站式享受数据上云及分布式架构的技术红利!
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档