
How delta works

Most existing big data storage systems built on HDFS lack upsert support (update the record if it exists, otherwise insert it). This means you may run into several problems:

  1. You simply cannot update records. For incremental data synchronization, upsert is required.
  2. While you are updating data, other people may not be able to update or even read the same data.
  3. A huge number of small files will seriously hurt your storage (NameNode memory) and query performance. To reduce the number of small files, you may create a job that compacts small files into big ones. But this brings back situation 2: a compaction job may take a long time, and other jobs are forced to stop, otherwise they may throw exceptions.

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads.

It provides the following key features:

  1. ACID transactions
  2. Scalable metadata handling
  3. Time travel (data versioning)
  4. Open format (Parquet)
  5. Unified batch/streaming source and sink
  6. Schema enforcement
  7. Schema evolution

With these features, you can write, update, and read the same data in parallel. You can run a compaction job without affecting other jobs that write to or read from the same data collection. This is really powerful. However, the bad news is that the latest version of Delta is 0.2.0, and the upsert/delete/compaction features are still work in progress. The good news is that I have created a new open-source project, delta-plus, which already adds upsert/delete/compaction on top of Delta 0.2.0.

So in this post, I will also cover the features that are available in delta-plus.
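Before digging into the internals, here is a minimal sketch of plain Delta Lake usage with Spark, assuming delta-core 0.2.0 is on the classpath; the table path /tmp/delta/events and the tiny DataFrame are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession

// A minimal sketch of plain Delta Lake usage (no delta-plus yet).
// Assumes Spark with the delta-core jar on the classpath and a
// hypothetical table path /tmp/delta/events.
object DeltaBasics {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("delta-basics")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val tablePath = "/tmp/delta/events"

    // Write a small DataFrame as a delta table (Parquet files + delta log).
    Seq((1, "a"), (2, "b")).toDF("id", "value")
      .write.format("delta").mode("append").save(tablePath)

    // Readers always see the consistent snapshot defined by the committed log.
    spark.read.format("delta").load(tablePath).show()

    spark.stop()
  }
}
```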

The design of Delta is really elegant: it is simple, but it works. A delta table is just a directory that contains two collections of files: a bunch of Parquet data files and a bunch of metafiles in JSON/Parquet format. This can be illustrated by the following picture:

[Figure: a delta table is a directory of Parquet data files plus JSON/Parquet metafiles]
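You can peek at these two parts yourself. The sketch below reuses the SparkSession and the hypothetical table path from the previous example; the commit files live under the table's _delta_log directory, and each line of a commit JSON is an action such as add, remove, metaData or commitInfo:

```scala
// Continues the sketch above (same SparkSession `spark`).
// Read the commit JSON files of the table and look at the recorded actions.
val logDf = spark.read.json("/tmp/delta/events/_delta_log/*.json")
logDf.printSchema()   // columns like add, remove, metaData, commitInfo

// Data files referenced by "add" actions across all commits.
logDf.select("add.path").where("add is not null").show(false)
```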

The first question is: with this design, how do we add new data to the table?

How to add new records to a delta table

[Figure: the steps of appending new records to a delta table]

As shown in the picture above, there are three main steps (a code sketch follows the list):

  1. Create new Parquet files to hold the new records. At this point readers are not affected, because they read the existing metafiles, and those metafiles do not reference the new Parquet files.
  2. Delta then creates a new JSON file, e.g. 000...0012.json, which lists the new Parquet files and marks them as added.
  3. Because Delta has committed the new Parquet files with commit 12, readers can now see the new records.
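Here is a small sketch of these steps in action, again reusing the SparkSession, its implicits and the hypothetical /tmp/delta/events table from above: append some records, then list _delta_log to see the new commit JSON that makes them visible.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

// Step 1: new Parquet files are written for the incoming records.
Seq((3, "c"), (4, "d")).toDF("id", "value")
  .write.format("delta").mode("append").save("/tmp/delta/events")

// Steps 2-3: the records become visible only once the new commit JSON exists.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.listStatus(new Path("/tmp/delta/events/_delta_log"))
  .map(_.getPath.getName)
  .sorted
  .foreach(println)   // e.g. 00000000000000000000.json, 00000000000000000001.json, ...
```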

Appending data is easy, so what about upsert? And why does the upsert operation not affect other readers?

How to upsert data to a delta table

[Figure: the steps of upserting records into a delta table]

Suppose we already have Parquet files a1 and a2. Now a collection of records A arrives, and we perform the following steps to complete the upsert operation:

  1. Create a new Parquet file a3 from A. Note that in practice we may create more than one file.
  2. Filter a1 and a2 against A to get the records that are not in A, and create new files for those records. Suppose we create two new files, a4 and a5.
  3. Finally, commit a new JSON file, e.g. 000...0012.json, which records that a1 and a2 are deleted and that a3, a4, a5 are added.
  4. Before the final commit, we do not delete any files, so readers are not affected.
  5. After the final commit, no physical deletion happens (the files are merely marked as deleted); readers learn from 000...0012.json that a1 and a2 are gone and that they should read a3, a4 and a5 instead.

Filtering all existing Parquet files to exclude the records in A and then creating new files from the result is of course not a good idea; we describe the steps that way only for simplicity. In practice, we join A with the existing data to find only the files that contain records in A, so we do not delete a massive number of files or create too many new ones. We can also analyze A and extract its partition range, so that we do not even need to join all existing data to find the files containing records in A.
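The sketch below illustrates only the logical effect of an upsert with plain DataFrame operations; it is not the delta-plus implementation, and the key column id is assumed for illustration:

```scala
import org.apache.spark.sql.DataFrame

// Conceptual upsert: `existing` plays the role of a1/a2, `incoming` the role of A.
def upsertLogically(existing: DataFrame, incoming: DataFrame): DataFrame = {
  // Step 2: keep only the existing records whose key does NOT appear in A
  // (the real implementation rewrites only the files touched by A).
  val untouched = existing.join(incoming, Seq("id"), "left_anti")

  // Steps 1 and 3: the new snapshot is "records of A" plus "records not in A";
  // delta-plus would write these as new Parquet files and commit a JSON file
  // marking the old files as removed and the new files as added.
  untouched.unionByName(incoming)
}
```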

How to do compaction on a delta table

Compaction itself is easy, but the implementation in delta-plus currently has three limitations:

  1. No upsert/delete operation may be running against the table while it is being compacted.
  2. Only one compaction job is allowed at a time.
  3. A target version must be specified. The compaction will process all files created before that version.

The first limitation is caused by the current design of compaction. Compaction is a heavy operation, because a lot of old files will be deleted and many new files will be created. Delta supports streaming writes, which means a compaction job must not block the write path of a streaming job. To achieve this, we do not lock the table before the job starts: we first merge the old files and create the new files, then lock the table, and finally commit the new files and the deleted files.

If the table allowed upsert/delete operations while the compaction was merging old files, those old files might be (logically) deleted by the new operation afterwards, yet the compaction would still commit new files containing the records that should have been deleted. This must not happen.

So compaction consists of three main steps (a conceptual sketch follows the list):

  1. Find the files created before the specified version (these files will never be deleted by other operations).
  2. Merge these files into new files.
  3. Lock the table and try to commit.
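The sketch below illustrates only the merge phase (steps 1 and 2) with plain Spark; it is not the delta-plus code, and the file listing, locking and commit of step 3, which go through the delta log, are omitted:

```scala
import org.apache.spark.sql.SparkSession

// Conceptual merge phase of compaction: rewrite many small Parquet files
// into a few larger ones. The resulting files still have to be committed
// to the delta log under a table lock (step 3), which is not shown here.
def compactFiles(spark: SparkSession,
                 smallFiles: Seq[String],   // data files created before the target version
                 outputDir: String,         // staging directory for the merged files
                 targetFileNum: Int): Unit = {
  spark.read.parquet(smallFiles: _*)        // read the old small files
    .repartition(targetFileNum)             // merge them into fewer, larger partitions
    .write.mode("overwrite").parquet(outputDir)
}
```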

Conclusions

You may have already noticed that no matter what operation you apply to a delta table, Delta always creates a new commit file (a JSON file) that records which files were created and which files were deleted by your operation. That is why we call the metafiles the delta log. With the delta log, we can do time travel (or version travel), and we can perform upsert/delete/append operations without affecting readers.
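As a closing sketch, time travel then becomes a read against an older commit. Note that the versionAsOf read option is an assumption here: it is exposed in later Delta releases, and you should check whether the Delta version you run (0.2.0 in this post) already supports it:

```scala
// Read the table as it was at commit 0 of the delta log.
// Assumes a Delta version whose reader supports the "versionAsOf" option.
val v0 = spark.read.format("delta")
  .option("versionAsOf", 0)
  .load("/tmp/delta/events")
v0.show()
```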
