Change query support in Apache Hudi (0.15)

This is the Apache Hudi deep dive associated with the table format comparisons - Change queries and CDC blog post, which looks at how each table format supports change queries, including full CDC. This is not a how-to guide but an examination of Apache Hudi’s capabilities through the lens of its internal design.

This analysis is based on the current 0.15 release. v1.0 is in beta at the time of writing, and I haven’t seen anything in 1.0 that changes what is covered in this post.

Preamble

In the parent blog post, I described four canonical change query types:

  • Latest-state

    • append-only: Return all inserted rows covering the query time period.

    • upsert: Return all inserted and updated rows (without the before image) covering the query time period. This may or may not return rows that were deleted in the time period; this will depend on the table format.

  • CDC

    • min-delta: Minimized CDC results covering the query time period.

    • full-delta: Complete CDC results, including redundant changes, covering the query time period.

I took the append-only and min-delta names from this incremental processing Snowflake paper that describes how Snowflake change queries work. Upsert and full-delta are additional types that are not supported by Snowflake but have some support among the table formats. We’ll be framing the Hudi capabilities in terms of those four change query types.

In the parent post, I also covered the three main approaches to CDC support:

  • Materialize CDC files on write

  • Infer-CDC-on-read

  • Row-lineage (aka row tracking)

Apache Hudi supports incremental latest-state reads, called incremental queries, and incremental CDC reads, known as incremental CDC queries.

Some relevant internals background

As with the Apache Iceberg and Delta Lake posts, it’s useful to cover some relevant aspects of Apache Hudi internals to understand the change query support.

It may be helpful to revisit some of my previous posts on Hudi:

  • How do the table formats represent the canonical set of files?

  • Append-only tables and incremental reads.

  • Apache Hudi’s consistency model (focuses on COW only).

The timeline, file groups and file slices

Hudi operations are committed to a log of deltas, called the timeline, ordered by timestamp. It is similar in concept to Delta Lake’s delta log. Each commit in the timeline has a unique timestamp, and this timestamp is used in multiple locations across the metadata files, data files and even at the row level.

Hudi bakes primary keys into its design, and it shards data by key across a number of static or dynamic file groups. Any given file group is a set of layered base and log files (though log files are only used in merge-on-read). Each base and log file name includes the commit timestamp of the commit that created it. These layered files are grouped into file slices, and that grouping depends on whether it is a copy-on-write (COW) table or a merge-on-read (MOR) table.

A file slice in a COW table is a single base file. Each write that touches a given file group causes the current base file to be rewritten as a new base file with the changes applied.

A file slice in a MOR table is a single base file and any number of log files layered on top, ordered by commit timestamp. Log files can be data or delete log files.
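To make the file group and file slice structure more concrete, here is a minimal sketch (not a Hudi API, and assuming a local filesystem table at path) that lists the base files of one partition and groups them by file group. It assumes the 0.x base file naming of fileId_writeToken_instantTime.parquet, which is visible in the _hoodie_file_name column of the query outputs later in this post.

import os
import re

# base files are named <fileId>_<writeToken>_<instantTime>.parquet
base_file_re = re.compile(r"(?P<file_id>.+)_(?P<write_token>\d+-\d+-\d+)_(?P<instant>\d+)\.parquet")

file_groups = {}
for f in os.listdir(os.path.join(path, "a")):  # partition 'a' from the examples below
    m = base_file_re.fullmatch(f)
    if m:
        file_groups.setdefault(m.group("file_id"), []).append(m.group("instant"))

# print each file group id and the commit timestamps of its base files
for file_id, instants in sorted(file_groups.items()):
    print(file_id, sorted(instants))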

Let’s look at a series of three commits, and how that would be represented in a COW and a MOR table with one file group.

Fig 1. The three commits used throughout this series, and the expected table scans executed at different timestamps.

This would translate to COW and MOR as follows.

Fig 2. Depicts the same three operations on a file group, either with COW or MOR. The MOR table also depicts a compaction producing a new file slice consisting of a base file.

Understanding this sharding by key across file groups and layering within file groups is critically important to understanding Hudi.

Incremental table scans

The Hudi timeline allows a compute engine to incrementally process each timeline instant by reading the instant entries in timestamp order. There are different instant types and different action states.

Write operations produce Commit instants in COW tables and Delta Commit instants in MOR tables. Each instant consists of a number of states, each a separate instant file in the timeline, with requested -> inflight -> completed. The completed instant files include the base and log files that were added by the commit.
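As a small illustration (a hedged sketch, assuming a local filesystem table at path), you can list the .hoodie directory and group the instant files by timestamp, to see each commit’s requested, inflight and completed instant files side by side:

import os
from collections import defaultdict

# group timeline instant files by their commit timestamp
instants = defaultdict(list)
for f in os.listdir(os.path.join(path, ".hoodie")):
    ts = f.split(".")[0]
    if ts.isdigit():  # instant file names start with the commit timestamp
        instants[ts].append(f)

# prints each commit timestamp with its requested/inflight/completed instant files
for ts in sorted(instants):
    print(ts, sorted(instants[ts]))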

Fig 3. Depicts a timeline with three file groups in the data layer.

Row-tracking

Hudi implements a kind of row tracking that allows it to perform efficient incremental queries. Each row has a number of metadata columns, including:

  • _hoodie_commit_time that denotes the commit timestamp when the row was added.

  • _hoodie_commit_seqno that acts like a kind of offset of the row within a commit. This allows an incremental reader to resume reading from a specific position within a commit.

This allows an incremental reader to filter out rows added (inserted or updated) with commit timestamps outside of the query range.

Fig 4. Depicts the row_commit_ts metadata column in a COW table file group. It also works the same way in MOR tables. Soft deletes in Hudi are upserts with the columns nulled out. A hard-delete is a delete operation that removes the row. Both types of delete are treated the same way by reads.
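Conceptually, this row-level filtering is just a predicate on _hoodie_commit_time. The following is a hedged sketch (not how the incremental reader is implemented, and begin_ts/end_ts are placeholder commit timestamps); a real incremental query also uses the timeline to prune which files get scanned at all.

from pyspark.sql.functions import col

# a plain snapshot read with an explicit commit timestamp range filter,
# illustrating the row-level filter an incremental reader applies
changes_df = (spark.read.format("hudi").load(path)
    .filter((col("_hoodie_commit_time") > begin_ts) &
            (col("_hoodie_commit_time") <= end_ts)))
changes_df.show()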

That’s enough technical background, let’s get into the change query support.

Append-only and upsert change queries

An incremental query is one using the option 'hoodie.datasource.query.type': 'incremental'.

There are two types of incremental queries in Hudi, configured via the config hoodie.datasource.query.incremental.format, with two values:

  • latest_state

  • cdc

This section covers latest_state queries.

Incremental latest_state queries are upsert change queries, and provide no append-only support. However, the upsert support Hudi provides can’t be faulted: if you want to perform incremental upsert change queries, Hudi has you covered.

As already explained, Hudi writes the commit timestamp at the row level, allowing rows to be filtered by commit timestamp range. Whether the table is COW or MOR, the same row-level filtering is performed. In a COW table, the reader scans the base files written by the commits within the range, applying the row-level filter.

With a MOR table, only the base and log files of each file group within the timestamp range of the query are consulted. This means a query may touch only a base file, only log files, or both. Within a single file slice, it’s possible that a single primary key was updated multiple times, appearing in the base file and multiple log files. If the reader simply returns any rows that match the commit timestamp range, then it can return multiple versions of the same row.

This may or may not be desirable, so Hudi provides the hoodie.datasource.merge.type config, with values:

  • skip_merge: Just return the rows as they are found.

  • payload_combine: Merge the rows such that there is only one row per primary key. The merge chooses the most recent row.

As I have done with previous posts, I’ll use Apache Spark to run some commands.

Hudi wants a timestamp field (the precombine field), which it uses to deduplicate on write, and it also needs a partition column, so I added dummy values for both.

This is a code snippet using PySpark with Hudi 0.15.

from pyspark.sql import Row

# set some generic options (assumes an existing SparkSession `spark`
# configured with the Hudi bundle)
hudi_options = {
    …
    'hoodie.datasource.write.recordkey.field': 'name',
    'hoodie.datasource.write.partitionpath.field': 'part',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
}

# upsert and delete specific options
upsert_options = hudi_options.copy()
upsert_options.update({'hoodie.datasource.write.operation': 'upsert'})
delete_options = hudi_options.copy()
delete_options.update({'hoodie.datasource.write.operation': 'delete'})

# Do the inserts (commit 1)
inputdf1 = spark.createDataFrame([
   Row(name='jack', fruit='apple', part='a', ts=1),
   Row(name='sarah', fruit='orange', part='a', ts=1),
   Row(name='john', fruit='pineapple', part='a', ts=1)
])

inputdf1.coalesce(1).write.format("hudi") \
   .options(**upsert_options) \
   .mode("overwrite") \
   .save(path)

# Do the update (commit 2)
data = [
   {"name": "jack", "fruit": "banana", "part": "a", "ts": 2}
]
update_df = spark.createDataFrame([Row(**x) for x in data])
update_df.write.format("hudi") \
   .options(**upsert_options) \
   .mode("append") \
   .save(path)

# Do the delete (commit 3)
delete_df = spark.read.format("hudi").load(path).filter("name == 'john'")
delete_df.write.format("hudi") \
   .options(**delete_options) \
   .mode("append") \
   .save(path)

We have now executed the same three commits used in the examples above. Let’s run some incremental queries with different configurations to see what results are returned.

Config 1: COW, covering all three commits

First we’ll execute an incremental query that covers all three commits in a COW table. We set the start timestamp at 0 with no end timestamp.

inc_read_options = hudi_options.copy()
inc_read_options.update({'hoodie.datasource.query.type': 'incremental'})
inc_read_options.update({'hoodie.datasource.read.begin.instanttime': 0})

inc_read_df = spark.read.format("hudi"). \
 options(**inc_read_options). \
 load(path)
inc_read_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+-----+------+----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| name| fruit|part| ts|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+------+----+---+
|  20240926152906313|20240926152906313...|             sarah|                     a|0fd398f9-f8c4-431...|sarah|orange|   a|  1|
|  20240926152914622|20240926152914622...|              jack|                     a|0fd398f9-f8c4-431...| jack|banana|   a|  2|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+------+----+---+

Notice that because the incremental query covers the entire table’s history, it returns the current state of the table, i.e. the same results as a batch query.

Config 2: COW, two queries covering the first commit, then the remaining two

First we run a query covering only the timestamp of the first commit.
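The ts1, ts2 and ts3 values below are the commit timestamps of the three commits. One way to obtain them (a sketch, assuming a local filesystem table) is to list the completed commit instants in the timeline:

import os
import re

# completed COW commits appear as <timestamp>.commit files in the timeline
completed = sorted(
    f.split(".")[0]
    for f in os.listdir(os.path.join(path, ".hoodie"))
    if re.fullmatch(r"\d+\.commit", f)
)
ts1, ts2, ts3 = completed[:3]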

…
inc_read_options.update({'hoodie.datasource.read.begin.instanttime': 0})
inc_read_options.update({'hoodie.datasource.read.end.instanttime': ts1})
…
+-------------------+--------------------+------------------+----------------------+--------------------+-----+---------+----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| name|    fruit|part| ts|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+---------+----+---+
|  20240926152906313|20240926152906313...|             sarah|                     a|0fd398f9-f8c4-431...|sarah|   orange|   a|  1|
|  20240926152906313|20240926152906313...|              john|                     a|0fd398f9-f8c4-431...| john|pineapple|   a|  1|
|  20240926152906313|20240926152906313...|              jack|                     a|0fd398f9-f8c4-431...| jack|    apple|   a|  1|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+---------+----+---+

This time we see the three inserts of the first commit.

Next we run the second query, covering the timestamps of the remaining two commits.

...
inc_read_options.update({'hoodie.datasource.read.begin.instanttime': ts2})
inc_read_options.update({'hoodie.datasource.read.end.instanttime': ts3})
...

+-------------------+--------------------+------------------+----------------------+--------------------+----+-----+----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name|name|fruit|part| ts|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-----+----+---+
|  20240926152914622|20240926152914622...|              jack|                     a|0fd398f9-f8c4-431...|jack|banana|   a|  2|
+-------------------+--------------------+------------------+----------------------+--------------------+----+-----+----+---+

This time we see only the Jack row. This is because the file slice covering the last commit includes the delete of the John row, and so it is not returned. This means that with a COW table, deletes are honored in incremental queries.

Config 3: MOR, skip_merge, covering all three commits

The merge type skip_merge tells Hudi to just return all the rows that match the commit timestamp range of the incremental query. Remember that the merge type is only relevant to MOR tables.

hudi_options = {
    …
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    …
}
…
inc_read_options = hudi_options.copy()
inc_read_options.update({'hoodie.datasource.query.type': 'incremental'})
inc_read_options.update({'hoodie.datasource.read.begin.instanttime': 0})
inc_read_options.update({'hoodie.datasource.merge.type': 'skip_merge'})

inc_read_df = spark.read.format("hudi"). \
 options(**inc_read_options). \
 load(path)
inc_read_df.show()

+-------------------+--------------------+------------------+----------------------+--------------------+-----+---------+----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| name|    fruit|part| ts|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+---------+----+---+
|  20240926154313714|20240926154313714...|             sarah|                     a|905979dd-f909-4fa...|sarah|   orange|   a|  1|
|  20240926154313714|20240926154313714...|              john|                     a|905979dd-f909-4fa...| john|pineapple|   a|  1|
|  20240926154313714|20240926154313714...|              jack|                     a|905979dd-f909-4fa...| jack|    apple|   a|  1|
|  20240926154319529|20240926154319529...|              jack|                     a|905979dd-f909-4fa...| jack|   banana|   a|  2|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+---------+----+---+

Two interesting points here:

  • We see that there are two Jack rows corresponding to the insert and the update.

  • The John row is also included, despite having been deleted.

This is because the incremental query has been configured to return rows “as is”.

Config 4: MOR, payload_combine, covering all three commits

The merge type payload_combine tells Hudi to merge the rows that match the commit timestamp range, such that only one row per primary key is returned.

inc_read_options.update({'hoodie.datasource.query.type': 'incremental'})
inc_read_options.update({'hoodie.datasource.read.begin.instanttime': 0})
inc_read_options.update({'hoodie.datasource.merge.type': 'payload_combine'})
...

+-------------------+--------------------+------------------+----------------------+--------------------+-----+------+----+---+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| name| fruit|part| ts|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+------+----+---+
|  20240926154313714|20240926154313714...|             sarah|                     a|905979dd-f909-4fa...|sarah|orange|   a|  1|
|  20240926154319529|20240926154319529...|              jack|                     a|905979dd-f909-4fa...| jack|banana|   a|  2|
+-------------------+--------------------+------------------+----------------------+--------------------+-----+------+----+---+

Notice that:

  1. The results have been merged to remove duplicates.

  2. The John row is not included. This is because the John key in the delete log block was merged with the John row of the base file.

Merging the results of incremental queries over MOR tables doesn’t just eliminate duplicates; it also applies deletes to the results.

Summary

Apache Hudi provides good upsert change query support, with the following properties:

  • COW tables: The query returns the latest state of all rows that were inserted or updated in the time period, without any duplicates. Deletes are honored by not returning rows that were deleted within the query time period.

  • MOR tables:

    • Unmerged results return all rows as they are found in the base and data log files, so duplicates (different versions of the same row) can be returned. Deleted rows can be returned if the row was added or updated inside the timestamp range of the query.

    • Merged results have the same behavior as COW incremental queries.

min-delta/full-delta change queries

Apache Hudi supports CDC file materialization like Delta Lake, and it also provides some infer-CDC-on-read capabilities. This effectively gives the user a dial between infer-CDC-on-read and materialize-CDC-on-write. Additionally, CDC files can be materialized synchronously by the writer or asynchronously by a compaction job.

Apache Hudi uses the Debezium CDC format, where it places the before and after image of a row in the same change row. Where Delta represents an update as two change rows with update_preimage and update_postimage operation types, Hudi just has an operation of U, with the pre and post image in the same row.

We’ll start by looking at the various levels of inference vs materialization.

Inference vs materialization

I will refer to the “writer” materializing CDC files, but this writer could be the one doing the table write, or it could be a compaction job, depending on the configuration.

A Hudi table will materialize CDC data when the hoodie.table.cdc.enabled config is set to true. There are multiple levels of CDC file materialization, controlled by the config hoodie.table.cdc.supplemental.logging.mode.

This config has the following values and corresponding writer behavior:

  • DATA_BEFORE_AFTER: CDC files are created that contain the primary key, operation (I, U, D), full before and after images of the inserted/updated/deleted rows of the commit.

  • DATA_BEFORE: The CDC file includes the primary key, operation (I, U, D), but only the before images of rows.

  • OP_KEY_ONLY: The CDC file only includes the primary keys and the associated operations (I, U, D).

The supplemental logging mode determines the level of detail in CDC files, but CDC files aren’t always created. If a commit only logically deletes a base file, then all the rows in the logically deleted base file can be treated as deletes. If a commit only adds a base file, then all rows in the new base file are treated as inserts.

The storage cost of each of these levels of materialization gets cheaper as we go down through the list from DATA_BEFORE_AFTER to OP_KEY_ONLY. But while the storage cost goes down, the read cost goes up. The level of detail in the CDC files determines how much work CDC readers must perform to produce the same incremental CDC query result.

Incremental CDC readers walk the timeline, and for each commit they must figure out how to produce a CDC result for that commit, based on the type of commit and the supplemental logging mode. When a CDC reader inspects a commit, it classifies its approach as one of the following inference cases:

  • AS_IS - read a CDC file (as there is a CDC file to read).

  • BASE_FILE_INSERT - treat all rows in a base file as inserts.

  • BASE_FILE_DELETE - treat all rows in the base file of the previous commit as deletes.

  • LOG_FILE - brute-force comparison between a log file and the file slice of the prior commit.

  • REPLACE_COMMIT - treat all rows of the file group in its state in the previous commit as deletes.

Let’s look at an example of three commits and how that is materialized as CDC files according to the supplemental logging modes.

The commits as always in this series are:

  1. INSERT INTO FavFruit VALUES (‘Jack’, ‘Apple’), (‘Sarah’, ‘Orange’), (‘John’, ‘Pineapple’)

  2. UPDATE FavFruit SET fruit = ‘Banana’ WHERE name = ‘Jack’

  3. DELETE FROM FavFruit WHERE name = ‘John’

It doesn’t matter whether these are actually performed via batch SQL or a set of streaming upserts.

Supplemental logging mode DATA_BEFORE_AFTER

The CDC reader treats the base file created by the first commit as all inserts, then reads the CDC files for the remaining two commits.

Fig 5. A CDC reader reads a mix of base files and CDC files to return CDC query results.

Supplemental logging mode DATA_BEFORE

This time the CDC reader must do some extra work to produce CDC data for the second commit, in order to obtain the after image of the row.

Fig 6. A CDC reader reads a mix of base files and CDC files to return CDC query results. It must source the after image of the row in commit ts=2 from the file slice of the current commit (at that time, being ts=2).

Supplemental logging mode OP_KEY_ONLY

This time the CDC reader must do some extra work for the second and third commits. For the second commit, it must load the file slice at the current and previous commits to get the before and after image. For the third commit, a delete, it only needs the before image, sourced from the file slice at the previous commit.

Fig 7. With OP_KEY_ONLY, the reader must load the file slice at the current and previous commit for the before and after images.

CDC inference case logic

The CDC reading logic is as follows (ignoring some nuance around other types of commits, such as compactions, that only rearrange data rather than logically change it); a code sketch follows the list:

  • If no CDC file exists for the commit

    • If the commit added a base file

      • If the commit added an empty base file as a delete commit

        • Known as the BASE_FILE_DELETE inference case.

        • Treat all rows in the previous file slice as deleted

      • If the commit was one that only inserted rows (known from the column stats)

        • Known as the BASE_FILE_INSERT inference case.

        • Treat all rows in the current file slice as inserted.

    • If the commit added a log file

      • Known as the LOG_FILE inference case. This is discussed in more detail below.

      • The reader must load the file slice as of the previous commit. 

      • Go row by row through the log file, doing a diff between it and the loaded file slice of the prior commit.

  • Else (one or more CDC files exist for the commit)

    • Known as AS_IS inference cases.

    • If supplemental logging mode is DATA_BEFORE_AFTER

      • Read the CDC file.

    • If supplemental logging mode is DATA_BEFORE

      • Load the current file slice.

      • Go row-by-row through the CDC file, and add the missing after image of each row, sourcing it from the loaded file slice.

    • Else it’s OP_KEY_ONLY

      • The reader must load the file slice as of the previous commit.

      • Also load the file slice of this commit.

      • Go key-by-key through the CDC file and add the before and after image of each key using the prior and current states of the file slice.
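To make the branching above concrete, here is a hedged sketch of the classification step in code, using hypothetical helper methods on a commit object (none of these names come from the Hudi codebase):

# hypothetical helpers; the real reader works per file group and also carries
# the information needed to execute each case, not just its name
def classify_inference_case(commit) -> str:
    if commit.has_cdc_files():
        # DATA_BEFORE and OP_KEY_ONLY still require file slice loads to fill in
        # the missing before/after images, but the inference case is the same
        return "AS_IS"
    if commit.added_base_file():
        if commit.wrote_empty_base_file():   # a delete commit
            return "BASE_FILE_DELETE"        # prior file slice rows become deletes
        if commit.is_insert_only():          # known from the column stats
            return "BASE_FILE_INSERT"        # new base file rows become inserts
    if commit.added_log_file():
        return "LOG_FILE"                    # diff the log file against the prior file slice
    raise ValueError("commit type not covered by this sketch (e.g. compaction)")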

Whether you choose DATA_BEFORE_AFTER, DATA_BEFORE or OP_KEY_ONLY, the CDC query result is the same, only the amount of work to produce the result changes. This is why I refer to Hudi as having a kind of dial.

The infer-CDC-on-read case with Flink

The LOG_FILE inference case is not typical because a table with CDC enabled will have materialized CDC files to read, so there is no need for this pure brute force diffing technique. However, Flink can be configured to implement incremental CDC queries without CDC files when using MOR tables. In this case, every inference case is either BASE_FILE_INSERT, BASE_FILE_DELETE or LOG_FILE and never AS_IS.

This means that with Flink the dial can be set all the way to the infer-CDC-on-read end of the spectrum. When the option READ_CDC_FROM_CHANGELOG is true, Flink will use any materialized CDC files; when false, it will not use CDC files at all and will fall back to the brute-force comparison approach.

Synchronous vs asynchronous CDC file materialization

When table writes are executed as a batch table scan, then modification, then write, the writer knows the before and after images of all the affected rows. Therefore it can simply materialize the CDC files synchronously during the write itself.

But in the case of streaming upserts, the current value of a row may not be known and so materializing a CDC file is not possible. This is the case with Flink. To deal with this, CDC files are written by the compaction jobs. When a compaction of a file group is run, the compactor reads the current base file and all its associated log files, providing it all the information it needs to write out the CDC files.

The problem with writing CDC files during compaction is the additional lag it imposes on CDC readers. This is why Flink can be configured not to read from CDC files and to instead rely only on the BASE_FILE_INSERT, BASE_FILE_DELETE or LOG_FILE inference cases.

Let’s run some code to try out Hudi CDC reads

I’ll use Spark as before. We need two table properties:

'hoodie.table.cdc.enabled': 'true',
'hoodie.table.cdc.supplemental.logging.mode': 'DATA_BEFORE_AFTER'

It doesn’t actually matter which supplemental logging mode we use; that only affects how the CDC reader produces the result, not the result we will see.
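A minimal sketch of how these properties might be folded into the write options used earlier (same hudi_options dict as before):

# enable CDC materialization on the table at write time
cdc_write_options = hudi_options.copy()
cdc_write_options.update({
    'hoodie.table.cdc.enabled': 'true',
    'hoodie.table.cdc.supplemental.logging.mode': 'DATA_BEFORE_AFTER',
})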

We’ll also need the following options for reading:

inc_read_options.update({'hoodie.datasource.query.type': 'incremental'})
inc_read_options.update({'hoodie.datasource.query.incremental.format': 'cdc'})

One incremental CDC query covering all commits
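The read itself has the same shape as the earlier incremental reads. A minimal sketch, again starting from instant time 0 so that the whole timeline is covered:

inc_read_options.update({'hoodie.datasource.read.begin.instanttime': 0})

cdc_df = spark.read.format("hudi") \
   .options(**inc_read_options) \
   .load(path)
cdc_df.show(truncate=False)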

+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before                                                                                                                                                                                                                                                                                                                 |after                                                                                                                                                                                                                                                                                                 |
+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|i  |20240927124038137|NULL                                                                                                                                                                                                                                                                                                                   | |
|i  |20240927124038137|NULL                                                                                                                                                                                                                                                                                                                   |{"fruit":"pineapple","_hoodie_record_key":"john","_hoodie_partition_path":"a","_hoodie_file_name":"5c2cb74d-6527-402a-bf31-93927ab2ead2-0_0-23-18_20240927124038137.parquet","_hoodie_commit_seqno":"20240927124038137_0_1","part":"a","name":"john","_hoodie_commit_time":"20240927124038137","ts":1}|
|i  |20240927124038137|NULL                                                                                                                                                                                                                                                                                                                   |{"fruit":"apple","_hoodie_record_key":"jack","_hoodie_partition_path":"a","_hoodie_file_name":"5c2cb74d-6527-402a-bf31-93927ab2ead2-0_0-23-18_20240927124038137.parquet","_hoodie_commit_seqno":"20240927124038137_0_2","part":"a","name":"jack","_hoodie_commit_time":"20240927124038137","ts":1}    |
|u  |20240927124044246|{"_hoodie_commit_time": "20240927124038137", "_hoodie_commit_seqno": "20240927124038137_0_2", "_hoodie_record_key": "jack", "_hoodie_partition_path": "a", "_hoodie_file_name": "5c2cb74d-6527-402a-bf31-93927ab2ead2-0_0-23-18_20240927124038137.parquet", "name": "jack", "fruit": "apple", "part": "a", "ts": 1}    |{"_hoodie_commit_time": "20240927124044246", "_hoodie_commit_seqno": "20240927124044246_0_1", "_hoodie_record_key": "jack", "_hoodie_partition_path": "a", "_hoodie_file_name": "5c2cb74d-6527-402a-bf31-93927ab2ead2-0", "name": "jack", "fruit": "banana", "part": "a", "ts": 2}                    |
|d  |20240927124045546|{"_hoodie_commit_time": "20240927124038137", "_hoodie_commit_seqno": "20240927124038137_0_1", "_hoodie_record_key": "john", "_hoodie_partition_path": "a", "_hoodie_file_name": "5c2cb74d-6527-402a-bf31-93927ab2ead2-0_0-23-18_20240927124038137.parquet", "name": "john", "fruit": "pineapple", "part": "a", "ts": 1}|NULL                                                                                                                                                                                                                                                                                                  |
+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

We see that this conforms to the full-delta change query type. No minimization has occurred, despite the query covering the entire commit history of the table.

Two incremental CDC queries, first commit only, then last two commits

Splitting the same period across two incremental CDC queries returns the same change rows: unioning the results of the two queries yields the result of the single query above.

The first query covers the timestamps up to and including the first commit.

+---+-----------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before|after                                                                                                                                                                                                                                                                                                 |
+---+-----------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|i  |20240927124953484|NULL  | |
|i  |20240927124953484|NULL  |{"fruit":"pineapple","_hoodie_record_key":"john","_hoodie_partition_path":"a","_hoodie_file_name":"01e7b7a7-d14a-46ac-b0fc-171b5c2babda-0_0-23-18_20240927124953484.parquet","_hoodie_commit_seqno":"20240927124953484_0_1","part":"a","name":"john","_hoodie_commit_time":"20240927124953484","ts":1}|
|i  |20240927124953484|NULL  |{"fruit":"apple","_hoodie_record_key":"jack","_hoodie_partition_path":"a","_hoodie_file_name":"01e7b7a7-d14a-46ac-b0fc-171b5c2babda-0_0-23-18_20240927124953484.parquet","_hoodie_commit_seqno":"20240927124953484_0_2","part":"a","name":"jack","_hoodie_commit_time":"20240927124953484","ts":1}    |
+---+-----------------+------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The second query covers the timestamps covering the update and delete commits.

+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before                                                                                                                                                                                                                                                                                                                 |after                                                                                                                                                                                                                                                                             |
+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|u  |20240927124959217|{"_hoodie_commit_time": "20240927124953484", "_hoodie_commit_seqno": "20240927124953484_0_2", "_hoodie_record_key": "jack", "_hoodie_partition_path": "a", "_hoodie_file_name": "01e7b7a7-d14a-46ac-b0fc-171b5c2babda-0_0-23-18_20240927124953484.parquet", "name": "jack", "fruit": "apple", "part": "a", "ts": 1}    |{"_hoodie_commit_time": "20240927124959217", "_hoodie_commit_seqno": "20240927124959217_0_1", "_hoodie_record_key": "jack", "_hoodie_partition_path": "a", "_hoodie_file_name": "01e7b7a7-d14a-46ac-b0fc-171b5c2babda-0", "name": "jack", "fruit": "banana", "part": "a", "ts": 2}|
|d  |20240927125000768|{"_hoodie_commit_time": "20240927124953484", "_hoodie_commit_seqno": "20240927124953484_0_1", "_hoodie_record_key": "john", "_hoodie_partition_path": "a", "_hoodie_file_name": "01e7b7a7-d14a-46ac-b0fc-171b5c2babda-0_0-23-18_20240927124953484.parquet", "name": "john", "fruit": "pineapple", "part": "a", "ts": 1}|NULL                                                                                                                                                                                                                                                                              |
+---+-----------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
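As a quick sanity check (a sketch, assuming the two range-bounded reads are held in cdc_df_1 and cdc_df_2 and the full read from earlier in cdc_df_full):

# the union of the two range-bounded CDC reads should contain the same
# change rows as the single read covering the whole timeline
assert cdc_df_1.union(cdc_df_2).count() == cdc_df_full.count()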

CDC summary

CDC materialization can be done at different levels of detail, and either synchronously or asynchronously.

  1. Writers that perform batch operations involving a scan phase write the CDC files themselves (just like Delta Lake). This is how Apache Spark uses Hudi.

  2. Streaming writers do not write CDC files and leave that to the compaction jobs. This means that CDC change queries must wait for compaction jobs in order to consume the latest changes.

The level of detail in the CDC files affects the write and read cost. What is better for writes and storage is worse for readers, and vice-versa.

Conclusions

Current support for Apache Hudi change queries in Flink, Spark and Spark Structured Streaming:

  • append-only: No support.

  • upsert: Yes, works very well and efficiently.

  • min-delta: No support.

  • full-delta: Good support and with configurations that can optimize for writers/storage or for CDC readers.

There are some other features of Hudi I haven’t covered. One is the ability to write CDC streams from Flink directly to Hudi, maintaining the row-level operations as metadata columns; these can then be read until compaction removes the additional metadata. This requires a connector by Ververica that I haven’t had time to play with or read the code of.

Overall the change query support in Hudi is pretty good. The upsert change queries use commit-timestamp-based row tracking to make them efficient. This places Hudi ahead of Iceberg and Delta at the time of writing. But when Iceberg gets row lineage (row tracking) in v3, it should be able to match Hudi on upsert change queries. Iceberg will also offer append-only change queries, which Hudi can’t (as a Hudi reader can’t tell the difference between an insert and an update in the row metadata). Delta, which has row tracking, doesn’t yet have support for making use of it in open source compute engines such as Spark and Flink, but once that is added, Delta should also be able to match Hudi on upsert change queries.

Hudi doesn’t support min-delta, which may be an issue for some. Hudi and Delta are pretty evenly matched on the CDC front, though Hudi is more flexible in terms of balancing the competing demands of writer efficiency, space efficiency and CDC reader efficiency. I haven’t tested Hudi at scale, so I don’t know to what degree the supplemental logging mode affects read and write performance.

Apache Paimon is the next and last deep dive in this series. I actually haven’t looked at Paimon CDC at all yet, so I’m really curious what I’ll find when I start reading the docs, reading the code and running Paimon and Flink to see some CDC results.