This is the Apache Paimon deep dive associated with the table format comparisons - Change queries and CDC blog post, which looks at how each table format supports change queries, including full CDC. This is not a how-to guide but an examination of Apache Paimon capabilities through the lens of its internal design. Note that this post is not about CDC log ingestion but about Paimon’s support for querying the changes of the table itself.
WARNING: This is quite long and detailed, but it turned out that there is a lot to say about Paimon change queries!
Preamble
In the parent blog post, I described four canonical change query types:
Latest-state
append-only: Return all inserted rows covering the query time period.
upsert: Return all inserted and updated rows (without the before image) covering the query time period. This may or may not return rows that were deleted in the time period; this will depend on the table format.
CDC
min-delta: Minimized CDC results covering the query time period.
full-delta: Complete CDC results, including redundant changes, covering the query time period.
I took the append-only and min-delta names from the incremental processing Snowflake paper that describes how Snowflake change queries work. Upsert and full-delta are additional types that are not supported by Snowflake but have some support among the table formats. We’ll be framing the Paimon capabilities in terms of those four change query types.
In the parent post, I also covered the three main approaches to CDC support:
Materialize CDC files on write
Infer-CDC-on-read
Row-lineage (aka row tracking)
Paimon supports incremental latest-state and CDC queries, with Flink and Spark. We’ll be mostly focusing on Flink in this post.
Some relevant internals background
As I have done with each table format so far, we’ll start with some background on the internals. Understanding how Paimon works internally will explain the types of incremental latest-state and CDC queries that it supports.
First of all, I recommend reading Understanding Apache Paimon's Consistency Model Part 1 (only part 1), as it describes the high-level internals of Paimon. I will summarize some points in this post, but the full post is worth reading as well.
Incremental table scans
Paimon metadata has a lot in common with Apache Iceberg in that it uses the log of snapshots approach, where each snapshot is the root of a tree of manifest list and manifest files. The biggest structural difference to Iceberg is that each snapshot has two manifest lists:
Base manifest list: lists the manifest files at the time when the operation began.
Delta manifest list: lists the manifests that describe the changes that were made in the operation that created the snapshot.
A Paimon reader can walk the log of snapshots and use the metadata stored in the base and delta manifest lists to perform incremental queries.
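You can poke at this structure yourself through Paimon’s system tables. As a small illustrative sketch (assuming the $manifests system table and the scan.snapshot-id option behave as described in the Paimon docs; my_table is a placeholder):

-- list the manifest files referenced by the latest snapshot
SELECT * FROM my_table$manifests;

-- list the manifest files referenced by a specific snapshot
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.snapshot-id' = '2') */;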
Sharding and LSM trees
Paimon shards data into a number of buckets per partition. This can be a fixed number of buckets or dynamic. This is similar to how Apache Hudi shards into file groups.
The focus of this post is the bucket, as reads, compactions, and change queries are all scoped to individual buckets. Each bucket is its own self-contained LSM tree made up of various levels. Paimon writers add data files to level 0 as sorted-runs. Compactors read level 0+ data files and compact them into sorted-runs of larger files at higher levels.
Level 0 can have multiple sorted-runs, whereas higher levels are compacted such that the entire level is a single sorted-run. Two sorted-runs of level 0 can have overlapping keys, but there can be no key overlap between the files of any single sorted-run. Having a single sorted-run per compacted level (level 1+) makes it more efficient to perform key lookups, as it is guaranteed that any given key can only exist in one data file of the level. Data file metadata contains min and max key data that the query planner can use.
Unlike the other table formats, Paimon always encodes the row-level operations (+I, +U, -D) into the rows in data files. Paimon and Flink refer to these operation types as the RowKind. A row-level delete is performed by adding a -D RowKind row to level 0, and a row-level update by adding a +U RowKind row with the new row state. This means that Paimon is a merge-on-read design where a reader must merge the set of row-level operations into a query result, such that each primary key of the input data only has one row in the output data.
There are a number of merge engines, but the one we’ll be focusing on is the standard deduplicate merge engine, which ensures that a query only returns one row per primary key. The data is merged in sequence number order (not level order), producing one output row per key. The sequence number is a monotonic counter maintained by the bucket writer and is guaranteed to correctly represent the temporal ordering of operations.
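To make that concrete, here is a sketch of how the merge behavior can be configured on a primary key table. The 'merge-engine' = 'deduplicate' setting is the default (shown only to be explicit), and 'sequence.field' is an optional setting that orders the merge by a column of your choosing instead of the writer-assigned sequence number. The table and column names here are placeholders.

CREATE TABLE fav_fruit_dedup (
    name STRING PRIMARY KEY NOT ENFORCED,
    fruit STRING,
    updated_at TIMESTAMP(3)
) WITH (
    'bucket' = '1',
    -- 'deduplicate' is the default: keep only the most recent row per primary key
    'merge-engine' = 'deduplicate',
    -- optional: merge by this column rather than the internal sequence number
    'sequence.field' = 'updated_at'
);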
The figure below shows how a series of writes (be it via streaming writes or SQL batch) is translated into some data files at level 0. Note that I will omit sorted-runs from my diagrams from here on to keep it simple.
A reader scans these data files and performs a merge, such that only one row per primary key is returned, using the sequence number field to determine the order.
Compaction
A compaction job has one or more input levels and an output level (the level where the compacted data is written). The input may be level 0 only, a subset of levels, or the entire bucket (a full compaction, which compacts everything into the highest configured level). A compaction job reads the data files of the selected levels, merges the data (just as a reader does), and then writes the merged result back as one or more new data files forming a single sorted-run in the output level. Once the compaction commits, creating a new snapshot, the original data files still exist, but are only reachable via the previous snapshot. In the new snapshot, the original data files have been logically deleted and replaced with the newly compacted files.
Compactions can be performed by the bucket writer after performing a commit to level 0, or they can be performed by a dedicated compactor asynchronously.
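As a preview of the setup used later in this post, decoupling the two looks roughly like this ('my_table' is a placeholder): the table is created with 'write-only' = 'true' so the bucket writer never compacts, and compaction runs as a separate job via the sys.compact procedure.

-- the writer only adds files to level 0 and never compacts
CREATE TABLE my_table (
    id STRING PRIMARY KEY NOT ENFORCED,
    val STRING
) WITH (
    'bucket' = '1',
    'write-only' = 'true'
);

-- compaction runs as a dedicated job, on whatever schedule suits the workload
CALL sys.compact(`table` => 'default.my_table', options => 'sink.parallelism=1');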
RowKind as CDC data
There are four CDC operation types:
Insert (+I)
Update-after (+U)
Update-before (-U)
Delete (-D)
Because Paimon includes the RowKind in each row, CDC readers can get 3 of the 4 CDC operation types for free. However, if you want the update-before image of the row, then there is more work to be done. We’ll cover that in the min/full-delta section further down.
Let’s start looking at the specifics of change query support in Paimon.
Append-only and upsert change queries
I’ll use Flink SQL to demonstrate the change query support in Paimon. The same examples are possible using Spark, but since Paimon was born as the Flink Table Store, I figured we should use Flink for this one. I’m using Paimon 0.8.2 with Flink 1.19.1.
I’m using Flink in batch mode for these examples, but basically, there is no difference between using batch mode and streaming mode, except batch is a bit easier to play with.
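For reference, a streaming read would look something like the sketch below (hedged: the 'scan.mode' and 'scan.snapshot-id' options come from the Paimon streaming read docs rather than from the examples in this post, and 't' is a placeholder table). The query subscribes to the table and keeps emitting rows as new snapshots are committed, rather than terminating. For the rest of the post, we stay in batch mode.

SET 'execution.runtime-mode' = 'streaming';

-- continuously read changes, starting from snapshot 1
SELECT * FROM t /*+ OPTIONS('scan.mode' = 'from-snapshot', 'scan.snapshot-id' = '1') */;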
First, create a table.
Flink SQL> CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:/tmp/paimontest'
);
Flink SQL> USE CATALOG my_catalog;
Flink SQL> CREATE TABLE fav_fruit (
    name STRING PRIMARY KEY NOT ENFORCED,
    fruit STRING
) WITH (
    'bucket' = '1'
);
Flink SQL> SET 'execution.runtime-mode' = 'batch';
Flink SQL> INSERT INTO fav_fruit VALUES('jack', 'apple'), ('sarah', 'orange'), ('john', 'pineapple');
Flink SQL> UPDATE fav_fruit SET fruit = 'banana' WHERE name = 'jack';
Flink SQL> DELETE FROM fav_fruit WHERE name = 'john';
We can run incremental queries using timestamps or snapshot IDs. The following is an example from the Paimon docs:
-- incremental between snapshot ids
SELECT * FROM t /*+ OPTIONS('incremental-between' = '12,20') */;

-- incremental between snapshot time millis
SELECT * FROM t /*+ OPTIONS('incremental-between-timestamp' = '1692169000000,1692169900000') */;
We can confirm which snapshots have been created by querying the ‘table$snapshots’ view:
Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
Flink SQL> SELECT * FROM fav_fruit$snapshots;
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | base_manifest_list | delta_manifest_list | changelog_manifest_list | total_record_count | delta_record_count | changelog_record_count | watermark |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
| 1 | 0 | a1c8bd05-6e9b-466a-abe2-bf7... | 9223372036854775807 | APPEND | 2024-10-07 16:13:01.296 | manifest-list-faf2b42b-2b0c... | manifest-list-faf2b42b-2b0c... | <NULL> | 3 | 3 | 0 | -9223372036854775808 |
| 2 | 0 | 3cb0c337-5191-49c8-97f2-eee... | 9223372036854775807 | APPEND | 2024-10-07 16:13:07.144 | manifest-list-40f7354e-3d58... | manifest-list-40f7354e-3d58... | <NULL> | 4 | 1 | 0 | -9223372036854775808 |
| 3 | 0 | 8b6e6696-276d-4166-8fb2-c84... | 9223372036854775807 | APPEND | 2024-10-07 16:13:12.911 | manifest-list-ee6c34c0-1908... | manifest-list-ee6c34c0-1908... | <NULL> | 5 | 1 | 0 | -9223372036854775808 |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+-------------------------+--------------------+--------------------+------------------------+----------------------+
There are three APPEND snapshots with snapshot ids 1-3.
Next, we’ll run a normal select statement against the table, to see its state in the current snapshot.
Flink SQL> SELECT * FROM fav_fruit;
+-------+--------+
| name  | fruit  |
+-------+--------+
| jack  | banana |
| sarah | orange |
+-------+--------+
This matches the expected result.
Now, let’s run three incremental queries covering the three commits.
Flink SQL> SELECT * FROM fav_fruit /*+ OPTIONS('incremental-between' = '0,1') */;
+-------+-----------+
| name  | fruit     |
+-------+-----------+
| jack  | apple     |
| john  | pineapple |
| sarah | orange    |
+-------+-----------+

Flink SQL> SELECT * FROM fav_fruit /*+ OPTIONS('incremental-between' = '1,2') */;
+------+--------+
| name | fruit  |
+------+--------+
| jack | banana |
+------+--------+

Flink SQL> SELECT * FROM fav_fruit /*+ OPTIONS('incremental-between' = '2,3') */;
Empty set
The first query returns the rows corresponding to the three inserts of the 1st commit. The second query returns the updated row of PK Jack. The third query, covering the commit that added the -D row, returned an empty set; this is because we’re running a latest-state query, which naturally omits deleted rows.
So far, what we’re seeing is upsert change query behavior that honors deletes. Paimon does not currently support append-only change queries, though it easily could. However, it does give you the choice to honor or ignore deletes with regular or incremental queries: you can ignore deletes with the 'ignore-delete' option.
Flink SQL> SELECT * FROM fav_fruit /*+ OPTIONS('ignore-delete' = 'true') */;
+-------+-----------+
| name  | fruit     |
+-------+-----------+
| jack  | banana    |
| john  | pineapple |
| sarah | orange    |
+-------+-----------+
Paimon, therefore, gives you upsert change query support with the option to apply or ignore deletes.
Let’s now look at CDC change queries, which this series classifies as min-delta and full-delta.
min-delta/full-delta change queries
Paimon offers two forms of CDC query support:
Delta scan mode: Walk the log of APPEND snapshots. In each snapshot, read the rows of the added data files listed in delta manifest lists, including the RowKind column in the results. This does not include the before-image of the updated rows.
Changelog scan mode: Enable CDC file materialization and have the reader read directly from the materialized CDC files.
No matter which mode we use, we add the suffix "$audit_log" to the table name to make it a CDC query.
Delta scan mode
As I wrote earlier, Apache Paimon stores the RowKind as a column in the data files. This means that for queries that don’t need the before-image of an update, a reader can walk the log of APPEND snapshots to find the added data files listed in the delta manifest list of each snapshot, including the RowKind in the results.
We can use the existing ‘fav_fruit’ table with its three commits to demonstrate this.
Flink SQL> SELECT * FROM fav_fruit$audit_log;
+---------+-------+-----------+
| rowkind | name  | fruit     |
+---------+-------+-----------+
| +U      | jack  | banana    |
| -D      | john  | pineapple |
| +I      | sarah | orange    |
+---------+-------+-----------+
As expected, we didn’t get the before-image (-U) of the updated Jack row.
We can incrementally query the table’s audit log as follows.
Flink SQL> SELECT * FROM fav_fruit$audit_log /*+ OPTIONS('incremental-between' = '0,1', 'incremental-between-scan-mode' = 'delta') */;
+---------+-------+-----------+
| rowkind | name  | fruit     |
+---------+-------+-----------+
| +I      | jack  | apple     |
| +I      | john  | pineapple |
| +I      | sarah | orange    |
+---------+-------+-----------+

Flink SQL> SELECT * FROM fav_fruit$audit_log /*+ OPTIONS('incremental-between' = '1,2', 'incremental-between-scan-mode' = 'delta') */;
+---------+------+--------+
| rowkind | name | fruit  |
+---------+------+--------+
| +U      | jack | banana |
+---------+------+--------+

Flink SQL> SELECT * FROM fav_fruit$audit_log /*+ OPTIONS('incremental-between' = '2,3', 'incremental-between-scan-mode' = 'delta') */;
+---------+------+-----------+
| rowkind | name | fruit     |
+---------+------+-----------+
| -D      | john | pineapple |
+---------+------+-----------+
Note that 'incremental-between-scan-mode' = 'delta' is not required, as the query defaults to this mode when there are no CDC files to read.
This is great; we don’t need to materialize any CDC files or do any infer-CDC-on-read work to return these results. Paimon stores the data in this format, so it just returns the rows as they are stored. The way it works under the hood is that it only reads from level 0 (a kind of moving historical level 0 at the time of each APPEND snapshot). Level 0 is uncompacted, so it doesn’t lose any detail. As long as the reader can keep up and doesn’t fall behind the expiry of snapshots, then this is a full-delta change query, albeit without the before-image (-U).
But if we want that before-image (RowKind -U) then we’ll need to enable CDC file materialization.
CDC file materialization and changelog scan mode
Paimon materializes CDC data files on compaction (when configured to). The reason for this is that typically, Paimon writers are streaming sinks that do not have the current row values in memory that are necessary to write the before-images of updates. However, a compaction job can compare the existing values against any new values and produce a set of CDC files that contain the four CDC operation types (RowKinds).
There are three types of compaction:
Level 0 only.
A subset of levels (which may include level 0).
Full compaction (all levels).
To produce CDC files (known as changelog files in Paimon), you must choose from two ‘changelog-producer’ modes:
lookup
full-compaction
We’ll dig into both, and afterwards you’ll have an idea of the pros and cons of each.
Changelog-producer: full-compaction
A full compaction reads all the data (and deletion vector) files of all levels of a bucket and writes the merged data as a single sorted-run in the highest configured level. To produce CDC files, the result of the previous full compaction is compared with the result of this current full compaction.
The basic logic works as follows:
Scan all files of all levels of the bucket.
Store the row of each PK in the highest level of the bucket for later comparison (this is the value after the last full compaction).
Merge the entire dataset, ordered by sequence number.
For each primary key, compare the merged row with the last full compaction row to produce a CDC row (or a pair of rows in the case of an update with the before and after images).
Write the new compacted data files to the highest level of the bucket, as well as the CDC files.
Let’s run an experiment and understand the results using our knowledge of Paimon internals.
Let’s create a new table with ‘changelog-producer’ = ‘full-compaction’. We’ll run compaction as a dedicated job so that we have control over when it runs. The ‘write-only’ flag tells the bucket writer not to perform compactions itself.
Flink SQL> CREATE TABLE fav_fruit_full (
    name STRING PRIMARY KEY NOT ENFORCED,
    fruit STRING
) WITH (
    'bucket' = '1',
    'changelog-producer' = 'full-compaction',
    'write-only' = 'true'
);
Now, we’ll execute the same three commits as always:
Flink SQL> INSERT INTO fav_fruit_full VALUES('jack', 'apple'), ('sarah', 'orange'), ('john', 'pineapple');
Flink SQL> UPDATE fav_fruit_full SET fruit = 'banana' WHERE name = 'jack';
Flink SQL> DELETE FROM fav_fruit_full WHERE name = 'john';
Then we run a full-compaction. This will produce CDC files that we can query.
Flink SQL> CALL sys.compact(
    `table` => 'default.fav_fruit_full',
    options => 'sink.parallelism=1'
);
We can see in the filesystem that a CDC file has been created:
tree .
├── bucket-0
│   ├── changelog-312a52d8-37cb-4beb-a781-d9af5487738b-1.orc
│   ├── data-312a52d8-37cb-4beb-a781-d9af5487738b-0.orc
│   ├── data-4ac18514-6058-4018-9160-f397bbe44648-0.orc
│   ├── data-5c0b4506-2e9e-4477-8f23-4b9e5024adcf-0.orc
│   └── data-d1489900-d1f5-482f-86f8-db0b9bbc56ed-0.orc
├── manifest
│   ├── manifest-1638cdaf-a096-4736-b4b2-f8d431b03679-0
│   ├── manifest-23150ca5-3770-4027-a094-1957c442714b-0
│   ├── manifest-8e8b8388-c1b6-4adb-b90b-af22e51401b3-0
│   ├── manifest-cce1fc0e-a48b-4933-a953-33902b37200e-0
│   ├── manifest-cce1fc0e-a48b-4933-a953-33902b37200e-1
│   ├── manifest-list-43c90a85-09e5-4029-be8e-666c79da5b6a-0
│   ├── manifest-list-43c90a85-09e5-4029-be8e-666c79da5b6a-1
│   ├── manifest-list-43c90a85-09e5-4029-be8e-666c79da5b6a-2
│   ├── manifest-list-6b6ec080-4f1c-4ae6-acff-3f53a6fbf091-0
│   ├── manifest-list-6b6ec080-4f1c-4ae6-acff-3f53a6fbf091-1
│   ├── manifest-list-8a2536e8-0bb4-473a-ad63-41efea266056-0
│   ├── manifest-list-8a2536e8-0bb4-473a-ad63-41efea266056-1
│   ├── manifest-list-b252e5c9-a804-4e52-aa78-7c70b902cf04-0
│   └── manifest-list-b252e5c9-a804-4e52-aa78-7c70b902cf04-1
├── schema
│   └── schema-0
└── snapshot
    ├── EARLIEST
    ├── LATEST
    ├── snapshot-1
    ├── snapshot-2
    ├── snapshot-3
    └── snapshot-4
We also see four snapshot files in the tree output; we can query the $snapshots view to get the details of those snapshots.
Flink SQL> SELECT * FROM fav_fruit_full$snapshots;
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+----------------------+
| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | base_manifest_list | delta_manifest_list | changelog_manifest_list | total_record_count | delta_record_count | changelog_record_count | watermark |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+----------------------+
| 1 | 0 | 815725e7-2661-44ad-818b-35c... | 9223372036854775807 | APPEND | 2024-10-08 08:49:16.225 | manifest-list-8a2536e8-0bb4... | manifest-list-8a2536e8-0bb4... | <NULL> | 3 | 3 | 0 | -9223372036854775808 |
| 2 | 0 | b38d0071-23bd-4337-8a68-021... | 9223372036854775807 | APPEND | 2024-10-08 08:49:26.399 | manifest-list-6b6ec080-4f1c... | manifest-list-6b6ec080-4f1c... | <NULL> | 4 | 1 | 0 | -9223372036854775808 |
| 3 | 0 | b915fe91-9c22-4ea8-944b-7a3... | 9223372036854775807 | APPEND | 2024-10-08 08:49:32.197 | manifest-list-b252e5c9-a804... | manifest-list-b252e5c9-a804... | <NULL> | 5 | 1 | 0 | -9223372036854775808 |
| 4 | 0 | 1d6f78af-c752-48a2-b99b-c30... | 9223372036854775807 | COMPACT | 2024-10-08 09:01:52.106 | manifest-list-43c90a85-09e5... | manifest-list-43c90a85-09e5... | manifest-list-43c90a85-09e5... | 2 | -3 | 2 | -9223372036854775808 |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+----------------------+
There are four snapshots of type: APPEND, APPEND, APPEND, COMPACT as expected.
Let’s query the table’s audit log to see the state based on the current snapshot.
Flink SQL> SELECT * FROM fav_fruit_full$audit_log;
+---------+-------+--------+
| rowkind | name  | fruit  |
+---------+-------+--------+
| +U      | jack  | banana |
| +I      | sarah | orange |
+---------+-------+--------+
This is a different result from the fav_fruit table, which used no changelog-producer configuration. The [-D John] row is missing from this result.
The reason for this difference is that we’re querying the current state of the table, where the data files containing the first three commits have been logically deleted and replaced by a new compacted data file with the above rows.
If we process the audit log incrementally, we’ll see different results. Let’s run an incremental query, reading the materialized CDC files, covering all three commits.
Flink SQL> SELECT * FROM fav_fruit_full$audit_log /*+ OPTIONS('incremental-between' = '0,4', 'incremental-between-scan-mode' = 'changelog') */;
+---------+-------+--------+
| rowkind | name  | fruit  |
+---------+-------+--------+
| +I      | jack  | banana |
| +I      | sarah | orange |
+---------+-------+--------+
Notice that this time, the [Jack, Banana] row is returned with a +I RowKind, and the -D is still missing. This is because the query is reading from the CDC file created by the full compaction. The full compaction compared the rows of the merged level 0 files with the empty level 2. The Jack row is net new and, therefore, was logged as an insert. It has been minimized.
What would happen if we incrementally queried only the first three commits?
Flink SQL> SELECT * FROM fav_fruit_full$audit_log /*+ OPTIONS('incremental-between' = '0,3', 'incremental-between-scan-mode' = 'changelog') */;
Empty set
It is an empty set because there were no CDC files to read in the first three commits.
When we take into account the CDC file materialization of the full compaction, the previous figure should now be as follows:
The insight here is that when you use ‘changelog-producer’ = ‘full-compaction’, the result of a compaction is a set of CDC files that represent the min-delta of the changes that occurred since the last full-compaction.
This contrasts to Delta Lake, which stores the materialized CDC data without minimizing it because the CDC files are created as part of each commit. Delta Lake is fundamentally batch-oriented (even Spark Structured Streaming is micro-batch) which makes this possible. Apache Hudi also supports write-time materialization in batch use cases but offers compaction-time materialization for streaming cases. Apache Hudi compaction-time CDC materialization will likely result in the same minimization (but I haven’t tested it).
What if you don’t want the materialized CDC data to be minimized? The next option is the ‘lookup’ mode.
Changelog-producer: lookup
This mode ensures that, at the very least, a level 0 compaction is run after each commit. With ‘lookup’ mode, every write will result in two commits: one for the data and one for a compaction. When a compaction is triggered, the number of levels to compact is decided by some heuristics. The triggered compaction could be anything from a level-0-only to a full compaction. The most likely case for any given high-frequency streaming write is for the heuristics to determine that no compaction is necessary yet. However, when using ‘changelog-producer’ = ‘lookup’, if the heuristics say no compaction is required, then a level 0 compaction is forced.
It is only possible to produce CDC files by comparing the merged row data of the input levels with prior data. A full compaction scans all files and naturally has this prior data in memory, but a level 0 compaction will not. The level 0 compaction must perform key lookups to the higher levels to retrieve the current row state of primary keys being modified in the input levels.
The lookup process uses the fact that levels 1+ consist of a single sorted-run per level. A sorted-run comprises one or more data files such that there is no overlap of primary keys across the files. Each data file has metadata that includes the min and max key, and this metadata is maintained in memory. The compactor checks the file metadata level by level to find data files that could contain the key. When a candidate data file is found, it is fetched and cached. These lookups should see cache hits the vast majority of the time. Deletion vector maintenance also requires this same lookup-based compaction.
I’m told the additional cost to writes, in terms of compute, is relatively small for this lookup process (with a full cache). However, the space needed for the cache is the wrinkle, so it may be preferable in some use cases to run less frequent full-compactions instead of lookup compactions after each write.
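If the synchronous lookup work on the write path becomes a concern, I believe there is an option to decouple the commit from the changelog generation, along the lines of the sketch below. Treat the 'changelog-producer.lookup-wait' option name as an assumption to verify against your Paimon version; the table name is a placeholder.

CREATE TABLE fav_fruit_async (
    name STRING PRIMARY KEY NOT ENFORCED,
    fruit STRING
) WITH (
    'bucket' = '1',
    'changelog-producer' = 'lookup',
    -- assumption: do not make the commit wait for the lookup-based changelog generation
    'changelog-producer.lookup-wait' = 'false'
);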
The actual process of lookup vs full compaction doesn’t change the resulting materialized CDC files - what matters is when compaction is run. Let’s now go through an example using ‘changelog-producer’ = ‘lookup’, which ensures that a compaction is run after every write.
Let’s create a new table.
Flink SQL> CREATE TABLE fav_fruit_lookup (
    name STRING PRIMARY KEY NOT ENFORCED,
    fruit STRING
) WITH (
    'bucket' = '1',
    'changelog-producer' = 'lookup'
);
Run the same three commits again.
Flink SQL> INSERT INTO fav_fruit_lookup VALUES('jack', 'apple'), ('sarah', 'orange'), ('john', 'pineapple');
Flink SQL> UPDATE fav_fruit_lookup SET fruit = 'banana' WHERE name = 'jack';
Flink SQL> DELETE FROM fav_fruit_lookup WHERE name = 'john';
Now, let’s look at the snapshots that have been created.
Flink SQL> SELECT * FROM fav_fruit_lookup$snapshots;
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+----------------------+
| snapshot_id | schema_id | commit_user | commit_identifier | commit_kind | commit_time | base_manifest_list | delta_manifest_list | changelog_manifest_list | total_record_count | delta_record_count | changelog_record_count | watermark |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+----------------------+
| 1 | 0 | 608ecfcb-e791-4f13-8056-679... | 9223372036854775807 | APPEND | 2024-10-08 10:07:00.601 | manifest-list-67a5322d-713f... | manifest-list-67a5322d-713f... | <NULL> | 3 | 3 | 0 | -9223372036854775808 |
| 2 | 0 | 608ecfcb-e791-4f13-8056-679... | 9223372036854775807 | COMPACT | 2024-10-08 10:07:00.616 | manifest-list-67a5322d-713f... | manifest-list-67a5322d-713f... | manifest-list-67a5322d-713f... | 3 | 0 | 3 | -9223372036854775808 |
| 3 | 0 | 9493c29f-88a2-41bc-930d-30b... | 9223372036854775807 | APPEND | 2024-10-08 10:07:06.853 | manifest-list-54951545-0973... | manifest-list-54951545-0973... | <NULL> | 4 | 1 | 0 | -9223372036854775808 |
| 4 | 0 | 9493c29f-88a2-41bc-930d-30b... | 9223372036854775807 | COMPACT | 2024-10-08 10:07:06.875 | manifest-list-54951545-0973... | manifest-list-54951545-0973... | manifest-list-54951545-0973... | 4 | 0 | 2 | -9223372036854775808 |
| 5 | 0 | d7908231-9a05-438f-a0fb-d8d... | 9223372036854775807 | APPEND | 2024-10-08 10:07:12.434 | manifest-list-25133d65-5fe2... | manifest-list-25133d65-5fe2... | <NULL> | 5 | 1 | 0 | -9223372036854775808 |
| 6 | 0 | d7908231-9a05-438f-a0fb-d8d... | 9223372036854775807 | COMPACT | 2024-10-08 10:07:12.460 | manifest-list-25133d65-5fe2... | manifest-list-25133d65-5fe2... | manifest-list-25133d65-5fe2... | 2 | -3 | 1 | -9223372036854775808 |
+-------------+-----------+--------------------------------+---------------------+-------------+-------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------+--------------------+------------------------+----------------------+
This time we see six snapshots of type: APPEND, COMPACT, APPEND, COMPACT, APPEND, COMPACT. This is what we expected.
CDC files and compaction upgrades
In the file system, three CDC files have been created that correspond to the three compactions:
> tree .
├── bucket-0
│   ├── changelog-7485fe88-bd0b-4b38-8877-c684fbca0c36-1.orc
│   ├── changelog-a0adb0d4-010c-4e2f-a52a-b6c84ce2635b-0.orc
│   ├── changelog-a1b9c1d3-44cb-4cc8-a9f7-e66522947bae-0.orc
│   ├── data-12388437-0b31-4804-9976-c31622018193-0.orc
│   ├── data-4e7332b8-d747-41bf-9fca-65273a14b846-0.orc
│   ├── data-7485fe88-bd0b-4b38-8877-c684fbca0c36-0.orc
│   └── data-fced19d9-8de3-4fa7-97ff-dc4a597bdc25-0.orc
…
This in itself is very interesting to me, having already looked at Delta Lake and Apache Hudi. Delta Lake does not materialize CDC files for commits that only add or logically delete data files. If a commit only adds data files, then the rows in those data files are treated by a CDC reader as inserts. There is no need to duplicate the data in a CDC file. Hudi does something similar with added/deleted base files. It seems that Paimon does not use this optimization; otherwise, it would only have written two CDC files.
The next question is why are there four data files at the end? Shouldn’t there have been six, corresponding to each data file added in an APPEND snapshot plus the three compactions? The reason for this is that a compaction can choose to simply move a file as-is to a higher level; this is known as upgrading.
We can see this if we look at the files created by the first two commits:
Flink SQL> SELECT * FROM fav_fruit_lookup$files /*+ OPTIONS('scan.snapshot-id' = '1') */;
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+-------------------+--------------------------+-------------------------------+---------------------+---------------------+-------------------------+
| partition | bucket | file_path | file_format | schema_id | level | record_count | file_size_in_bytes | min_key | max_key | null_value_counts | min_value_stats | max_value_stats | min_sequence_number | max_sequence_number | creation_time |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+-------------------+--------------------------+-------------------------------+---------------------+---------------------+-------------------------+
| [] | 0 | data-fced19d9-8de3-4fa7-97f... | orc | 0 | 0 | 3 | 700 | [jack] | [sarah] | {fruit=0, name=0} | {fruit=apple, name=jack} | {fruit=pineapple, name=sarah} | 0 | 2 | 2024-10-08 10:07:00.536 |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+-------------------+--------------------------+-------------------------------+---------------------+---------------------+-------------------------+
1 row in set (0.85 seconds)

Flink SQL> SELECT * FROM fav_fruit_lookup$files /*+ OPTIONS('scan.snapshot-id' = '2') */;
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+-------------------+--------------------------+-------------------------------+---------------------+---------------------+-------------------------+
| partition | bucket | file_path | file_format | schema_id | level | record_count | file_size_in_bytes | min_key | max_key | null_value_counts | min_value_stats | max_value_stats | min_sequence_number | max_sequence_number | creation_time |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+-------------------+--------------------------+-------------------------------+---------------------+---------------------+-------------------------+
| [] | 0 | data-fced19d9-8de3-4fa7-97f... | orc | 0 | 5 | 3 | 700 | [jack] | [sarah] | {fruit=0, name=0} | {fruit=apple, name=jack} | {fruit=pineapple, name=sarah} | 0 | 2 | 2024-10-08 10:07:00.536 |
+-----------+--------+--------------------------------+-------------+-----------+-------+--------------+--------------------+---------+---------+-------------------+--------------------------+-------------------------------+---------------------+---------------------+-------------------------+
1 row in set (0.98 seconds)
Notice how the data file created by the first commit is listed in the files of the second, but its level was changed from 0 to 5? I didn’t set a maximum level for the table, so it used a default of 5. The first compaction only changed the metadata of the file and left the data itself unchanged.
If you looked at the remaining two compactions by querying the files at those snapshots you would see that:
The data file created by snapshot 3 got moved to level 4 by the subsequent compaction.
Snapshot 5 created a data file in level 0. Then the subsequent compaction actually rewrote the three data files (of level 0, 4, and 5) as a new data file of level 5. It did a full compaction. Despite there being three compactions, only one actually rewrote data files.
The result is that four data files were created, but only one was live in the last snapshot.
It’s so interesting to combine knowledge of internals with visible behaviors, don’t you think?
Continuing the experiment
If we query the current state of the audit log, we see the same as with full-compaction:
Flink SQL> SELECT * FROM fav_fruit_lookup$audit_log;
+---------+-------+--------+
| rowkind | name  | fruit  |
+---------+-------+--------+
| +U      | jack  | banana |
| +I      | sarah | orange |
+---------+-------+--------+
However, things differ when we incrementally query the audit log. Let’s first run an incremental query that covers all 6 snapshots:
Flink SQL> SELECT * FROM fav_fruit_lookup$audit_log /*+ OPTIONS('incremental-between' = '0,6', 'incremental-between-scan-mode' = 'changelog') */;
+---------+-------+-----------+
| rowkind | name  | fruit     |
+---------+-------+-----------+
| -U      | jack  | apple     |
| +U      | jack  | banana    |
| -D      | john  | pineapple |
| +I      | sarah | orange    |
+---------+-------+-----------+
Now we’re getting the full-delta change query behavior. Compactions naturally create min-deltas of the changes that occur since the last compaction, but when you compact after every write, it translates to full-delta.
If we incrementally query the audit log two snapshots at a time (corresponding to each APPEND then COMPACT snapshot), then we get:
Flink SQL> SELECT * FROM fav_fruit_lookup$audit_log /*+ OPTIONS('incremental-between' = '0,2', 'incremental-between-scan-mode' = 'changelog') */;
+---------+-------+-----------+
| rowkind | name  | fruit     |
+---------+-------+-----------+
| +I      | jack  | apple     |
| +I      | john  | pineapple |
| +I      | sarah | orange    |
+---------+-------+-----------+
3 rows in set (1.25 seconds)

Flink SQL> SELECT * FROM fav_fruit_lookup$audit_log /*+ OPTIONS('incremental-between' = '2,4', 'incremental-between-scan-mode' = 'changelog') */;
+---------+------+--------+
| rowkind | name | fruit  |
+---------+------+--------+
| -U      | jack | apple  |
| +U      | jack | banana |
+---------+------+--------+
2 rows in set (1.00 seconds)

Flink SQL> SELECT * FROM fav_fruit_lookup$audit_log /*+ OPTIONS('incremental-between' = '4,6', 'incremental-between-scan-mode' = 'changelog') */;
+---------+------+-----------+
| rowkind | name | fruit     |
+---------+------+-----------+
| -D      | john | pineapple |
+---------+------+-----------+
1 row in set (0.96 seconds)
If we had used the full-compaction mode and run a full compaction after every write, then we would have seen exactly the same results (I’ve tested it). But running a full compaction after every commit is pretty crazy. That is why the ‘lookup’ mode exists.
min/full-delta summary
Do you need the before image of updated rows?
If you don’t, then you can perform incremental queries that should read from level 0 data files, returning the rows as they are stored. That is pretty efficient, straightforward, and predictable.
If you do need the before image, then you need CDC file materialization. Now things get a little less straightforward and less predictable. However, I think understanding how compaction produces the CDC files helps a lot here.
At the time of writing, Paimon doesn’t give you the ability to choose min or full-delta at query time. It’s all about compaction-time and what CDC files get written. Each compaction materializes a min-delta of the changes since the last compaction. If you run compaction after every commit, then you materialize full-delta CDC files. Otherwise, you get a min-delta that corresponds to the size/frequency of the compactions. If you run one compaction per day, then you get min-delta scoped to each day. If you run compaction every 1 minute, then you get min-delta scoped to the minute level.
If you need full-delta behavior, because you need every single change event, then you must choose the lookup changelog producer mode (or run a full compaction after every commit). With the upgrade optimization, some compactions will only involve writing the CDC file and upgrading a data file to a higher level. Others may use a mix of rewriting and upgrades.
If you want min-delta, because you only need a minimized change set for efficiency, then you can choose full-compactions at an interval that works best for the workload. The larger, less frequent full compactions produce the least CDC data, but larger compactions require more resources to run.
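As far as I can tell, there are two ways to get that interval: run a dedicated compaction job on a schedule (as in the fav_fruit_full example above), or let the writer itself trigger a full compaction every N commits via the 'full-compaction.delta-commits' option. A sketch of the latter (option name per the Paimon docs; fav_fruit_interval is a placeholder and I haven’t tested this variant here):

CREATE TABLE fav_fruit_interval (
    name STRING PRIMARY KEY NOT ENFORCED,
    fruit STRING
) WITH (
    'bucket' = '1',
    'changelog-producer' = 'full-compaction',
    -- trigger a full compaction (and CDC file materialization) every 10 delta commits
    'full-compaction.delta-commits' = '10'
);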
The final note in this CDC summary is the lack of the insert-only/delete-only optimization that is used in Delta Lake and Hudi, which allows CDC readers to read from regular data files when the contents of those files only include inserts or deletes.
Conclusions
Current support for Apache Paimon change queries:
Latest state:
append-only: No support (but could easily be added).
upsert: Yes, works very well and efficiently. Can choose whether to honor deletes or not.
CDC
min-delta: Yes. Use dedicated compaction with ‘changelog-producer’ = ‘full-compaction’ and run compaction jobs on an interval.
full-delta: Yes. Let the writer do compactions after every write, with ‘changelog-producer’ = ‘lookup’.
Regarding CDC change queries, the strength of Paimon is that it stores the RowKind as a column in each row. If you want CDC change queries, but without the before-image of updated rows, then Paimon can just return incremental results without any additional work. This is pretty nice.
If you do need the before-images, then Paimon doesn’t have an edge over the other formats, in my opinion, as it must resort to materializing CDC files. You have to choose between ‘lookup’ and ‘full-compaction’ changelog-producer modes, which then influences the min vs full delta behavior.
So far we’ve seen that Delta Lake and Hudi only support full-delta. Paimon supports both, by choosing the right changelog-producer mode. Iceberg supports both, but it is really just Apache Spark doing the heavy lifting (in v2 at least). Delta Lake only writes full-delta CDC files, whereas Hudi and Paimon can write min or full delta to CDC files. I would like to see Flink and Spark support min-delta change queries for Delta, Hudi, and Paimon, even when the materialized CDC files are full-delta.
As a reminder, this post has not been about CDC ingestion, but about CDC generation within the table itself. Paimon is also good at CDC ingestion, as it stores data in a natively CDC compatible way (with RowKind). It’s worth noting that Hudi also recently got this ability to include the equivalent of RowKind in the row metadata.
That’s the last deep dive into table format change query support. It’s been an interesting bit of research for me, and as always shows that there is no silver bullet design out there. All these projects are balancing trade-offs to suit the goals of the communities that back them.
If you made it all the way without skipping, then well done! You now know a lot more about Paimon internals and change query behavior!
Thanks for reading.