前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >3.数据湖deltalake之时间旅行及版本管理

3.数据湖deltalake之时间旅行及版本管理

作者头像
Spark学习技巧
发布2021-03-05 15:41:36
9150
发布2021-03-05 15:41:36
举报
文章被收录于专栏:Spark学习技巧Spark学习技巧

浪尖在deltalake第一讲的时候说过,它支持数据版本管理和时间旅行:提供了数据快照,使开发人员能够访问和还原早期版本的数据以进行审核、回滚或重新计算。

1.场景

delta lake的时间旅行,实际上就是利用多版本管理机制,查询历史的delta 表快照。时间旅行有以下使用案例:

1).可以重复创建数据分析,报告或者一些输出(比如,机器学习模型)。这主要是有利于调试和安全审查,尤其是在受管制的行业里。

2).编写复杂的基于时间的查询。

3).修正数据中的错误信息。

4).为一组查询提供快照隔离,以快速变更表。

2.配置

DataframeTable支持创建dataframe的时候指定一个delta lake表的版本信息:

代码语言:javascript
复制
val df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("/delta/events")
val df2 = spark.read.format("delta").option("versionAsOf", version).load("/delta/events")

对于版本号,直接传入一个版本数值即可,如下:

代码语言:javascript
复制
val df2 = spark.read.format("delta").option("versionAsOf", 0).table(tableName)

对于timestamp字符串,必须要是date格式或者timestamp格式。例如:

代码语言:javascript
复制
val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28").load("/delta/events")
val df1 = spark.read.format("delta").option("timestampAsOf", "2020-06-28T00:00:00.000Z").load("/delta/events")

由于delta lake的表是存在更新的情况,所以多次读取数据生成的dataframe之间会有差异,因为两次读取数据可能是一次是数据更新前,另一次是数据更新后。使用时间旅行你就可以在多次调用之间修复数据。

代码语言:javascript
复制
val latest_version = spark.sql("SELECT max(version) FROM (DESCRIBE HISTORY delta.`/delta/events`)").collect()
val df = spark.read.format("delta").option("versionAsOf", latest_version[0][0]).load("/delta/events")

3.数据保存时间

默认情况下,deltalake保存最近30天的提交历史。这就意味着可以指定30天之前的版本来读取数据,但是有些注意事项:

3.1 没对delta 表调用VACUUM函数。VACUUM函数是用来删除不在引用的delta表和一些超过保留时间的表,支持sql和API形式。

slq表达式:

代码语言:javascript
复制
VACUUM eventsTable   -- vacuum files not required by versions older than the default retention period

VACUUM '/data/events' -- vacuum files in path-based table

VACUUM delta.`/data/events/`

VACUUM delta.`/data/events/` RETAIN 100 HOURS  -- vacuum files not required by versions more than 100 hours old

VACUUM eventsTable DRY RUN    -- do dry run to get the list of files to be deleted

scala API 表达式

代码语言:javascript
复制
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

deltaTable.vacuum()        // vacuum files not required by versions older than the default retention period

deltaTable.vacuum(100)     // vacuum files not required by versions more than 100 hours old

可以通过下面两个delta 表属性配置来

  • delta.logRetentionDuration =“ interval <interval>”:控制将表的历史记录保留多长时间。每次写入checkpoint时,都会自动清除早于保留间隔的日志。如果将此配置设置为足够大的值,则会保留许多日志。这不会影响性能,因为针对日志的操作是常量时间。历史记录的操作是并行的(但是随着日志大小的增加,它将变得更加耗时)。默认值为 interval 30 days。
  • delta.deletedFileRetentionDuration =“ interval <interval>”:在这个时间范围内的数据是不会被VACUUM命令删除。默认值为间隔7天。要访问30天的历史数据,请设置delta.deletedFileRetentionDuration = "interval 30 days"。此设置可能会导致您的存储成本上升。

注意:VACUUM命令是不会删除日志文件的,日志文件是在checkpoint之后自动删除的。

为了读取之前版本的数据,必须要保留该版本的日志文件和数据文件。

4.案例

修复意外删除的用户111的数据。

代码语言:javascript
复制
INSERT INTO my_table
  SELECT * FROM my_table TIMESTAMP AS OF date_sub(current_date(), 1)
  WHERE userId = 111

修复错误更新的数据

代码语言:javascript
复制
MERGE INTO my_table target
  USING my_table TIMESTAMP AS OF date_sub(current_date(), 1) source
  ON source.userId = target.userId
  WHEN MATCHED THEN UPDATE SET *

查询过去七天新增的消费者数:

代码语言:javascript
复制
  SELECT count(distinct userId)
  FROM my_table TIMESTAMP AS OF date_sub(current_date(), 7))
本文参与 腾讯云自媒体分享计划,分享自微信公众号。
原始发表:2020-07-20,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 浪尖聊大数据 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体分享计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档