Table format comparisons - Append-only tables and incremental reads

This post is about how the table formats support append-only tables and incremental reads. Streaming is becoming more and more important in the data analytics stack and the table formats all have some degree of support for streaming. One of the pillars of a streaming workload based on table formats is the append-only table. There are other pillars, such as changelog streams, and I’ll cover those in another post.

Incremental reads allow compute engines to perform repeated queries that return new records or changes to records that have occurred since the last query was executed. Basically, a table client polls the table on an interval, receiving any new data on each poll - much like a Kafka consumer, albeit with a lot more end-to-end latency.
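
To make this concrete, here is a purely conceptual sketch of the polling loop in Java. None of these types exist in any library - they are hypothetical stand-ins for the format-specific APIs covered later in this post.

```java
import java.util.List;

// Hypothetical types - stand-ins for the format-specific incremental read APIs.
interface TableChangeFeed {
    Poll pollChangesSince(long checkpoint); // returns records added since the checkpoint

    record Poll(List<String> newRecords, long nextCheckpoint) {}
}

class IncrementalReader {
    void run(TableChangeFeed feed, long checkpoint) throws InterruptedException {
        while (true) {
            TableChangeFeed.Poll poll = feed.pollChangesSince(checkpoint);
            poll.newRecords().forEach(System.out::println); // hand new records to downstream processing
            checkpoint = poll.nextCheckpoint();             // e.g. a snapshot id, table version or timestamp
            Thread.sleep(10_000);                           // poll interval - the source of the extra latency
        }
    }
}
```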

There are a number of types of incremental reads:

  • Append-only tables: Reading from an append-only table incrementally is conceptually simple, and this is the focus of this post.

  • Mutable tables, latest-state: Incrementally reading the latest version of each row. Covered in a future post.

  • Mutable tables, changelog streams (CDC): Transform all changes made to a table into a stream of change events with a before and after view of each modified row. This stream itself acts like an append-only table and can be easily read incrementally. Covered in a future post.

  • Streaming versions of batch reads: A streaming read returns the same results as a batch read; the only difference is that the query returns a stream of records that acts like an iterator, instead of a batch of results. I’m not covering this type of read here.

Append-only tables are physically ordered by the order of commits to the log of deltas/snapshots. An open question for me is whether there is any ordering among multiple files of the same commit. I haven’t tested the ordering of append-only tables with the different compute engines, so this is theoretical at this point, but we’ll look at the design of each table format and see what kind of ordered streams, and even partially ordered streams (Kafka-style), each should be capable of based on its architecture.

Note: I recommend reading the prior posts in this series, as I’m building things up piece by piece, and I will assume familiarity with them in this one.

Prior posts:

* How do the table formats represent the canonical set of files?

Consistency model posts, part 1 of each goes over some of the core internals:

* Apache Iceberg

* Delta Lake

* Apache Hudi

* Apache Paimon

Append-only tables

Append-only tables are one of the building blocks of streaming in the table format space. Append operations typically commit one or more new data files and do no reads or deletes. A compute engine only needs to read these added data files to know the row-level additions to the table. All the table formats provide APIs for sequentially scanning for added data files that were written by regular write operations. Other operations such as compactions are ignored as they only optimize the physical storage of data and make no logical changes.

Delta Lake and Apache Hudi use the log of deltas approach, where each delta (of a write operation) includes the added data files. Apache Iceberg and Apache Paimon use the log of snapshots method, where each snapshot contains the information about which files were added. Compute engines can incrementally read the log of deltas or snapshots and discover the files that were added in each commit.

Each of the compute engines has some support for incrementally reading append-only tables. I’ll use Apache Flink as an example. It uses the table format APIs to generate a stream of “splits” that downstream tasks consume for processing. A split is a unit of reading in Flink, usually a data file but could be a group of files or just a portion of a single large file.

Fig 1. A simplified view of a Flink topology that incrementally reads from a table.

This split enumeration becomes relevant when we look at using the table formats as ordered streams.

Apache Iceberg

Apache Iceberg has many operation types, one of which is the Append operation (Table.newAppend()), which can only add data files. An append-only table is one that is only written to via Append operations. Reading incremental data from an append-only table is done by reading forwards in the snapshot log since the last read and finding the manifest entries with the ADDED status.
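
As an illustration, a blind append via the Iceberg Java API might look like the following sketch. The table is assumed to be already loaded from a catalog, and the file path and stats are placeholders.

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;

public class IcebergAppendExample {
    // Blindly append an already-written Parquet file to an Iceberg table.
    // Assumes an unpartitioned table; a partitioned table would also need
    // partition data on the builder. Path and stats are placeholders.
    public static void appendOneFile(Table table) {
        DataFile dataFile = DataFiles.builder(table.spec())
            .withPath("s3://bucket/table/data/file-001.parquet")
            .withFormat(FileFormat.PARQUET)
            .withFileSizeInBytes(64 * 1024 * 1024)
            .withRecordCount(1_000_000)
            .build();

        table.newAppend()          // the Append operation: adds files, never reads or deletes
            .appendFile(dataFile)
            .commit();             // commits a new snapshot to the log of snapshots
    }
}
```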

Fig 2. A compute engine incrementally reads new data files by reading the next snapshot in the log of snapshots and searching for manifest entries with the ADDED status.

The above figure is grossly simplified. If you want a more in-depth description of Iceberg metadata, I recommend you read part 1 of my consistency model post on Iceberg.

Multi-writer support

Append operations cannot have data conflicts with other append operations - Iceberg treats an append-only table like a stream. Therefore there are no data consistency checks that can cause writers to abort due to conflicting writes by other writers. Metadata commit conflicts can still occur but they only require a retry of the metadata files commit process (which is much cheaper than retrying an entire write operation). This means Iceberg can support large writer topologies for high throughput append-only tables without data conflicts.

Using a table as an ordered stream

Theoretically, Iceberg can model a totally ordered stream in single writer/reader topologies. The question of ordering across multiple added files of the same commit still needs answering though. I would expect that in the vast majority of cases, writers only add one data file per partition per commit.

In multiple writer/reader topologies, Iceberg can model a partitioned stream with the help of the Iceberg bucket partitioning transform. This transform determines the partition of each row based on a table column and a bucket count: bucket(count, col). Under the hood it performs hash-based partitioning on the specified column. This is similar to how partitioning works in Kafka.

Fig 3. Partitioning data with the bucket(n, col) transform.
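
For illustration, defining a bucket partition spec with the Iceberg Java API looks roughly like this (the schema and bucket count are just examples):

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class IcebergBucketSpecExample {
    // Illustrative schema; any column with a stable, key-like value works.
    static final Schema SCHEMA = new Schema(
        Types.NestedField.required(1, "user_id", Types.LongType.get()),
        Types.NestedField.required(2, "event", Types.StringType.get()));

    // bucket(16, user_id): rows are hashed on user_id into 16 buckets,
    // giving Kafka-style partitioning of the stream.
    static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
        .bucket("user_id", 16)
        .build();
}
```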

Spark and Flink can shuffle data to writer tasks by Iceberg partition, which results in single writer partitions and therefore partial ordering by partition spec. In Flink, to ensure ordering guarantees, there should be fewer writers than partitions; otherwise, rows are round-robined across writers of the same partition.

Spark has a partition-aware scan mode that provides the same per-partition ordering guarantees to reads. Basically, it allows data files of a given partition to be read in the same order they were written. 

Flink doesn’t currently support reader topologies aligned to partitions, so in the case of multiple reader tasks, the split enumerator assigns splits to readers without regard for partitions. To process splits in partition order, the user would need to shuffle the data again by partition. This seems like a missing feature of split enumeration in Flink right now.

Code reading

Table.newIncrementalAppendScan() in the API module allows a compute engine to specify a starting point in the snapshot log. The planFiles() and planTasks() methods of the IncrementalAppendScan return the data files that were added as part of any APPEND operation of any snapshots after the starting point. Flink uses this to tail the log of snapshots for added files in append-only tables in the FlinkSplitPlanner class of its Iceberg adapter module (used in turn by the ContinuousSplitPlannerImpl).
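
Here is a rough sketch of what tailing the snapshot log with this API looks like (tracking the last-read snapshot id between calls, e.g. in Flink checkpoint state, is left to the caller):

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.IncrementalAppendScan;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class IcebergIncrementalReadExample {
    // Print the data files appended after the given snapshot id.
    public static void printAddedFiles(Table table, long lastReadSnapshotId) throws Exception {
        IncrementalAppendScan scan = table.newIncrementalAppendScan()
            .fromSnapshotExclusive(lastReadSnapshotId);  // read forwards from the last snapshot we saw

        try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
            for (FileScanTask task : tasks) {
                System.out.println(task.file().path());  // a data file added by an APPEND snapshot
            }
        }
    }
}
```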

Delta Lake

An append-only table in Delta Lake is one that is only written to by insert commands that add data files without reading or deleting any existing files. In the code, these are referred to as blind appends. Reading incremental data from an append-only table is done by reading forwards in the delta log since the last read and finding the AddFile actions.

Fig 4. A compute engine incrementally reads new data files by reading the log of deltas, searching for AddFile actions.

Multi-writer support

Delta Lake also treats an append-only table like a stream. It skips any data consistency checks when there are no file deletes and no reads associated with the transaction. Because there are no data consistency checks, operations do not abort due to conflicting writes by multiple writers. As with Iceberg, only metadata commit conflicts must be retried. It’s worth pointing out that Iceberg and Delta don’t support primary key tables, which is the reason they don’t consider it possible for two inserts to conflict.

Using a table as an ordered stream

A Delta Lake table with a single writer/reader topology should act as an ordered stream. However, as far as I can tell, Delta doesn’t have good building blocks for creating a partially ordered partitioned stream. Partitions are based on columns, as in Hive, rather than on transforms as in Iceberg, which makes a Kafka-style partitioning scheme more difficult (but not impossible). I also haven’t seen any partition-aware scanning by Spark or Flink for multiple readers. So I think ordered streams need to stick with single writer/reader topologies (which probably have enough bandwidth for most people).

Code reading

DeltaLog.getChanges(long startVersion, boolean failOnDataLoss) of the Standalone module (one implementation of the DeltaLog) returns the actions committed in each version since the startVersion. Actions include AddFile and RemoveFile actions. This can be used by a compute engine to tail the delta log. Flink uses this for tailing the log for AddFile actions on a table, in the TableMonitor class of its Delta adapter module, which is ultimately used by the ContinuousDeltaSourceSplitEnumerator.
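
A minimal sketch of tailing the delta log with the Standalone library follows (the table path is a placeholder and tracking the last-read version is left to the caller):

```java
import io.delta.standalone.DeltaLog;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import org.apache.hadoop.conf.Configuration;

import java.util.Iterator;

public class DeltaTailExample {
    // Print the data files added since a given table version.
    public static void printAddedFiles(String tablePath, long lastReadVersion) {
        DeltaLog log = DeltaLog.forTable(new Configuration(), tablePath);

        Iterator<VersionLog> changes = log.getChanges(lastReadVersion + 1, /* failOnDataLoss */ true);
        while (changes.hasNext()) {
            VersionLog versionLog = changes.next();
            for (Action action : versionLog.getActions()) {
                if (action instanceof AddFile addFile) {   // we only care about added data files
                    System.out.println(versionLog.getVersion() + " -> " + addFile.getPath());
                }
            }
        }
    }
}
```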

Apache Hudi

Apache Hudi is a slightly different case as it shards a table’s data across a number of file groups per partition. An append-only table in Hudi will either be a copy-on-write (COW) or a merge-on-read (MOR) table, though as we’ll see, a MOR table is much more efficient for this kind of workload.

For MOR tables, the first data written to a file group is written as a base file. Subsequent data files are added as log files over the top of the base file and are periodically compacted into new base files by compaction jobs. Once the compute engine has read the base file of a file group, it only needs to keep reading the latest log files (as long as it keeps up with the timeline archiver).

Fig 5. A compute engine incrementally reads new data log files by reading the timeline of a MOR table.

With COW tables, appends can end up rewriting existing base files as new base files. This makes reading the latest data more difficult as a compute engine would need to compute a diff between the newer base file and the older base file to figure out which rows got appended.

Hudi tables have primary keys, either explicitly defined by a user or automatically generated by Hudi itself. Primary keys, whether specified or auto-generated, also include the partition columns (if any). Primary keys are mapped to buckets which themselves map to file groups. This is relevant to how Spark and Flink use Hudi as we’ll see next.

Multi-writer support

At the time of writing, multi-writer concurrency is based on optimistic concurrency control (OCC), and because of the way that Hudi shards data over a fixed or dynamic set of file groups, even append-only tables can experience data conflicts between multiple writers. The most obvious conflict is in COW tables, where two concurrent writers could attempt the same “read the current base file and rewrite it with new records” operation. Inserts into MOR tables should only add log files and therefore not conflict; however, the same data conflict checks raise an error if any concurrent write has written to the same file group of a MOR table.

This isn’t necessarily an issue for a Hudi append-only table. For one, Flink shuffles data by primary key and then by bucket id (which translates to file groups), so each file group ends up being single-writer anyway, which avoids the problem. For other scenarios, where single-writer file groups are not supported, Hudi is introducing a new concurrency control that append-only tables should benefit from: non-blocking concurrency control. It is experimental at the time of writing and I haven’t had a chance to dig into it, but it promises to relieve the issue of conflicting writers. I will do a write-up on it when it becomes generally available.

Using a table as an ordered stream

On the write side, Flink shuffles data by primary key (which in an append-only table will be auto-generated by Hudi), and then by bucket (mapping to file group). If there are multiple file groups per partition, then there can be multiple log files per commit, which makes FIFO ordering at the partition level unclear to me (can the reader process the log files within the same commit and same partition in the same order they were written?).

On the read side, neither Spark nor Flink has support for enumerating splits with partition awareness. So ordered streams seem limited to single writer/reader topologies (which is probably fine for most people).

Code reading

Hudi has the IncrementalQueryAnalyzer in the hudi-common module, which can be combined with the FileIndex to return the file slices (base files and any related log files) that were added between a start and end timestamp. Flink uses these to tail the Hudi timeline for added file slices in the IncrementalInputSplits class of its Hudi adapter module.
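
I won’t reproduce the IncrementalQueryAnalyzer usage here, but as a rough illustration of the underlying idea, here is a hedged sketch of tailing the timeline directly with Hudi’s lower-level metadata client. This is not the code path Flink actually uses, the base path is a placeholder, and exact signatures vary between Hudi versions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;

public class HudiTimelineTailExample {
    // List the write commits completed after the last instant we processed.
    // Mapping those commits to the file slices they added is what the
    // IncrementalQueryAnalyzer/FileIndex combination does for us in practice.
    public static void printNewCommits(String basePath, String lastReadInstantTime) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(new Configuration())
            .setBasePath(basePath)                    // e.g. "s3://bucket/hudi_table" (placeholder)
            .build();

        HoodieTimeline newCommits = metaClient.getActiveTimeline()
            .getCommitsTimeline()                     // commit/deltacommit instants
            .filterCompletedInstants()
            .findInstantsAfter(lastReadInstantTime);

        newCommits.getInstants().forEach(instant ->
            System.out.println(instant.getTimestamp() + " " + instant.getAction()));
    }
}
```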

Apache Paimon

Apache Paimon is a merge-on-read design where data is written with row-level information denoting the row operation (called RowKind in Paimon):

  • Insert (+I).

  • Update-before (-U), for CDC use cases.

  • Update-after (+U), for updates and CDC.

  • Delete (-D).

Instead of encoding row-level additions and deletions using file-level operations (add/delete) and delete files (as the rest do), Paimon writers just add new data files with a row kind for each row. This row-level design is unique to Paimon among the four table formats covered.

Paimon tables are either primary key tables or append-only tables. The main differences between PK and append-only tables are:

  • Append-only tables will only have the +I row kind and have no primary key.

  • Reads of PK tables must perform key-based merges to ensure no duplicate rows are returned. Each returned row represents the latest state of the given key. Append-only tables have no keys and, therefore, no key-based merging.

Incremental reads from an append-only table tail the log of snapshots to learn of the added files. Each Paimon snapshot contains a delta manifest list that contains the added data files of the snapshot.

Fig 6. A compute engine incrementally reads new data files by reading the next snapshot in the log of snapshots and searching the delta manifest list for manifests with entries with the ADDED status.

For primary key tables, Paimon shards data across multiple buckets per table partition, where each bucket is an LSM tree.

Fig 7. Each partition is sharded by a number of buckets.

Append-only tables work a bit differently, without the multiple LSM levels, but they can still be bucket-based. Flink leverages this sharding within table partitions to provide greater parallelism while avoiding data conflicts between writers, just as it does with file groups in Hudi.

Using a table as an ordered stream

Paimon has two types of append-only tables:

  • Append tables: These tables do not use buckets and there is no key-based data shuffling to writers. This means that this table type does not provide ordering guarantees. Reads are best-effort in the order that files were committed.

  • Append queues: These act as partitioned streams, with data sharded across multiple buckets per table partition. Rows are mapped to buckets based on a bucket key (a table column). Flink shuffles data to writers such that each bucket has one writer. This provides the same partial ordering as a Kafka topic, with ordering at the bucket level. Each bucket can be read by a single reader to maintain the ordering end-to-end. This is quite nice as it doesn’t rely on using a partitioning strategy; you pretty much get it for free (see the sketch after Fig 8).

Fig 8. Append Queues. Rows are mapped to buckets by a bucket key (one of the table columns) and data is shuffled to writers by bucket. This produces a similar topology to Iceberg when its bucket partition transform is used; however, Paimon is able to separate partitioning from this bucket-based sharding.
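
As a sketch, defining such a bucketed append table through Paimon’s Java API might look something like this. The catalog setup, database/table/column names and bucket count are illustrative, not taken from any particular deployment.

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class PaimonAppendQueueExample {
    public static void createAppendQueue() throws Exception {
        // Filesystem catalog pointing at a warehouse path (placeholder).
        Catalog catalog = CatalogFactory.createCatalog(
            CatalogContext.create(new Path("file:///tmp/paimon_warehouse")));

        // No primary key => an append-only table; setting 'bucket' and 'bucket-key'
        // makes it a bucketed append table (an "append queue").
        Schema schema = Schema.newBuilder()
            .column("user_id", DataTypes.BIGINT())
            .column("event", DataTypes.STRING())
            .option("bucket", "4")           // four buckets per partition
            .option("bucket-key", "user_id") // rows are hashed to buckets by user_id
            .build();

        catalog.createDatabase("db", true);  // ignore if it already exists
        catalog.createTable(Identifier.create("db", "events"), schema, false);
    }
}
```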

Multi-writer support

Paimon performs no data conflict checks, as Flink topologies align writer tasks to buckets such that each bucket has only one writer - thus avoiding the possibility of data conflicts in the same bucket. In the case of append tables that do not use buckets, multiple writers cannot conflict with each other, as append tables have no primary keys and inserts don’t conflict with other inserts.

Code reading

Paimon has the StreamDataTableScan interface (DataTableStreamScan implementation) in the core module, which allows a compute engine to repeatedly call the plan() method to get the added data files of the next snapshot in the log. In the Flink module, the ContinuousFileSplitEnumerator uses this to generate a stream of splits.
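
The public ReadBuilder API wraps this internal scan. A rough sketch of a streaming read loop might look like the following (catalog path, table name and poll interval are placeholders, and error handling is simplified):

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableRead;

import java.util.List;

public class PaimonStreamScanExample {
    // Repeatedly plan the next snapshot's added files and read them.
    public static void tailTable() throws Exception {
        Catalog catalog = CatalogFactory.createCatalog(
            CatalogContext.create(new Path("file:///tmp/paimon_warehouse"))); // placeholder
        Table table = catalog.getTable(Identifier.create("db", "events"));

        ReadBuilder readBuilder = table.newReadBuilder();
        StreamTableScan scan = readBuilder.newStreamScan(); // wraps the StreamDataTableScan
        TableRead read = readBuilder.newRead();

        while (true) {
            List<Split> splits = scan.plan().splits();      // added files of the next snapshot
            try (RecordReader<InternalRow> reader = read.createReader(splits)) {
                reader.forEachRemaining(row -> { /* hand the row to downstream processing */ });
            }
            Thread.sleep(1_000);                            // simple poll interval
        }
    }
}
```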

Some musings on performance

One note on performance, and these are just musings really. For Iceberg and Delta, which just keep writing new Parquet files, there seems to be little difference at the data layer that could impact performance. If there is a difference, then it seems it must be at the metadata layer. I don’t know whether Delta’s use of checkpoints in Parquet format makes a difference to performance; it’s something I want to look at.

The other thing to note about performance is that avoiding costly data conflicts seems critical, and we’ve seen that Iceberg and Delta have no such conflicts in append-only tables, while Hudi and Paimon avoid conflicts (in Flink at least) by shuffling data to writers by file group/bucket. So all four can be deployed with multiple writers while avoiding data conflicts.

Another musing is that Delta and Hudi advertise their streaming capabilities and are based on the log of deltas approach. However, Paimon, which was designed specifically around streaming, uses the log of snapshots approach like Iceberg. I suspect that whether metadata is stored as a log of snapshots or a log of deltas is not critical to streaming performance. I am really curious to run some benchmarks one day.

Conclusions

All of the table formats support append-only tables and incremental reads, which is one of the core pillars of streaming using table formats. I can’t say whether they all perform similarly, as I haven’t run those tests, but at least at a logical level, they can all do it.

Despite the different designs of append-only tables, the way incremental reads work is remarkably similar. All rely on reading the log of deltas/snapshots and finding the data files that were added. It’s pretty simple.

In terms of writing to append-only tables, Iceberg and Delta are perhaps the most similar, in that append operations add new data files and don’t perform data conflict checks. Their metadata is quite different, but if you squint they generalize to the same thing (logically).

Hudi is a bit different due to its file groups with base and log files, but MOR tables are relatively close to the Iceberg/Delta approach of only adding data (log) files. The biggest difference regarding Hudi is the possibility of data conflicts between multiple writers under the existing optimistic concurrency control. Flink avoids this by shuffling data to writers based on buckets (file groups), which prevents inter-writer conflicts. Also, the new non-blocking concurrency control that is in the works will help non-Flink users who want multiple-writer topologies.

Finally, Paimon provides ordered and unordered streams with its Append Queue and Append Table types. The former provides partial ordering like a Kafka topic. Something similar is possible with Iceberg, due to its bucket partition transform, but Paimon gives us this option out of the box.

While I’m encouraged at the streaming support I’ve seen so far, I don’t think we can come to judgments about streaming and table formats until we’ve looked at incrementally reading from mutable tables. These incremental reads either return a latest-state view of changed rows or a full changelog stream with before and after state. That is the subject of the next post.

ps. Writing about multiple table formats at the same time is pretty hard; if you see any mistakes or omissions then please let me know.