Most existing big data storage built on HDFS lacks upsert support (update the record if it exists, otherwise insert it), which can leave you struggling in many situations.
Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads.
It includes the following key features:
With these features you can write, update, and read the same data in parallel, and you can run a compaction job without affecting other jobs that write to or read from the same data collection. This is really incredible. However, the bad news is that the latest version of Delta is 0.2.0, and the Upsert/Delete/Compaction features are still a work in progress. The good news is that I have created a new open-source project, delta-plus, which already adds Upsert/Delete/Compaction on top of Delta 0.2.0.
So in this post, I will also talk about the features that are available in delta-plus.
The design of Delta is really amazing: it is simple, but it works. A Delta table is just a directory containing two kinds of files: a bunch of parquet data files and a bunch of metafiles in JSON/Parquet format. This can be illustrated by the following picture:
(Figure: the layout of a Delta table: parquet data files plus the metafiles that form the delta log)
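To make the layout concrete, here is a sketch of what a small Delta table directory typically looks like on disk; the table path and file names are made up for illustration:

```
/tmp/delta/events/                          <- the Delta table directory (hypothetical path)
  part-00000-xxxx.snappy.parquet            <- parquet data files
  part-00001-xxxx.snappy.parquet
  _delta_log/                               <- the metafiles, i.e. the delta log
    00000000000000000000.json               <- one JSON commit file per operation
    00000000000000000001.json
    00000000000000000010.checkpoint.parquet <- periodic parquet checkpoint of the log
```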
The first question is, with this design, how can we add new data into the table?
(Figure: the steps of appending new data to a Delta table)
As the picture above shows, there are three main steps: write the new records as parquet files, record the added files in a new JSON metafile, and add that metafile to the delta log so readers see the new version.
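From the user's point of view, appending is just an ordinary DataFrame write. Here is a minimal sketch with the stock Delta 0.2.0 writer, using a made-up path and schema:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-append-sketch")
  .getOrCreate()
import spark.implicits._

// A tiny illustrative dataset; the table path below is hypothetical.
val newRecords = Seq((1, "jack"), (2, "alice")).toDF("id", "name")

// Append: Delta writes new parquet files and then records them
// in a new JSON commit file under _delta_log/.
newRecords.write
  .format("delta")
  .mode("append")
  .save("/tmp/delta/events")
```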
Appending data is easy, so what about upsert? And why doesn't an upsert operation affect other readers?
(Figure: the steps of an upsert operation on a Delta table)
Suppose the table already contains the parquet files a1 and a2. Now a collection of records A arrives, and we do the following to complete the upsert operation:

1. Read the existing parquet files and filter out the records whose keys also appear in A.
2. Write the remaining records, together with the records in A, into new parquet files.
3. Commit a new metafile that marks the old files as deleted and the new files as added.
Filtering the records of all existing parquet files against A and then creating new files from them is of course not a good idea; we only describe the steps that way for simplicity. In practice we use A to join the existing data and find just the files that contain records in A, so the operation does not delete a massive number of files or create too many new ones. We can also analyze A and extract its partition range, so that we do not even need to join all existing data to find the files containing records in A.
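The "use A to join all existing data" idea can be sketched with a semi join against Spark's input_file_name(). This is only an illustrative sketch, not the actual delta-plus implementation; the table path and the key column id are assumptions made for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.input_file_name

val spark = SparkSession.builder().appName("upsert-file-discovery-sketch").getOrCreate()
import spark.implicits._

// Incoming records A; "id" is the assumed primary key column.
val A = Seq((1, "jack-updated"), (42, "new-user")).toDF("id", "name")

// Tag every existing row with the parquet file it came from.
val existing = spark.read.format("delta").load("/tmp/delta/events")
  .withColumn("__file", input_file_name())

// A semi join keeps only the rows whose key appears in A, so the distinct
// __file values are exactly the files that must be rewritten.
val touchedFiles = existing
  .join(A.select("id"), Seq("id"), "left_semi")
  .select("__file")
  .distinct()
  .as[String]
  .collect()

// Only these files need to be rewritten (their rows minus A, plus A);
// all other parquet files stay untouched, and readers are unaffected
// until the new commit file is written.
```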
Compaction is easy, but the compaction implemented in delta-plus currently has three limitations.
The first limitation is caused by the current design of compaction. Compaction is a heavy operation, because a lot of old files will be deleted and many new files will be created. Delta supports streaming writes, which means a compaction job must not block the write side of a streaming job. To achieve this, we do not lock the table before the job starts: we first merge the old files and create the new files, then lock the table, and finally commit the new files and the deleted files.
If the table allowed upsert/delete while compaction is running, then after compaction has merged some old files, those same old files might also be marked as deleted by the new operation, yet the new compacted files still containing the records that should have been deleted would eventually be committed. This should not happen, so upsert/delete is not allowed while a compaction is running.
So the compaction consists of three main steps:

1. Without locking the table, read the old files and merge them into new files.
2. Lock the table.
3. Commit the new files and the deleted old files to the delta log, then release the lock.
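To make the three steps concrete, here is a hedged Scala sketch of the protocol. It is not the delta-plus implementation: the lock and commit helpers (acquireTableLock, releaseTableLock, commitCompaction) and the staging directory are hypothetical names, and a real implementation would commit through Delta's transaction log.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession

object CompactionSketch {
  // Hypothetical helpers: a real implementation would lock via some external
  // coordination mechanism and commit through Delta's transaction log.
  def acquireTableLock(tablePath: String): Unit = ???
  def releaseTableLock(tablePath: String): Unit = ???
  def commitCompaction(tablePath: String, added: Seq[String], removed: Seq[String]): Unit = ???

  def listParquetFiles(spark: SparkSession, dir: String): Seq[String] = {
    val path = new Path(dir)
    val fs = path.getFileSystem(spark.sparkContext.hadoopConfiguration)
    fs.listStatus(path).map(_.getPath.toString).filter(_.endsWith(".parquet")).toSeq
  }

  def compact(spark: SparkSession, tablePath: String, oldFiles: Seq[String], targetFileNum: Int): Unit = {
    // Step 1: merge the old files into fewer, larger files WITHOUT locking the table,
    // so concurrent streaming appends are not blocked.
    val stagingDir = s"$tablePath/_compaction_staging" // hypothetical staging location
    spark.read.parquet(oldFiles: _*).repartition(targetFileNum).write.parquet(stagingDir)
    val newFiles = listParquetFiles(spark, stagingDir)

    // Step 2: lock the table so no upsert/delete can slip in between merge and commit.
    acquireTableLock(tablePath)
    try {
      // Step 3: commit the new files and mark the old files as removed in the delta log.
      commitCompaction(tablePath, added = newFiles, removed = oldFiles)
    } finally {
      releaseTableLock(tablePath)
    }
  }
}
```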
You may have already noticed that no matter which operation you apply to a Delta table, Delta always creates a new commit file (a JSON file) that records which files were newly created and which files were deleted by your operation. That is why we call the metafiles the delta log. With the delta log we can do time travel (or version travel), and we can do upsert/delete/append operations without affecting readers.
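For example, time travel is just a matter of reading the table as of an older commit in the log. A minimal sketch, assuming a made-up table path and a newer Delta release than 0.2.0 (the versionAsOf read option is a later Delta Lake feature):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("time-travel-sketch").getOrCreate()

// Read the table as of an earlier version recorded in the delta log.
val snapshot = spark.read
  .format("delta")
  .option("versionAsOf", "0")
  .load("/tmp/delta/events")

snapshot.show()
```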