Table format comparisons - Change queries and CDC

I work for Confluent, where the table/stream duality has been written and spoken about for many years. Jay Kreps wrote about it back in 2013 while still at LinkedIn. This duality concept is about how the stream and the table are different sides of the same coin:

  • A stream is a log of events where each event represents some change in the world. For example, a user account was created, and then, in another event, it was deleted. OLTP databases use transaction logs (aka write-ahead logs or redo logs) where changes are written before being applied to a table.

  • A table is a point-in-time snapshot of the world. At point t1, user 1 existed in the table, and at point t2, the user did not exist. Some databases store the table state at multiple points in time; the lakehouse table formats are one example, with their design of immutable data files plus a log of deltas/snapshots.

This post, and its associated deep dives, will look at how changes made to an Iceberg/Delta/Hudi/Paimon table can be emitted as a stream of changes. In the context of the table formats, it is not a continuous stream, but the capability to incrementally consume changes by performing periodic change queries.

These change queries can return full Change Data Capture (CDC) data or just the latest data written to the table. When people think of CDC, they might initially think of tools such as Debezium that read the transaction logs of OLTP databases and write a stream of change events to something like Apache Kafka. From there the events might get written to a data lakehouse. But the lakehouse table formats themselves can also generate a stream of change events that can be consumed incrementally. That is what this post is about.

Before I begin, I also recommend reading the Snowflake engineering paper, What’s the Difference? Incremental Processing with Change Queries in Snowflake, which has a great introduction to the concepts of change queries. Their implementation is focused on how Snowflake works internally but the Semantics section of the paper takes a broader look at change queries that I think is well written.

At a conceptual level

There are different forms of consuming changes:

  • Incremental change queries are SQL queries that return the changes that have occurred between two points in time:

    • CDC queries: Consume the latest changes with the operation type, such as insert, update-before, update-after and delete, as a column. For updates, update-before contains the original row and update-after contains the updated row. This allows a CDC reader to know how a row was changed. This is known as the before/pre image and after/post image of a row. 

    • Latest-state queries: Consume the latest inserted/updated rows without the historical before image. This may be limited to only the new rows that were inserted (append-only change queries), or both the new and updated rows (upsert change queries). Deletes by definition cannot be read using this technique, but they can be reflected by simply not returning a row that might otherwise have been returned.

  • Materialized view maintenance: a compute engine uses a table format’s change tracking capabilities to maintain a derived table by periodically consuming changes made to the source table, and applying them to the derived table. 
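
To make the distinction between CDC and latest-state queries concrete, here is a minimal PySpark sketch using Delta Lake's Change Data Feed. It assumes a Delta table named FavFruit that had 'delta.enableChangeDataFeed' set to true before the commits of interest were made; the version numbers are placeholders.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("change-queries").getOrCreate()

# CDC query: rows come back with _change_type, _commit_version and
# _commit_timestamp metadata columns. _change_type is one of insert,
# update_preimage, update_postimage or delete.
cdc_df = (spark.read.format("delta")
          .option("readChangeFeed", "true")
          .option("startingVersion", 1)
          .option("endingVersion", 3)
          .table("FavFruit"))

# Latest-state query: one way to derive it from the same feed is to
# drop the before images, leaving only inserted/updated rows.
latest_df = cdc_df.where(cdc_df["_change_type"].isin("insert", "update_postimage"))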

Non-CDC change queries

To make this concepts section a little more concrete, let's imagine the following three commands are executed on a table, resulting in three table commits:

INSERT INTO FavFruit VALUES ('jack', 'apple'), ('sarah', 'orange'), ('john', 'pineapple')
UPDATE FavFruit SET fruit = 'banana' WHERE name = 'jack'
DELETE FROM FavFruit WHERE name = 'john'

Which of the following results should an incremental non-CDC change query covering the period of these three commits return?

+-----+---------+
| name|    fruit|
+-----+---------+
| jack|    apple|
|sarah|   orange|
| john|pineapple|
+-----+---------+
Fig 1. Corresponding to the inserts (insert-only) of the period.

+-----+---------+
| name|    fruit|
+-----+---------+
| jack|   banana|
|sarah|   orange|
| john|pineapple|
+-----+---------+
Fig 2. Corresponding to the inserts and updates (upserts only) of the period.

+-----+---------+
| name|    fruit|
+-----+---------+
| jack|   banana|
|sarah|   orange|
+-----+---------+
Fig 3. Corresponding to the inserts, updates and deletes (all) of the period.  The deletes are honored by not returning rows that were inserted and deleted in the query time period.

There is no right answer; it depends on the use case, but it’s worth thinking about. Different table formats take different approaches.
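
Apache Hudi, for example, leans toward the Fig 2/Fig 3 style: its incremental query returns the latest state of the rows that changed in the period. Here's a hedged sketch using Hudi's Spark datasource (the table path and instant times are placeholders, and spark is the SparkSession from the earlier sketch):

# An upsert-style (latest-state) incremental query. The begin/end
# instant times bound which commits are consumed.
upserts_df = (spark.read.format("hudi")
              .option("hoodie.datasource.query.type", "incremental")
              .option("hoodie.datasource.read.begin.instanttime", "0")
              .option("hoodie.datasource.read.end.instanttime", "20240919135500000")
              .load("/tables/fav_fruit"))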

CDC results, one row or two?

Inserts and deletes each consist of a single change: either the row was added or it was deleted. However, in CDC, every row update has a before and an after image. There are different approaches to how updates are returned in CDC change queries:

  1. Combine row changes into a single event/row with the before and after images as separate fields/columns. This is how Debezium and Apache Hudi do it (see the sketch after Fig 4).

  2. Make the before and after image into separate rows, with a change type of update-before and update-after. This is how Delta does it.

  3. Make the before and after images into separate rows, with a change type of delete and insert. To differentiate between an update and a delete-then-insert, another column can be used to indicate if this was an update or not. This is how Snowflake does it.

+-----+---------+----------------+---------------+--------------------+
| name|    fruit|    _change_type|_commit_version|   _commit_timestamp|
+-----+---------+----------------+---------------+--------------------+
| jack|    apple|          insert|              1|2024-09-19 13:53:...|
|sarah|   orange|          insert|              1|2024-09-19 13:53:...|
| john|pineapple|          insert|              1|2024-09-19 13:53:...|
| jack|    apple| update_preimage|              2|2024-09-19 13:53:...|
| jack|   banana|update_postimage|              2|2024-09-19 13:53:...|
| john|pineapple|          delete|              3|2024-09-19 13:53:...|
+-----+---------+----------------+---------------+--------------------+
Fig 4. Shows a CDC change query result using Spark and Delta Lake, covering
the three commits. Delta returns two rows for the update in commit 2.
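
For comparison with approach 1, here is a hedged sketch of a Hudi CDC change query, which returns one row per change with the before and after images as columns. I believe the result carries op, before and after columns, but treat the exact schema as an assumption; the table path is a placeholder.

# A Hudi CDC change query (approach 1: a single row per change, with
# the before/after images as columns rather than as separate rows).
# Requires hoodie.table.cdc.enabled=true to be set at write time.
hudi_cdc_df = (spark.read.format("hudi")
               .option("hoodie.datasource.query.type", "incremental")
               .option("hoodie.datasource.query.incremental.format", "cdc")
               .option("hoodie.datasource.read.begin.instanttime", "0")
               .load("/tables/fav_fruit"))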

Complete or minimized?

The results of a change query may be complete or minimized:

  • A complete change log: Consume all changes, even if they cancel each other out, e.g. a row is inserted, then deleted within the query time bounds. These are known as redundant changes.

  • A minimized change log: You consume a minimized set of changes where redundant changes are eliminated. An incremental read covers a time period (a number of commits in the log of deltas/snapshots), and the changes are minimized covering that period.

+-----+------+------------+---------------+--------------------+
| name| fruit|_change_type|_commit_version|   _commit_timestamp|
+-----+------+------------+---------------+--------------------+
| jack|banana|      insert|              3|2024-09-19 13:55:...|
|sarah|orange|      insert|              3|2024-09-19 13:55:...|
+-----+------+------------+---------------+--------------------+
Fig 5. Shows the same three commits as the previous figure but
minimized. The insert and update of the ‘jack’ row is represented
as a single insert row but with the updated state. The ‘john’
row is absent as its insertion and deletion were redundant.
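
Minimization is easy to picture as a fold over the complete change log, keyed by primary key: net each key's earliest before image against its latest after image. Here is a format-agnostic Python sketch (the tuple encoding is mine, not any table format's):

# Minimize a complete change log. Each change is (op, key, row); ops
# are 'insert', 'update_pre', 'update_post' and 'delete'.
def minimize(changes):
    first_pre = {}  # key -> before image at the period start (None = absent)
    last_post = {}  # key -> after image at the period end (None = deleted)
    for op, key, row in changes:
        if key not in first_pre:
            # 'insert' means the key did not exist at the period start.
            first_pre[key] = None if op == "insert" else row
        last_post[key] = None if op == "delete" else row

    minimized = []
    for key, post in last_post.items():
        pre = first_pre[key]
        if pre is None and post is not None:
            minimized.append(("insert", key, post))
        elif pre is not None and post is None:
            minimized.append(("delete", key, pre))
        elif pre is not None and post is not None and pre != post:
            minimized.append(("update", key, pre, post))
        # pre and post both None: inserted then deleted -> redundant.
    return minimized

# The three FavFruit commits minimize down to Fig 5's result:
changes = [
    ("insert", "jack", ("jack", "apple")),
    ("insert", "sarah", ("sarah", "orange")),
    ("insert", "john", ("john", "pineapple")),
    ("update_pre", "jack", ("jack", "apple")),
    ("update_post", "jack", ("jack", "banana")),
    ("delete", "john", ("john", "pineapple")),
]
print(minimize(changes))
# [('insert', 'jack', ('jack', 'banana')), ('insert', 'sarah', ('sarah', 'orange'))]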

Many ways of returning change data

The figure below shows five different forms of a change query result based on the same underlying operations. These are just a sample of the possible outputs that a change query could theoretically return.

Fig 6. Five examples of change events produced from a set of three operations.

Framing this analysis with four canonical use cases

The Snowflake engineering paper, What’s the Difference? Incremental Processing with Change Queries in Snowflake, names the use cases they support as append-only and min-delta. I will use the same terms and add two more:

  • append-only: Return inserted rows covering the query time period.

  • upsert: Return inserted and updated rows (without the before image) covering the query time period. This may or may not return rows that were deleted in the time period; this will depend on the table format.

  • min-delta: Minimized CDC results covering the query time period.

  • full-delta: Complete CDC results, including redundant changes, covering the query time period.

The per-table-format analyses that follow will frame each format's capabilities in terms of these four change query use cases. It's worth noting that what the table formats offer doesn't always fit neatly into these four categories, but they give us a simple baseline to compare against.

Different categories of implementation across the table formats

Each table format has a different underlying design and therefore takes a slightly different approach to the four change query types. However, we can broadly classify the CDC query implementations as follows.

Materialize CDC files on write

Writers materialize change events in special change files that get added alongside the regular data and delete files in each commit. These CDC files form part of the table and its history. Regular table readers ignore the CDC files, while CDC readers read the CDC files wherever they exist. For commits where the regular files can serve the same purpose, no CDC files are needed: for example, if an operation adds a new data file that contains only inserts (and no mutations of existing rows), then a CDC reader can read that data file directly and treat all of its rows as inserts. Paimon's changelog producers are an example of this category; a configuration sketch follows the pros and cons below.

Fig 7. Non-CDC incremental readers tail the log of deltas/snapshots to learn of data files to read. CDC readers also tail the log, but to discover CDC files to read.

  • Pros: Efficient for CDC readers.

  • Cons:

    • More storage required.

    • Adds overhead to writers as additional files must be written.
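
As referenced above, here is a hedged sketch of configuring CDC file materialization with Apache Paimon via Spark SQL. The catalog, database and table names are placeholders; the property names follow Paimon's documented table options.

# Enable changelog (CDC) file materialization at table creation time.
# The 'changelog-producer' option selects how changelog files get
# produced ('input', 'lookup' or 'full-compaction'); 'none' disables
# materialization.
spark.sql("""
    CREATE TABLE paimon.db.fav_fruit (
        name  STRING,
        fruit STRING
    ) TBLPROPERTIES (
        'primary-key'        = 'name',
        'changelog-producer' = 'lookup'
    )
""")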

Infer-CDC-on-read

Readers must compare each commit's snapshot to the prior one to extract that commit's row-level changes.

Fig 8. A CDC reader must compute the difference between adjacent snapshots to infer the row-level changes.

  • Pros: No overhead to writers.

  • Cons: Depending on the implementation, this can be IO and computationally expensive (and slow).
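
To make the cost concrete, here is a naive infer-on-read sketch that diffs two versions of the FavFruit table using Delta time travel. Both snapshots are scanned in full, which is exactly why this approach can be expensive.

# Diff two full snapshots to infer changes. The cost grows with the
# size of the table, not the size of the change.
v1 = spark.sql("SELECT * FROM FavFruit VERSION AS OF 1")
v3 = spark.sql("SELECT * FROM FavFruit VERSION AS OF 3")

deletes_and_pre_images = v1.exceptAll(v3)   # rows in v1 but not in v3
inserts_and_post_images = v3.exceptAll(v1)  # rows in v3 but not in v1
# An updated row appears once in each diff; pairing the two halves into
# update-before/update-after events requires a join on key columns.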

Row-level lineage/tracking

This is the approach taken in the Snowflake paper, and it seems well suited to copy-on-write (COW) tables that need to differentiate between inserted/updated rows and carry-over rows (rows carried over from the original data file unchanged).

Writers add metadata to each row that tells a compute engine whether it is a new row or an updated row. The metadata also contains enough information for the compute engine to find the original row for comparison, should it want to generate a full CDC result (a sketch follows the pros and cons below). Row lineage requires that rows be identified by a row id, even if the table format doesn't explicitly support primary keys.

  • Pros:

    • Low storage overhead compared to CDC file materialization.

    • Useful for MV maintenance that doesn’t need a before-image.

  • Cons:

    • Full CDC still requires additional work for the reader to go and figure out the before image of updates and deletes.

    • Merge-on-read (MOR) seems to reduce the need for this, as no carry-over rows are written to data files, and positional delete files can be compared between commits (at a lower cost than data file comparisons) to indicate the locations of newly invalidated rows.
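
As referenced above, here is a sketch of how a compute engine might use row-lineage metadata for an upsert-style change query. The column names follow the Iceberg v3 row lineage proposal (_row_id and _last_updated_sequence_number), but treat them, and this usage, as assumptions until v3 lands.

# An upsert-style change query over row-lineage metadata. Each row is
# assumed to carry a stable _row_id and the sequence number of the
# commit that last modified it (names per the Iceberg v3 row lineage
# proposal; hypothetical until v3 ships).
since_seq = 5  # the last commit sequence number this reader consumed

snapshot = spark.read.table("FavFruit")
changed = snapshot.where(snapshot["_last_updated_sequence_number"] > since_seq)
# 'changed' holds the new and updated rows, with no before images. For
# a full CDC result, the engine would use _row_id to look up each
# changed row's prior version in an earlier snapshot.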

Now that we've explored what change queries are and some common implementation patterns, let's look at my findings (as of October 2024).

The deep dives are done, the results are in

I’ve now completed a deep dive for each of the four table formats that I have been covering. It’s clear to me now that the subject of tracking changes and emitting a stream of changes is nuanced because there are multiple forms of incrementally consuming changes and many implementations. This section will summarize my findings.

You can find the deep dives via the links below; each explores a format's capabilities through the lens of its internal design:

Findings summary

  • Append-only change queries:

    • Apache Iceberg (v2): Yes, but it only includes inserted rows of APPEND snapshots. Any inserts that result from, for example, a MERGE statement will be skipped (v3 will provide the building blocks for fixing this).

    • Delta Lake (3.2.0): Yes, but with the same limitation as Iceberg v2 (though compute engines could use the new row tracking to fix this).

    • Apache Hudi (0.15): No.

    • Apache Paimon (0.8): No.

  • Upsert change queries (with or without honoring deletes):

    • Apache Iceberg (v2): No (but v3 will provide building blocks for this).

    • Delta Lake (3.2.0): No (technically yes, but the functionality is broken, in my opinion). Compute engines could leverage the new row tracking to fix this gap.

    • Apache Hudi (0.15): Yes, and an efficient implementation. COW tables honor deletes by not returning deleted rows. MOR tables can be configured to ignore deletes but then may also return duplicate rows (if multiple updates were made to the same primary key).

    • Apache Paimon (0.8): Yes, and an efficient implementation. You can choose whether or not deletes are honored in the query results (without the duplication of Hudi).

  • min-delta (CDC):

    • Apache Iceberg (v2): No native support in v2. Expect v3 to include building blocks for this.

      • Spark supports min-delta over Iceberg COW tables, by doing the heavy lifting itself.

    • Delta Lake (3.2.0): No (though if you perform a batch query over the CDC data, it is returned as min-delta; the incremental change queries return full-delta).

    • Apache Hudi (0.15): No.

    • Apache Paimon (0.8): Yes, if you configure CDC file materialization via full-compaction changelog-producer mode, using dedicated compaction jobs.

  • full-delta (CDC):

    • Apache Iceberg (v2): No native support in v2. Expect v3 to include building blocks for this.

      • Spark supports full-delta over Iceberg COW tables, by doing the heavy lifting itself.

    • Delta Lake (3.2.0): Yes.

    • Apache Hudi (0.15): Yes.

    • Apache Paimon (0.8): Yes, if you configure CDC file materialization via lookup changelog-producer mode.

It's interesting to see that Iceberg and Delta can do a limited append-only change query but Hudi and Paimon can't, and the reverse for upsert change queries. I expect all of them to be able to do both in the end, as there is no fundamental reason preventing it (once Iceberg gets row lineage in v3). All you need is row-level metadata that can tell a compute engine when a row was added or updated. Hudi may struggle, as a Hudi reader currently cannot tell the difference between an inserted and an updated row when doing a scan (it's all just upserts).
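
For reference, Iceberg's limited append-only support is exposed through the Spark datasource's incremental read options, and the Spark-computed CDC mentioned in the findings is exposed as a stored procedure. A hedged sketch (snapshot IDs, names and the catalog are placeholders):

# An Iceberg v2 append-only incremental read in Spark. Only APPEND
# snapshots between the two bounds are consumed; inserts made by
# row-mutating operations such as MERGE are skipped.
appends_df = (spark.read.format("iceberg")
              .option("start-snapshot-id", "1111111111111111111")  # exclusive
              .option("end-snapshot-id", "2222222222222222222")    # inclusive
              .load("db.fav_fruit"))

# The Spark-side CDC over Iceberg COW tables (the "heavy lifting")
# is exposed as a procedure that builds a changelog view to query.
spark.sql("""
    CALL spark_catalog.system.create_changelog_view(
        table => 'db.fav_fruit',
        changelog_view => 'fav_fruit_changes'
    )
""")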

For CDC feeds, it seems that full-delta is the predominant type, perhaps because that is how the CDC data gets materialized in CDC files in most cases. However, there's no reason a compute engine couldn't minimize that data at query time if told to do so; perhaps we'll see that if there is demand for it.

Hudi was notable for its “dial” to choose between CDC materialization and infer-on-read. I haven't used Hudi at scale, so I can't say how useful it is, but it's there. It would be interesting to hear from users in the wild who are choosing the DATA_BEFORE and OP_KEY_ONLY supplemental logging modes.
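
For context, the dial is set at write time via table configs. Here is a hedged sketch of what selecting a mode looks like; a real Hudi write needs more datasource options than shown, and the path is a placeholder.

# Setting Hudi's CDC "dial" at write time. The supplemental logging
# mode chooses between materializing full before/after images
# (DATA_BEFORE_AFTER), just the before image (DATA_BEFORE), or only
# the operation and key (OP_KEY_ONLY), pushing work to infer-on-read.
df = spark.createDataFrame([("jack", "apple")], ["name", "fruit"])
(df.write.format("hudi")
   .option("hoodie.table.name", "fav_fruit")
   .option("hoodie.datasource.write.recordkey.field", "name")
   .option("hoodie.table.cdc.enabled", "true")
   .option("hoodie.table.cdc.supplemental.logging.mode", "OP_KEY_ONLY")
   .mode("append")
   .save("/tables/fav_fruit"))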

Paimon was interesting in good and bad ways. The good is that it stores a RowKind in each row, so if you don't need the before image of a row update, the compute engine only needs to return the data in its existing form (no CDC file materialization or infer-CDC-on-read). The bad is that the type of compaction determines whether you get full-delta or min-delta, and the type of compaction may be forced on you by the workload and other factors.

Finally, Iceberg has some catching up to do here. v3 looks promising with the row lineage proposal from Snowflake engineers, which will provide the much-needed row-level metadata for change queries.

Final thoughts on the future of change queries

All this stuff is changing fast, and I expect a lot to change over the coming months. Whether a format is ahead or behind is likely to shift over time as they slowly converge in terms of capabilities. Delta now has some row-level capabilities with its row tracking, and hopefully Iceberg will get the same in v3. Adding row-level metadata is the key to making incremental change queries work efficiently, and we should expect all the table formats to ultimately converge on somewhat similar row-level capabilities (and efficiencies) in the end.

I’ll be trying to stay on top of table format evolution, and blog about any interesting developments.