
Change query support in Delta Lake (3.2.0)

This is the Delta Lake deep dive associated with the table format comparisons - Change queries and CDC blog post, which looks at how each table format supports change queries, including full CDC. This is not a how-to guide but an examination of Delta Lake's capabilities through the lens of its internal design.

Preamble

In the parent blog post, I described four canonical change query types:

  • Latest-state

    • append-only: Return all inserted rows covering the query time period.

    • upsert: Return all inserted and updated rows (without the before image) covering the query time period. This may or may not return rows that were deleted in the time period; this will depend on the table format.

  • CDC

    • min-delta: Minimized CDC results covering the query time period.

    • full-delta: Complete CDC results, including redundant changes, covering the query time period.

I took the append-only and min-delta names from this incremental processing Snowflake paper that describes how Snowflake change queries work. Upsert and full-delta are additional types that are not supported by Snowflake but have some support among the table formats. We’ll be framing the Delta capabilities in terms of those four change query types.

In the parent post, I also covered the three main approaches to CDC support:

  • Materialize CDC files on write

  • Infer-CDC-on-read

  • Row-lineage (aka row tracking)

Delta Lake supports incremental latest-state reads, called streaming reads in Spark Structured Streaming, and CDC reads, known as CDC feeds.

Some relevant internals background

As with the Apache Iceberg post, it’s useful to cover some relevant aspects of Delta Lake internals to understand the change query support.

Incremental table scans

Each table commit results in a new delta log entry being written to the Delta Log. Compute engines can incrementally process the log by sorting its entries in lexicographical order (the entry file names are zero-padded version numbers). Each entry that represents an update to data will contain at least one AddFile and/or RemoveFile action. If deletion vectors are enabled, then an AddFile action may reference a deletion vector file as well as a data file.

Fig 1. Compute engines can walk forwards through the delta log entries by sorting them lexicographically.
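To make that concrete, here is a small illustrative sketch (Python standard library only, with a placeholder path) that walks a local _delta_log directory in lexicographical order and prints the AddFile/RemoveFile actions of each commit:

import json
import os

delta_log_dir = "/tmp/fruit_table/_delta_log"  # placeholder path

# Commit files are zero-padded version numbers (e.g. 00000000000000000002.json),
# so a plain lexicographical sort yields commit order.
for entry in sorted(f for f in os.listdir(delta_log_dir) if f.endswith(".json")):
    with open(os.path.join(delta_log_dir, entry)) as fh:
        for line in fh:  # each line of a log entry is one JSON-encoded action
            action = json.loads(line)
            if "add" in action:
                add = action["add"]
                print(entry, "AddFile", add["path"],
                      "has DV:", add.get("deletionVector") is not None)
            elif "remove" in action:
                print(entry, "RemoveFile", action["remove"]["path"])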

Copy-on-write vs merge-on-read

Delta Lake has a lot in common with Apache Iceberg, including the general design around copy-on-write and merge-on-read. Copy-on-write works in the same way as Iceberg. Merge-on-read is also essentially the same; Iceberg’s positional delete files are very similar to Delta’s deletion vectors, though Delta does not support equality deletes.

In this deep dive we’ll be using the same set of three SQL commands repeatedly to understand change queries. The figures below show the simplified representation of the data, delete and metadata files of a copy-on-write (COW) table and a merge-on-read (MOR) table, undergoing these same three commands.

Copy-on-write example:

Fig 2. Example of the three operations using copy-on-write.

Merge-on-read example:

Fig 3. The same three operations, but with deletion vectors enabled (making this a merge-on-read table).

The difference between the two table modes in Delta Lake is that a MOR table:

  1. Suppresses the delete file action.

  2. Does not copy over unchanged rows to new data files.

  3. Adds deletion vectors to invalidate the original versions of updated/deleted rows (see the schematic AddFile action below).
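Schematically (heavily abbreviated, with most fields and real values omitted), the AddFile action that re-adds data-1 with a deletion vector looks something like this inside a delta log entry:

{
  "add": {
    "path": "data-1.parquet",
    "dataChange": true,
    "deletionVector": {
      "storageType": "u",
      "pathOrInlineDv": "<encoded DV file reference>",
      "offset": 1,
      "sizeInBytes": 36,
      "cardinality": 1
    }
  }
}

The cardinality field records how many rows the deletion vector invalidates; see PROTOCOL.md for the full action schema.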

Row-tracking (aka row lineage)

Delta Lake has recently added row tracking, which is also known as row lineage. I will use the term row tracking from here on as Delta uses that term. You can find a description of its design in the PROTOCOL.md file in the Delta GitHub repository.

Row tracking refers to the ability to track the history and evolution of a row across commits (table versions). Row tracking, when enabled, requires that each row be written with some additional metadata columns, including:

  • A stable row ID that identifies the same row across snapshots.

  • A stable row commit version that indicates the commit in which the row was last inserted or updated. This column will be null for a newly inserted row.

Snowflake uses a similar row-tracking approach to make append-only change queries efficient by only needing to scan added data files and return rows with a null value for their equivalent of the row commit version. Snowflake also uses row tracking to feed into its Delta Minimization (min-delta) compute stage, depicted in the Iceberg post.

We’re still waiting for the open-source compute engines to leverage row tracking in Delta change queries. At the time of writing, Delta-Spark 3.2.0 only supports reading and writing the row-tracking columns, but doesn’t use them to make change queries more efficient. Later, we’ll show some code that outputs the row ID and row commit version columns of a table using Spark 3.5 with Delta-Spark 3.2.0.
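To illustrate what that could look like, here is a rough sketch (not something Delta-Spark does for you; start_version is a placeholder) of approximating an upsert-style change query by filtering the current snapshot on the row commit version column:

# Rough sketch, not a built-in feature: return rows inserted or updated
# after start_version. Requires delta.enableRowTracking on the table.
# Deletes are not reflected, and the current snapshot may still be fully scanned.
start_version = 1  # placeholder: the last table version already processed

changed = (spark.read.format("delta").load(path)
                .select("name", "fruit",
                        "_metadata.row_id",
                        "_metadata.row_commit_version")
                .where(f"row_commit_version > {start_version}"))
changed.show()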

Now time to dive into change query support in Delta Lake.

Append-only and Upsert Change Queries

As in the Apache Iceberg post, I’ll include the following open-source compute engines:

  • Apache Spark

  • Spark Structured Streaming (Spark SS)

  • Apache Flink

Flink and Spark SS both offer the ability to incrementally query a Delta Lake table, though with some rough edges. None yet use row tracking in their change queries, and therefore there are a number of limitations at the time of writing.

Firstly, both Spark SS and Flink only read the data files of AddFile actions. This means that delete operations are not reflected in the results. As I covered in the parent blog post, it’s open to debate whether an incremental append-only or upsert query should reflect deletes by omitting deleted rows. Snowflake describes in its incremental processing paper that it decided to reflect deletes, even in append-only change queries, by not returning deleted rows.

Secondly, both Spark SS and Flink error out when encountering a delta log entry that both adds and removes files. These are commits that perform row-level updates and deletes. Because the basics of row tracking have only just been implemented in Delta Lake, none of the three sampled compute engines make use of it yet. Because they don’t, and because performing row-level comparisons between added and removed files is expensive, we are left with rather poor support for append-only and upsert change queries (as we’ll see next).

Erroring when a row-level update/delete commit is encountered is not ideal, so to avoid these errors, users are given some configs:

  • ignoreChanges (now deprecated in Spark) tells Spark SS/Flink that when it encounters a commit with both AddFile and RemoveFile actions, it should return all rows in the added data files, regardless of whether those rows were just carried over from the original file, were actually updated, or belong to an unchanged data file that was re-added because of a DV file change. This config can therefore cause duplicates, but it prevents Spark/Flink streaming reads from erroring when row-level updates/deletes are made to the table.

  • ignoreDeletes tells Spark/Flink to suppress the error that would otherwise be raised by a commit that only deletes files (i.e. contains only RemoveFile actions).

  • skipChangeCommits, tells Spark to simply skip any commits that both add and remove data files. This config therefore causes queries to only return inserted rows. Flink does not yet have this config.

We’ll run some examples in a moment to make things clearer. Before we do, the last thing to take into account is that the rows returned by a Spark SS streaming read depend on the starting commit of the read. The Delta Lake docs state: “When you load a Delta table as a stream source and use it in a streaming query, the query processes all of the data present in the table as well as any new data that arrives after the stream is started.”

This means that a streaming read is the combination of an initial batch query based on the starting table version plus an ongoing streaming query that walks the Delta Log as new commits are made. You can specify a starting commit or timestamp; the table is queried batch-style up to that point (exclusive), then the streaming read walks the Delta Log, commit by commit, returning results.
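For example, a complete (illustrative) streaming read that starts at table version 0 and prints each micro-batch to the console could look like this; the checkpoint location is a placeholder:

(spark.readStream.format("delta")
      .option("startingVersion", 0)
      .load(path)                       # same table path as the write examples
      .writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/checkpoints/fruit_table")
      .start())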

Next, we’ll run some streaming reads based on the same three commits, but using different configs.

Examples using Spark SS

Let’s run the same three commits that I used earlier in the COW vs MOR diagrams. These were three commits to insert, update, and delete some rows (this is an excerpt of some Python code I used to test this stuff):

from pyspark.sql import Row
from delta.tables import DeltaTable

# Commit 1: insert three rows as a single data file
inputdf1 = spark.createDataFrame([
   Row(name='jack', fruit='apple'),
   Row(name='sarah', fruit='orange'),
   Row(name='john', fruit='pineapple')
])
inputdf1.coalesce(1).write.format("delta").mode("overwrite").save(path)

# Commit 2: a row-level update
deltaTable = DeltaTable.forPath(spark, path)
deltaTable.update("name == 'jack'", { "fruit": "'banana'" })

# Commit 3: a row-level delete
deltaTable.delete("name == 'john'")

The configs we’ll use are:

  • Table property: delta.enableDeletionVectors.

  • Streaming read option: ignoreChanges

  • Streaming read option: skipChangeCommits

  • Streaming read option: startingVersion

Because there are only three commits, the streaming read will only ever return one batch of results, no matter which configs we choose.

To understand the results returned by these experiments, it’s useful to remember the COW vs MOR diagrams from earlier. Please take a moment to review them again.

As a reminder, streaming reads (after the initial batch query) will only read the data files referenced by AddFile actions in the Delta Log.

Config 1: DV enabled or disabled, no startingVersion, ignoreChanges=false

spark.readStream.format("delta")
           .option("ignoreChanges", "false")
           .option("skipChangeCommits", "false")

The three commits include two that perform row-level updates or deletes, so a streaming read should error when ignoreChanges is set to false. However, no startingVersion is specified in this example, so the initial batch query is based on the current table version (which includes all three commits) and the subsequent incremental queries have no data to process; therefore no error occurs.

Result:
+-----+------+
| name| fruit|
+-----+------+
| jack|banana|
|sarah|orange|
+-----+------+

Config 2: DV enabled or disabled, startingVersion specified, ignoreChanges=false

spark.readStream.format("delta")
    .option("ignoreChanges", "false")
    .option("skipChangeCommits", "false")
    .option("startingVersion", 0)

This time we specify a starting table version of 0, so the streaming read has no initial batch query to run and immediately starts walking the Delta Log. However, because ignoreChanges=false, it immediately errors out with:

org.apache.spark.sql.delta.DeltaUnsupportedOperationException: [DELTA_SOURCE_TABLE_IGNORE_CHANGES] Detected a data update (for example UPDATE (Map(predicate -> ["(name#56 = jack)"]))) in the source table at version 2. This is currently not supported. If this is going to happen regularly and you are okay to skip changes, set the option 'skipChangeCommits' to 'true'. If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.

Config 3: DV disabled (COW), startingVersion specified, ignoreChanges=true

DeltaTable...property("delta.enableDeletionVectors", "false")
spark.readStream.format("delta")
    .option("ignoreChanges", "true")
    .option("skipChangeCommits", "false")
    .option("startingVersion", 0)

This time Delta will not error out when encountering the update and delete commits of table versions 2 and 3. Instead, it will simply return the rows of the data file in each AddFile action. With DVs disabled, the table operates using copy-on-write, so we should have three data files corresponding to the three table versions. The results are the union of those three data files.

Result (I added the commit numbers):

+-----+---------+
| name|    fruit|
+-----+---------+
| jack|    apple|   (commit 1)
|sarah|   orange|   (commit 1)
| john|pineapple|   (commit 1)
| jack|   banana|   (commit 2)
|sarah|   orange|   (commit 2)
| john|pineapple|   (commit 2)
| jack|   banana|   (commit 3)
|sarah|   orange|   (commit 3)
+-----+---------+

Config 4: DV enabled (MOR), startingVersion specified, ignoreChanges=true

DeltaTable...property("delta.enableDeletionVectors", "true")
spark.readStream.format("delta")
    .option("ignoreChanges", "true")
    .option("skipChangeCommits", "false")
    .option("startingVersion", 0)

This time, there is a different set of AddFile actions to read. Each DV file change results in a remove and a re-add of the associated data file, plus an AddFile for any new data file written because of an update. Refer back to the MOR diagram earlier on for a reminder. Because the data files referenced by AddFile actions under MOR are not the same as under COW, different results are returned.

Result (again, I added the commit numbers to help you make sense of the results):
+-----+---------+
| name|    fruit|
+-----+---------+
| jack|    apple|   (commit 1)
|sarah|   orange|   (commit 1)
| john|pineapple|   (commit 1)
|sarah|   orange|   (commit 2)
| john|pineapple|   (commit 2)
| jack|   banana|   (commit 2)
|sarah|   orange|   (commit 3)
+-----+---------+

This result looks pretty odd: the results of commits 1 and 2 make sense, but why is the Sarah row returned at the end? Let’s unpack it:

  • Commit 1 inserted the three rows into data-1.

  • Commit 2 updated Jack's fruit to banana, which involved two AddFile actions. The first AddFile re-added data-1, but with a deletion vector invalidating the [Jack, Apple] row. The second AddFile referenced data-2, a new data file written for the updated [Jack, Banana] row.

  • So far so good, but why is only Sarah returned for commit 3? The answer is that commit 3 added a DV for the John row of data-1, which resulted in an AddFile for data-1 with the new DV file (which invalidates both the Jack and John rows). Because it was a delete, no new data file was added, so data-1 was read again, this time with delete-2 applied, which left only one valid row: the Sarah row.

Fig 4. Explaining the results of the last Spark SS query.

Understanding the underlying metadata, data and delete files, and the logic of a streaming read demystifies these results (but it is not intuitive at all).

Config 5: DV enabled or disabled, startingVersion specified, skipChangeCommits=true

spark.readStream.format("delta")
    .option("ignoreChanges", "false")
    .option("skipChangeCommits", "true")
    .option("startingVersion", 0)

When skipChangeCommits is enabled, a streaming read walking the Delta Log looking for AddFile actions will simply skip right over any commits with both AddFile and RemoveFile actions. This makes the choice of COW or MOR irrelevant, and makes a streaming read one that only returns inserted rows. Revisit the COW vs MOR diagrams to convince yourself this is true.

Result:
+-----+---------+
| name|    fruit|
+-----+---------+
| jack|    apple|
|sarah|   orange|
| john|pineapple|
+-----+---------+

Append-only/upsert change query summary

It is no wonder that ignoreChanges is deprecated in Spark. It’s hard to make sense of its results, let alone build a data pipeline or app on top of them. If you’re going to use streaming reads on mutable tables, it’s probably best to stick with skipChangeCommits and only process new data. But remember that any rows inserted by a MERGE statement will be skipped, just as with Apache Iceberg. I’ll therefore classify the append-only support as non-conformant to my append-only change query definition.

The good news is that Delta has a much clearer, more read-efficient CDC feed feature that I think makes up for the current shortcomings of the append-only/upsert streaming reads in Spark and Flink.

min/full-delta via the CDC feed feature

CDC file materialization is the way Delta Lake supports CDC change queries. While there is a storage and write-path cost, the benefit of efficient reads likely outweighs the overhead, especially in cases with many CDC readers and derived tables that depend on the CDC feed. Snowflake took a different path, using row lineage instead, highlighting its greater space efficiency. Having not implemented these table formats, I can’t tell you which approach is best.

Using the same three commits from the last section, the next two figures show the metadata, data and CDC files created for COW and MOR tables.

COW example:

Fig 5. Depicts a simplified view of a Delta Lake table that materializes changes as CDC files, with deletion vectors disabled (copy-on-write).

MOR example:

Fig 6.  Depicts a simplified view of a Delta Lake table that materializes changes as CDC files, with deletion vectors enabled (merge-on-read).

Delta Lake was designed around batch and micro-batch use cases of Apache Spark and, therefore, assumes that the compute engine will perform a scan phase where it collects the rows to be modified. It then applies the changes to the target rows and, at that moment, has all the information needed to write CDC files with the before and after images. If a commit only adds one or more data files or only logically deletes one or more files, then no CDC file is created. When CDC is enabled, and no CDC file exists in a commit, CDC readers assume any rows in an added data file are inserts and any rows in a deleted data file are deletes. 

The process works the same way whether the table is configured with copy-on-write or merge-on-read. Whether a table is COW or MOR is orthogonal to writing CDC files.
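Note that CDC files are only materialized when the change data feed is enabled on the table via the delta.enableChangeDataFeed table property, and only for commits made after it is set. One way to enable it (assuming path points at the table from the earlier examples):

# Enable the change data feed; CDC files will be written for subsequent commits.
spark.sql(
    f"ALTER TABLE delta.`{path}` "
    "SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')"
)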

A CDC change query returns the changes since a given table version. The reader inspects each delta log entry since the specified table version and checks whether it includes CDC files. If it does, the reader returns the change rows from those CDC files; if not, AddFile actions correspond to inserts and RemoveFile actions correspond to deletes.

Let’s run some example CDC feed reads to see if there are any surprises.

CDC config 1 - DV enabled or disabled, startingVersion specified

spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)

Spark will perform a batch query based on the table state as of the starting commit, then perform streaming reads over subsequent log entries. In this case we set a startingVersion of 0, so no initial batch query is executed and the streaming portion walks the whole Delta Log from the first version onwards. This returns an unminimized sequence of CDC events, where the pre- and post-images of an update are separate rows.

+-----+---------+----------------+---------------+
| name|    fruit|    _change_type|_commit_version|
+-----+---------+----------------+---------------+
| jack|    apple|          insert|              1|
|sarah|   orange|          insert|              1|
| john|pineapple|          insert|              1|
| jack|    apple| update_preimage|              2|
| jack|   banana|update_postimage|              2|
| john|pineapple|          delete|              3|
+-----+---------+----------------+---------------+

CDC config 2 - DV enabled or disabled, startingVersion not specified

spark.readStream.format("delta")
    .option("readChangeFeed", "true")

When we don’t specify a startingVersion, the starting point defaults to the current table version, so the initial batch query covers all three commits. The batch portion returns a minimized set of CDC events that represents the current state of the table.

+-----+------+------------+---------------+
| name| fruit|_change_type|_commit_version|
+-----+------+------------+---------------+
| jack|banana|      insert|              3|
|sarah|orange|      insert|              3|
+-----+------+------------+---------------+

CDC feed summary

This CDC feed behavior is pretty simple and straightforward. Additionally, because the feed has been materialized by the writer, it is extremely efficient to read.

You don’t get to choose min-delta or full-delta; basically, the initial batch is min-delta and the subsequent streaming portion is full-delta.
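If you need something closer to min-delta over an arbitrary version range, you currently have to post-process the full-delta feed yourself. Below is a rough, hypothetical sketch that keeps only the latest change per key (name) from a batch change feed read; it is only an approximation, since a row inserted and then updated within the range will surface as an update rather than an insert:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Batch read of the change feed from a chosen version, dropping pre-images,
# then keep only the most recent change per key.
cdc = (spark.read.format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", 0)
            .load(path)
            .where("_change_type != 'update_preimage'"))

latest_per_key = Window.partitionBy("name").orderBy(F.col("_commit_version").desc())
(cdc.withColumn("rn", F.row_number().over(latest_per_key))
    .where("rn = 1")
    .drop("rn")
    .show())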

Seeing the hidden row tracking columns using Spark

First you must ensure that row tracking is enabled via the table property delta.enableRowTracking.
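One way to do that (a minimal sketch; the table name here is illustrative rather than the path-based table used above, and on some Delta versions row tracking can only be enabled at table creation time) is to set the property when the table is created:

# Illustrative table name; enables row tracking from the start.
spark.sql("""
    CREATE TABLE fruit_table (name STRING, fruit STRING)
    USING DELTA
    TBLPROPERTIES ('delta.enableRowTracking' = 'true')
""")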

The following code, run after the three commits, selects the row id and row commit version columns:

spark.read.format("delta").load(path) /
	.select("name", /
            "fruit", /
            "_metadata.row_id", /
            "_metadata.row_commit_version").show()

+-----+------+------+------------------+
| name| fruit|row_id|row_commit_version|
+-----+------+------+------------------+
| jack|banana|     0|                 2|
|sarah|orange|     1|                 1|
+-----+------+------+------------------+

Now we just need the open source compute engines to leverage this for better append-only and upsert change queries.

Conclusions

Current support for Delta Lake change queries in Flink, Spark and Spark Structured Streaming:

  • append-only: Yes, but only for commits that exclusively add data files, so some inserts can be missed (such as inserts from MERGE statements).

    • The recently added row tracking, implemented in Delta-Spark 3.2.0, could be leveraged to fix this.

  • upsert: Only via the deprecated ignoreChanges option; pretty poor support.

  • min-delta: Partial support in Spark Structured Streaming. The initial batch query returns minimized CDC. Uses the materialize CDC files on write approach.

  • full-delta: Support in Spark Structured Streaming. The continuous incremental portion of the streaming read returns full CDC. Uses the materialize CDC files on write approach.

The sampled open-source compute engines essentially have the same append-only/upsert change query limitations with Delta Lake as they do with Apache Iceberg. They really only support the same non-conformant append-only change query, as they do not return all inserts. Being conformant, by my definition, means that an append-only change query should return all inserts, whether they came from an INSERT, a MERGE, or some kind of bulk operation. This will be solved if/when the open-source compute engines make use of the Delta Lake row tracking feature.

The CDC support is good, with a straightforward interface; the only limitation is that you don’t get to choose between min-delta and full-delta. It will be interesting to see whether Apache Iceberg follows suit (by implementing CDC file materialization) at some point, or sticks with a row-lineage-based implementation like Snowflake's. Row lineage is still in the design phase in Iceberg, so it’s early days there, but Delta Lake has it right now. If you have any opinions about row lineage vs CDC file materialization as the basis for CDC, then do let me know!

Next in the change query analysis is Apache Hudi.
