Table format comparisons - Streaming ingest of row-level operations

In the previous post, I covered append-only tables, a common table type in analytics, often used for ingesting data into a data lake or modeling streams between stream processor jobs. I had promised to cover native support for changelog streams, aka change data capture (CDC), but before I do so, I think we should first look at how the table formats support the ingestion of data with row-level operations (insert, update, delete) rather than the query-level operations commonly used in SQL batch commands.

Note: I recommend reading the prior posts in this series, as I’m building things up piece by piece. I will assume you have read them, and you’ll need that background to fully comprehend this one.

Prior posts:

* How do the table formats represent the canonical set of files?

* Append-only tables and incremental reads.

Consistency model posts (part 1 of each covers some of the core internals):

* Apache Iceberg

* Delta Lake

* Apache Hudi

* Apache Paimon

Batch vs streaming

The first thing to realize in any discussion of streaming ingest is that, ultimately, everything is still done in batches. While a compute engine may provide streaming semantics, these engines are not writing data to Parquet files one row, or even a handful of rows, at a time. Many tiny files are extremely inefficient for object storage-based tables, which are optimized for large columnar data files, so even streaming ingest is done in batches.

However, semantics do play an important role in this discussion. We’re all probably quite used to batch-based queries that mutate a table’s data: the INSERT INTO, UPDATE, DELETE FROM, and MERGE statements. The first three are the most commonly known and apply a single operation type (insert/update/delete) to all matching rows. MERGE is different, as it applies a mix of insert/update/delete at an individual row level based on how the source and target dataset compare.

If a user has a stream of events where each event is tagged as an insert, update, or delete, the INSERT INTO, UPDATE…SET…WHERE and DELETE FROM… WHERE SQL commands simply don’t fit well. They apply a query-level operation, but we need row-level operations for this workload.
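
To make that concrete, here is a hedged sketch of what such a tagged micro-batch might look like, expressed as an inline SQL dataset (the op tags and the name/fruit columns are illustrative and reused in the MERGE examples later in this post):

-- A hypothetical micro-batch of row-level operations; each row carries an op tag.
SELECT * FROM VALUES
  ('+I', 'Jack',  'apple'),   -- insert a new row
  ('+U', 'Jack',  'banana'),  -- update: new row state for the same key
  ('-D', 'Sarah', 'orange')   -- delete the row with this key
AS my_batch(op, name, fruit);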

Use cases for streaming row-level operation ingest

Append-only tables treat every row as an insert, but if you want to materialize an incoming stream of row-level operations as a data lake table keyed on a primary key, then simply appending data doesn’t work. One obvious use case is ingesting a CDC stream from something like Debezium. Each change event in such a CDC stream includes the operation type (insert, update, delete) and possibly a before and after image of the row. A user may simply want to maintain a mirror of the source table by materializing the change events as a data lake table. Another use case is a stream processing job that maintains a materialized view by emitting a stream of row-level operations.

Table format support

The design of each table format determines how well they support this workload. We can categorize the four table formats into two categories:

  • Native primary key support, storage is based on an LSM or LSM-like design.

    • Apache Hudi

    • Apache Paimon

  • No native primary key support, storage not LSM-based.

    • Apache Iceberg

    • Delta Lake

Let’s start with the table formats of the first category.

Apache Hudi

As described in my prior posts, Hudi shards data across a set of file groups and consistently maps each primary key to a file group. The data of a file group is stored in a base file, and when the table is configured to use merge-on-read, deltas are written as log files on top. Log files are either data or delete files. Compaction merges the current base file and any log files into a new base file, preventing the number of log files from growing too large. In this post, I’ll focus on merge-on-read, as copy-on-write is simply a bad idea for any streaming ingest workload (due to excessive write amplification).

Fig 1. Depicts two file slices of a single file group in a MOR table. A compaction job compacts a file slice with three log files into a new base file.

Hudi readers that want the current state read from the latest file slice of each file group. This involves reading the base file and any log files of the file slice and merging them to return a consistent result. While not a classic LSM tree, a file group shares some aspects of one: a writer can commit row-level operations, such as upserts and deletes, in a layered file model, and the reader must merge the results based on the table version they wish to read.

Hudi supports row-level operations via its UPSERT operation type: a compute engine submits a batch of rows, where each row is marked as either insert, update, or delete, and all are treated as upsert operations (a delete being a kind of update).

Insert and update rows get written to new data log files. Delete rows get written to a delete log file, or I should say only the primary key does. Hudi delete files contain a set of keys that have been deleted in that file group in that table version. No reads of existing data files are needed for these deletes (as opposed to positional deletes of other table formats, which need to know the file and position of the row to be invalidated). 
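
As a rough sketch of what this looks like from the compute engine side, here is how a Flink SQL job might declare a merge-on-read Hudi table for streaming upserts (the table name, columns, and storage path are mine, not from the example above; with the Hudi Flink connector, upserts are keyed on the declared primary key):

CREATE TABLE fav_fruit (
  name  STRING,
  fruit STRING,
  ts    TIMESTAMP(3),                -- ordering column (matches Hudi's default precombine field name)
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'connector'       = 'hudi',
  'path'            = 's3a://my-bucket/warehouse/fav_fruit',  -- hypothetical location
  'table.type'      = 'MERGE_ON_READ',   -- base files plus log files
  'write.operation' = 'upsert'           -- row-level upsert ingest
);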

We can visualize all this with an example of three writes with row-level operations, the expected table scans at each table version, and how that translates to files in a file group (imagining a table with a single file group).

Fig 2. How batches of row-level operations translate to files in a file group.

Hudi also supports writing tombstones instead of using delete log files. When this is configured, the deleted row is treated as an update but is written with all columns nulled out (except for the primary key).

Apache Paimon

Apache Paimon shares some similarities with Apache Hudi. It also includes primary keys in its internal model and shards data by key: Hudi maps primary keys to file groups (within partitions), while Paimon maps bucket keys to buckets (within partitions).

Paimon is a merge-on-read design. Readers must merge the rows of data files to form a consistent result. This merge process depends on the sequence number of the row and the RowKind of the row.

Paimon stores each row with its RowKind. RowKind values are:

  • +I: Insert

  • +U: Update after image - the new row state of the updated row.

  • -U: Update before image - the previous row state (used in CDC).

  • -D: Delete

Sequence numbers are monotonically increasing integers maintained by the bucket writer. Readers apply the row-level operations in sequence number order to return the final row of each primary key.

Each bucket is an LSM tree made up of sorted runs divided into different levels. Paimon writers write rows to level 0, with the RowKind and sequence number. Writers only ever write to level 0; compactors read data files from all levels (starting with the small level 0 files) and compact them into larger data files at higher levels.
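
As a point of comparison with the Hudi sketch above, here is a minimal Flink SQL declaration of a Paimon primary-key table (assuming a Paimon catalog is already registered; the bucket count is arbitrary and deduplicate is Paimon’s default merge engine):

CREATE TABLE fav_fruit (
  name  STRING,
  fruit STRING,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'bucket'       = '4',            -- each bucket is its own LSM tree
  'merge-engine' = 'deduplicate'   -- keep only the latest row per primary key
);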

If we take the same example from the Hudi section, we can visualize all this in Paimon (imagining a single bucket table):

Fig 3. Paimon natively stores data as row-level operations.

The whole data model of Paimon is oriented around row-level operations, which makes it an ideal choice for this particular workload of streaming row-level operation ingestion. It’s hard to beat Paimon in terms of write efficiency for this workload.

Delta Lake and Apache Iceberg

I’m going to cover Delta Lake and Iceberg together as they are remarkably similar when it comes to ingestion of row-level operations.

Unlike Hudi and Paimon, Delta Lake and Iceberg do not include primary keys in their internal model. Iceberg has identifier fields, which act more like a hint to compute engines to treat a column, or set of columns, as a primary key, but Iceberg itself doesn’t enforce them. It’s up to the compute engines to enforce primary key constraints. This difference means that compute engines cannot submit a set of rows with row-level operations and let Delta/Iceberg figure out how to make the necessary file changes. Instead, the compute engine must write data and delete files such that they respect the uniqueness of primary keys.
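
For reference, here is what declaring identifier fields can look like with the Iceberg Spark SQL extensions enabled (a sketch; the table name is illustrative, and remember that Iceberg only records the hint rather than enforcing it):

-- Mark the 'name' column as the identifier field (primary key hint) on an Iceberg table.
ALTER TABLE db.FavFruit SET IDENTIFIER FIELDS name;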

Using the same example again, the figure below can apply to Delta and Iceberg. They use different metadata, but essentially the data and delete files are the same.

Fig 4. The data and delete files written with Iceberg/Delta Lake.

In the above case, data and delete files are written as follows:

  1. Commit 1: write data-1 with the three inserted rows.

  2. Commit 2: write data-2 with the new row for PK ‘Jack’ and delete-1 which invalidates the old ‘Jack’ row in data-1.

  3. Commit 3: write data-3 with the inserted and updated rows, and delete-2 invalidating the ‘Jack’ row of data-2.

By the end, in table version 3, all the above files are members of the current snapshot, and a reader must merge the files on reading.

The question is, how do the compute engines know to write these files? How do they know the current files and positions of rows in order to write delete files? Hudi and Paimon take care of this aspect natively, but with Iceberg and Delta it's down to the compute engine. There are a few options for how to do this, and I’ll cover two common ways.

Option 1: Using MERGE to apply row-level operations on write

SQL insert, update, and delete statements are query-level operations; they all apply the same operation to all matching rows. The MERGE statement performs row-level operations depending on a row-level comparison of a source and a target dataset.

For example, a typical MERGE I might have written in an OLTP database back when I worked as a consultant might look like this:

MERGE INTO FavFruit tgt
USING my_source src
ON tgt.name = src.name
WHEN MATCHED THEN
    UPDATE SET tgt.fruit = src.fruit
WHEN NOT MATCHED BY SOURCE THEN
    DELETE
WHEN NOT MATCHED BY TARGET THEN
    INSERT (name, fruit)
    VALUES (src.name, src.fruit);

This takes a source table and a target table, compares them row by row based on the name column, and makes the necessary row-level changes to sync the target with the source. The source may not actually be a table but a batch of rows in memory.

Merges can be extremely flexible and serve many different use cases, but the general gist is that you tell the query engine:

  • how to perform a full join between the two datasets, and

  • which row-level operations to apply to the target table based on the outcome of that join.

There are different options for how to apply a merge, and it depends on the current support of the compute engine. Perhaps the most common way right now for data lakes is to land data in an append-only table and perform periodic micro-batch merges using the landing table as the source. Alternatively, if the compute engine supports it, it can simply use a batch of records it already has in memory as the source and perform the merge with the target table directly. The latter method is more efficient, as raw row-level data doesn’t have to be written to a table before merging. It also avoids issues such as multiple rows with the same primary key existing in the source table, which would cause a SQL MERGE to fail.

A merge statement based on an in-memory batch as the source dataset with row-level operations might look like this:

MERGE INTO FavFruit tgt
USING my_batch src
ON tgt.name = src.name
WHEN MATCHED AND src.op = '-D' THEN DELETE
WHEN MATCHED AND src.op = '+U' THEN UPDATE SET tgt.fruit = src.fruit
WHEN NOT MATCHED AND src.op = '+I' THEN
    INSERT (name, fruit)
    VALUES (src.name, src.fruit);

Spark Structured Streaming currently only supports merging with an append-only table as the source, but support for merging an in-memory batch should be coming at some point. Merging from an append-only table as the source adds the complication of limiting the source to a window of data that covers the current micro-batch, which also helps avoid duplicate rows in the source. Ryan Blue of Iceberg wrote about this.
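
A hedged sketch of that windowing/deduplication step might look like the following (the landing table, its ts column, and the watermark literal are all illustrative): restrict the landing table to the current micro-batch window and keep only the latest operation per key before using the result as the MERGE source.

SELECT op, name, fruit
FROM (
  SELECT l.*,
         ROW_NUMBER() OVER (PARTITION BY name ORDER BY ts DESC) AS rn
  FROM fav_fruit_landing l
  WHERE ts > TIMESTAMP '2025-01-01 00:00:00'   -- lower bound of this micro-batch window
) dedup
WHERE rn = 1;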

What MERGE does under the hood

Let’s use the example of a compute engine merging an in-memory batch with a target Iceberg or Delta table. First, the compute engine must perform a table scan based on the latest snapshot and essentially perform a full join between the source and target datasets. Here the source should be small, while the target could be very large. There are a number of possible optimizations, such as data file pruning based on partitions and data file column statistics, but their effectiveness depends on the source dataset. Compute engines employ several join strategies, which are out of scope for this post.

Based on the results of this join, a number of data files in the target table will contain rows that must be updated or deleted, and a certain number of new data files must be written for the inserted rows. The updates and deletes can be scattered across a number of data files but only touch a few rows per file, so using a merge-on-read table mode is highly recommended (to avoid having to rewrite entire data files based on relatively few row changes per file). From this scan, the compute engine knows the data files and positions in those files that need to be invalidated with delete files, and knows the new values to be written as new data files.
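
If you follow that recommendation, the relevant switches look roughly like this (Spark SQL; table names are illustrative), using Iceberg’s row-level operation modes and Delta Lake’s deletion vectors:

-- Iceberg: have MERGE/UPDATE/DELETE write delete files rather than rewrite data files
ALTER TABLE db.FavFruit SET TBLPROPERTIES (
  'write.merge.mode'  = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'write.delete.mode' = 'merge-on-read'
);

-- Delta Lake: enable deletion vectors so row-level changes avoid rewriting whole files
ALTER TABLE FavFruit SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true');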

If we compare the “micro-batch merge” of Iceberg and Delta vs the “micro-batch row-level ingest” of Hudi and Paimon, we see that on the write side of things there isn’t much difference, at least if we’re talking about merge-on-read tables. Where the “micro-batch merge” approach loses out is that to perform writes, reads must be performed. The read side can be expensive, and that is where I would judge the inefficiencies of the micro-batch merge model are found.

Option 2: Upserts using index lookups

Since Iceberg and Delta don’t use an LSM-like storage approach or model primary keys natively, they can’t do native upserts. However, the compute engine can store some limited state, such as an index that maps a primary key to its current data file and position. Flink shuffles data by key to Iceberg sink tasks, and each task maintains a local index that stores the file and position of each key it writes. Every time an update or delete row is received, the task can look up the current data file and position from the index and add an entry to a positional delete file to invalidate that version of the row. This avoids the costly table scan and join of a merge.

The question is, how to populate this cached index when a streaming job starts? 

Here we have some options:

  1. Store the index in persistent job state, such as Flink state. This could end up being expensive if the index is large.

  2. Perform a table scan when the job starts to populate the index. This could make jobs slow to start.

  3. Store the index as another table on object storage (much like Hudi does internally). That table acts as the backing store, with something like RocksDB used locally as a fast lookup cache.

  4. If you’re using Iceberg, don’t prepopulate the index; just fill it on the fly. This results in a partial index based only on data seen since the streaming job started. To cope with having only a partial index, use a mix of positional and equality deletes. Let’s look at how equality deletes can be used with Iceberg.

Leveraging Iceberg’s equality deletes. When Flink doesn’t know whether a primary key exists in an Iceberg table (due to an incomplete local index), it issues an equality delete and adds the data file and position of the new row (in the case of an update) to the index so that next time it can issue a positional delete. Iceberg readers use both equality deletes and positional deletes to skip old row versions. The main drawback of this approach is that equality deletes can slow down reads significantly, so to mitigate this, aggressive compaction is recommended to keep the number of equality deletes small.
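
As a concrete reference point, the stock Iceberg Flink connector exposes an upsert mode along these lines (a sketch; catalog and table names are mine), which leans on equality deletes keyed on the declared primary key:

CREATE TABLE ice_catalog.db.fav_fruit (
  name  STRING,
  fruit STRING,
  PRIMARY KEY (name) NOT ENFORCED
) WITH (
  'format-version'       = '2',     -- v2 tables support row-level delete files
  'write.upsert.enabled' = 'true'   -- changes for a key are written as equality deletes plus inserts
);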

This upsert approach avoids the costly reads of the MERGE technique but comes with the trade-offs of managing extra state in the compute engine and/or relying on equality deletes. There’s no free lunch!

Summary

True streaming ingest of row-level operations doesn’t exist with the table formats; it’s all really just micro-batching when it comes to working with Parquet and object storage. Even so, Hudi and Paimon have the edge for this workload. The LSM-like storage architecture with native primary key support really helps here.

But don’t discount Delta and Iceberg; while their native support is not as strong, the compute engines can largely compensate for this shortfall (if not now, then eventually).

Iceberg and Delta place the burden of performing row-level operations on the compute engine, either through something like a SQL MERGE or, preferably in my opinion, using the upsert via index lookup approach. Instead of the table format being responsible for the primary key mapping index, the compute engine must do it. This comes at some expense to the compute engine, as more code must be written and maintained, but it doesn’t have to translate to poorer performance.

Indexing is an area that will evolve a lot over the coming months and years. There’s room for some kind of cross-compute-engine indexing standard to emerge, including indexes that allow compute engines to avoid costly reads when performing row-level operations.

Next

Next up is a look at native CDC support in the four table formats, as previously promised.

Also, I will write more about clustering and partitioning soon, including how they are leveraged to avoid conflicts between concurrent writers. I wrote about concurrent writer conflicts in the append-only table post, but things change for mutable tables. No (theoretical) discussion of performance could be complete without also taking concurrent writer support into account.