Table format comparisons - Change queries and CDC

I work for Confluent, where the table/stream duality has been written and spoken about for many years. Jay Kreps wrote about it back in 2013 while still at LinkedIn. This duality concept is about how the stream and the table are different sides of the same coin:

  • A stream is a log of events where each event represents some change in the world. For example, a user account was created, and then, in another event, it was deleted. OLTP databases use transaction logs (aka write-ahead logs or redo logs), where changes are written before being applied to a table.

  • A table is a point-in-time snapshot of the world. At point t1, user 1 existed in the table, and at point t2, the user did not. Some databases store the table state at multiple points in time; the lakehouse table formats are one example, with their immutable data files + log of deltas/snapshots design.

This post, and its associated deep dives, will look at how changes made to an Iceberg/Delta/Hudi/Paimon table can be emitted as a stream of changes. In the context of the table formats, it is not a continuous stream, but the capability to incrementally consume changes by performing periodic change queries.

These change queries can return full Change Data Capture (CDC) data or just the latest data written to the table. When people think of CDC, they might initially think of tools such as Debezium that read the transaction logs of OLTP databases and write a stream of change events to something like Apache Kafka. From there the events might get written to a data lakehouse. But the lakehouse table formats themselves can also generate a stream of change events that can be consumed incrementally. That is what this post is about.

Before I begin, I also recommend reading the Snowflake engineering paper, What’s the Difference? Incremental Processing with Change Queries in Snowflake, which has a great introduction to the concepts of change queries. Their implementation is focused on how Snowflake works internally, but the Semantics section of the paper takes a broader look at change queries that I think is well written.

At a conceptual level

There are different forms of consuming changes:

  • Incremental change queries are SQL queries that return the changes that have occurred between two points in time:

    • CDC queries: Consume the latest changes, with the operation type (such as insert, update-before, update-after and delete) as a column. For updates, update-before contains the original row and update-after contains the updated row. This allows a CDC reader to know how a row was changed. These are known as the before/pre image and after/post image of a row.

    • Latest-state queries: Consume the latest inserted/updated rows without the historical before image. This may be limited to only the new rows that were inserted (append-only change queries), or both the new and updated rows (upsert change queries). Deletes by definition cannot be read using this technique, but they can be reflected by simply not returning a row that might otherwise have been returned.

  • Materialized view maintenance: a compute engine uses a table format’s change tracking capabilities to maintain a derived table by periodically consuming changes made to the source table, and applying them to the derived table. 

Non-CDC change queries

Let’s try to make this concepts section a little more concrete. Imagine the following three commands are executed on a table, resulting in three table commits:

INSERT INTO FavFruit VALUES ('jack', 'apple'), ('sarah', 'orange'), ('john', 'pineapple');
UPDATE FavFruit SET fruit = 'banana' WHERE name = 'jack';
DELETE FROM FavFruit WHERE name = 'john';

Which of the following should an incremental non-CDC change query return for the period covering these three commits?

+-----+---------+
| name|    fruit|
+-----+---------+
| jack|    apple|
|sarah|   orange|
| john|pineapple|
+-----+---------+
Fig 1. Corresponding to the inserts (insert-only) of the period.

+-----+---------+
| name|    fruit|
+-----+---------+
| jack|   banana|
|sarah|   orange|
| john|pineapple|
+-----+---------+
Fig 2. Corresponding to the inserts and updates (upserts only) of the period.

+-----+---------+
| name|    fruit|
+-----+---------+
| jack|   banana|
|sarah|   orange|
+-----+---------+
Fig 3. Corresponding to the inserts, updates and deletes (all) of the period. The deletes are honored by not returning rows that were inserted and deleted in the query time period.

There is no right answer; it depends on the use case, but it’s worth thinking about. Different table formats take different approaches.
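
To make the insert-only flavor concrete with a real API, here is a minimal PySpark sketch of an incremental read against an Iceberg table, using Iceberg's documented start/end snapshot read options. The table name and snapshot IDs are placeholders, and I'm assuming a Spark session with the Iceberg runtime configured. Iceberg's incremental scan returns appended rows only, i.e. Fig 1 semantics:

from pyspark.sql import SparkSession

# Assumes a Spark session already configured with an Iceberg catalog.
spark = SparkSession.builder.getOrCreate()

# Incremental read between two snapshots: start-snapshot-id is exclusive,
# end-snapshot-id is inclusive. The snapshot IDs are placeholders; in
# practice you record the last snapshot you processed and start from there.
appended = (spark.read.format("iceberg")
            .option("start-snapshot-id", "5179299526185056830")
            .option("end-snapshot-id", "5194326652472493454")
            .load("db.FavFruit"))
appended.show()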

CDC results, one row or two?

Inserts and deletes consist of only a single change: either the row was added or it was deleted. However, in CDC, every row update has a before and after image. There are different approaches to how updates are returned in CDC change queries:

  1. Combine row changes into a single event/row with the before and after images as separate fields/columns. This is how Debezium does it.

  2. Make the before and after image into separate rows, with a change type of update-before and update-after. This is how Delta does it.

  3. Make the before and after images into separate rows, with a change type of delete and insert. To differentiate between an update and a delete-then-insert, another column can be used to indicate if this was an update or not. This is how Snowflake does it.

+-----+---------+----------------+---------------+--------------------+
| name|    fruit|    _change_type|_commit_version|   _commit_timestamp|
+-----+---------+----------------+---------------+--------------------+
| jack|    apple|          insert|              1|2024-09-19 13:53:...|
|sarah|   orange|          insert|              1|2024-09-19 13:53:...|
| john|pineapple|          insert|              1|2024-09-19 13:53:...|
| jack|    apple| update_preimage|              2|2024-09-19 13:53:...|
| jack|   banana|update_postimage|              2|2024-09-19 13:53:...|
| john|pineapple|          delete|              3|2024-09-19 13:53:...|
+-----+---------+----------------+---------------+--------------------+
Fig 4. Shows a CDC change query result using Spark and Delta Lake, covering
the three commits. Delta returns two rows for the update in commit 2.
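
For reference, the sketch below shows roughly how the change query behind Fig 4 is issued with PySpark against a Delta table. The version range matches the three commits of the example; the table must have the change data feed property enabled, which I come back to further down:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the change data feed between two commit versions (both inclusive).
# Requires the table to have delta.enableChangeDataFeed = true.
changes = (spark.read.format("delta")
           .option("readChangeFeed", "true")
           .option("startingVersion", 1)
           .option("endingVersion", 3)
           .table("FavFruit"))
changes.show()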

Complete or minimized?

The results of a change query may be complete or minimized:

  • A complete change log: Consume all changes, even if they cancel each other out, e.g., a row is inserted and then deleted within the query time bounds. These are known as redundant changes.

  • A minimized change log: Consume a minimized set of changes where redundant changes are eliminated. An incremental read covers a time period (a number of commits in the log of deltas/snapshots), and the changes are minimized over that period.

+-----+------+------------+---------------+--------------------+
| name| fruit|_change_type|_commit_version|   _commit_timestamp|
+-----+------+------------+---------------+--------------------+
| jack|banana|      insert|              3|2024-09-19 13:55:...|
|sarah|orange|      insert|              3|2024-09-19 13:55:...|
+-----+------+------------+---------------+--------------------+
Fig 5. Shows the same three commits as the previous figure but
minimized. The insert and update of the ‘jack’ row is represented
as a single insert row but with the updated state. The ‘john’
row is absent as its insertion and deletion were redundant.
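
To pin down what minimization means, here is a toy Python sketch (my own illustration, not any table format's implementation) that reduces a full, ordered changelog to its net effect per key, assuming a well-formed log where update pre/post images come in pairs:

# Events arrive in commit order as (change_type, key, row) tuples, using
# the Delta-style change types from Fig 4.
def minimize(changelog):
    pre = {}   # key -> row image at the start of the window (None = absent)
    post = {}  # key -> row image at the end of the window (None = deleted)
    for change_type, key, row in changelog:
        if key not in pre:
            # The first sighting tells us whether the key existed before the window.
            pre[key] = None if change_type == "insert" else row
        if change_type in ("insert", "update_postimage"):
            post[key] = row
        elif change_type == "delete":
            post[key] = None
    minimized = []
    for key, before in pre.items():
        after = post.get(key)
        if before is None and after is not None:
            minimized.append(("insert", key, after))        # net-new row
        elif before is not None and after is None:
            minimized.append(("delete", key, before))       # net-removed row
        elif before != after:
            minimized.append(("update", key, before, after))
        # before == after (including both None) is redundant: emit nothing.
    return minimized

events = [("insert", "jack", "apple"), ("insert", "sarah", "orange"),
          ("insert", "john", "pineapple"),
          ("update_preimage", "jack", "apple"),
          ("update_postimage", "jack", "banana"),
          ("delete", "john", "pineapple")]
print(minimize(events))
# [('insert', 'jack', 'banana'), ('insert', 'sarah', 'orange')]

Running it over the three example commits yields exactly the two rows of Fig 5: the ‘jack’ insert and update collapse into a single insert with the updated state, and the ‘john’ row disappears entirely.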

Many ways of returning change data

The figure below shows five different forms of a change query result based on the same underlying operations. These are just a sample of the possible outputs that a change query could theoretically return.

Fig 6. Five examples of change events produced from a set of three operations.

Framing this analysis with four canonical use cases

The Snowflake engineering paper, What’s the Difference? Incremental Processing with Change Queries in Snowflake, names the use cases they support as append-only and min-delta. I will use the same terms and add two more:

  • append-only: Return inserted rows covering the query time period.

  • upsert: Return inserted and updated rows (without the before image) covering the query time period. This may or may not return rows that were deleted in the time period; this will depend on the table format.

  • min-delta: Minimized CDC results covering the query time period.

  • full-delta: Complete CDC results, including redundant changes, covering the query time period.

All later per-table-format analyses will frame their capabilities in terms of these four change query use cases. It’s worth noting here that what the table formats offer doesn’t always fit neatly into these four categories, but it gives us a simple baseline to compare against.
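
To ground the upsert category in a real API, here is a minimal PySpark sketch of a Hudi incremental query, which returns the latest state of rows written after a given instant, without before images. The table path and instant time are placeholders; the option names follow Hudi's Spark datasource configuration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Incremental query: only rows changed after the given instant are returned,
# in their latest state (no before images).
upserts = (spark.read.format("hudi")
           .option("hoodie.datasource.query.type", "incremental")
           .option("hoodie.datasource.read.begin.instanttime", "20240919135300000")
           .load("s3://bucket/warehouse/FavFruit"))
upserts.show()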

Different categories of implementation across the table formats

Each table format has a different underlying design and therefore takes a slightly different approach to the four change query types. However, we can broadly classify the CDC query implementations as follows.

Materialize CDC files on write

Writers materialize change events in special change files that get added alongside the regular data and delete files in each commit. These CDC files form part of the table and its history. Regular table readers ignore the CDC files, while CDC readers read the CDC files where they exist. If a commit only inserts or only deletes data files, then those data files can themselves serve as the change data. For example, if an operation adds a new data file that contains only inserts (and no mutations of existing rows), then a CDC reader can read that data file directly and treat all rows as inserts. A concrete sketch follows the pros and cons below.

Fig 7. Non-CDC incremental readers tail the log of deltas/snapshots to learn of data files to read. CDC readers also tail the log, but to discover CDC files to read.

  • Pros: Efficient for CDC readers.

  • Cons:

    • More storage required.

    • Adds overhead to writers as additional files must be written.
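
Delta Lake's change data feed is one example of this category: once a table property is set, commits that update or delete rows also materialize change files under the table's _change_data directory, while insert-only commits are served from the new data files themselves. A minimal sketch of opting a table in:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Opt the table into CDC materialization. Subsequent commits that mutate
# rows also write change files; Fig 4's query reads them.
spark.sql("ALTER TABLE FavFruit SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")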

Infer-CDC-on-read

Readers must compare each commit’s snapshot to the prior one to extract that commit’s row-level changes (an example follows the pros and cons below).

Fig 8. A CDC reader must compute the difference between adjacent snapshots to infer the row-level changes.

  • Pros: No overhead to writers.

  • Cons: Depending on the implementation, this can be IO and computationally expensive (and slow).
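
Iceberg's changelog view is broadly in this category. The sketch below uses Iceberg's documented create_changelog_view Spark procedure to build a queryable view of row-level changes over a snapshot range; the catalog name and snapshot IDs are placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build a view of the row-level changes between two snapshots, then query it.
spark.sql("""
    CALL my_catalog.system.create_changelog_view(
        table => 'db.FavFruit',
        changelog_view => 'FavFruit_changes',
        options => map('start-snapshot-id', '1', 'end-snapshot-id', '3')
    )
""")
spark.sql("SELECT * FROM FavFruit_changes").show()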

Row-level lineage/tracking

This is the approach taken in the Snowflake paper, and it seems well suited to copy-on-write (COW) tables that need to differentiate between inserted/updated rows and carry-over rows (rows copied over from the original data file unchanged).

Writers add metadata to each row that tells a compute engine whether it is a new row or an updated row. The metadata also contains enough information for the compute engine to find the original row for comparison, should it want to generate a full CDC result. Row lineage requires that rows be identified by a row id, even if the table format doesn’t explicitly support primary keys. A toy sketch of the idea follows the pros and cons below.

  • Pros:

    • Low storage overhead compared to CDC file materialization.

    • Useful for materialized view (MV) maintenance that doesn’t need a before image.

  • Cons:

    • Full CDC still requires additional work by the reader to retrieve the before image of updates and deletes.

    • Merge-on-read (MOR) seems to reduce the need for this approach, as no carry-over rows are written to data files, and positional delete files can be compared between commits (at lower cost than data file comparisons) to locate newly invalidated rows.
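
To illustrate the mechanics, here is a toy Python sketch (my own illustration, loosely following the idea in the Snowflake paper, not their implementation) of deriving changes from row-lineage metadata. For clarity it diffs two in-memory snapshots; a real engine would use the lineage metadata to prune the scan to only the affected files:

# Snapshots map a stable row id -> (last_modified_version, row). Carry-over
# rows keep the version that last touched them, so they are cheap to skip.
def lineage_changes(old_snapshot, new_snapshot, window_start):
    out = []
    for row_id, (version, row) in new_snapshot.items():
        if version <= window_start:
            continue  # carry-over row: not modified within the window
        if row_id in old_snapshot:
            _, before = old_snapshot[row_id]
            out.append(("update", before, row))  # before image found via row id
        else:
            out.append(("insert", None, row))
    for row_id, (_, row) in old_snapshot.items():
        if row_id not in new_snapshot:
            out.append(("delete", row, None))
    return out

old = {1: (1, ("jack", "apple")), 2: (1, ("sarah", "orange")),
       3: (1, ("john", "pineapple"))}
new = {1: (2, ("jack", "banana")), 2: (1, ("sarah", "orange"))}
print(lineage_changes(old, new, window_start=1))
# [('update', ('jack', 'apple'), ('jack', 'banana')),
#  ('delete', ('john', 'pineapple'), None)]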

Next, a dive into each table format

The subject of tracking changes and emitting a stream of changes is nuanced, because there are multiple forms of incrementally consuming changes and many implementations. I will approach much of this from a relatively high level, covering the four table formats I have been writing about: Apache Iceberg, Delta Lake, Apache Hudi and Apache Paimon.

These table-format-specific deep dives will be posted to the Analysis section of my blog, where I put my more detailed work, which is often split over multiple parts.

Change query deep dives:

  • Apache Iceberg (coming next)

  • Delta Lake (coming soon)

  • Apache Hudi (coming soon)

  • Apache Paimon (coming soon)

Once everything has been released, I’ll add a condensed summary to this post, covering the support that each table format provides at the time of writing. That way, this post alone should suffice for people not interested in understanding the internals of the table formats in gory detail.

All this stuff is changing fast, and I expect a lot to change over the coming months and years. Whether a format is ahead or behind is likely to change over time as they slowly converge in terms of capabilities. But we’ll just have to see.