Understanding Apache Paimon's Consistency Model Part 1 — Jack Vanlightly

Understanding Apache Paimon's Consistency Model Part 1

Apache Paimon is an open-source table format that has come after the more established Apache Iceberg, Delta Lake and Apache Hudi projects. It was born in the Apache Flink project where it was known as Flink Table Store, but has since spun out as a top-level Apache project. When I first started digging into Paimon I remarked that if Iceberg, Delta and Hudi had a baby, it might be Paimon. But Paimon has a number of its own innovations that set it apart from the Big Three table formats. 

Paimon has a lot of features and it would be hopeless to include them all, so I’m going to try to focus on the core mechanics. As I have done with my previous posts on the consistency models of various table formats, this post is focused on the logical building blocks rather than getting too into the implementation details. In this post, I’ll build the mental model at a high level and then go over the consistency model, just as I did with Apache Hudi and Delta Lake previously.

The logical model of Paimon

Paimon has the concept of a catalog, databases, and tables. For this analysis, we’re only going to focus on primary key tables, though append-only tables also exist, as do change logs.

Paimon splits up a table into two layers:

  • The metadata layer: tells a compute engine things such as the schema of the table, which data files exist, statistics about the data in those files, and so on.

  • The data layer: a set of data and index files that constitute the actual data of the table. This layer is organized into one or more LSM trees. The thing that sets Paimon apart most from Iceberg, Hudi, and Delta is this organization of data as a set of LSM Trees.

The metadata layer

Paimon organizes the metadata layer into a set of files that form one tree per point in time. That is, each write operation writes a set of files that include:

  • a tree root (a snapshot file).

  • two manifest-list files (base and delta).

  • an index manifest file.

  • one or more manifest files.

  • one or more data files.

Fig 1. A tree of metadata and data files forming one point in time (one version) of a Paimon table.

Snapshot files are written to the snapshots directory and are suffixed with a monotonically increasing integer (the table version), such that there are no gaps in the numbering. This forms a log of snapshot files whose order is the lexicographical ordering of the file names.

While this log appears to resemble the Delta Log or Hudi Timeline, it is not the same at all. It is more akin to the snapshot list in an Apache Iceberg metadata file. In Paimon, just like Iceberg, each snapshot file is a single root of a tree of files at a given point in time. Delta and Hudi obtain the current state of the table by reading the log (from the last checkpoint) and rolling up the entries to form a logical snapshot - there is no single tree root file.

This lexicographical ordering of files in a directory allows a compute engine to know which snapshot file is the latest (though there is also a copy of the latest snapshot named LATEST) without requiring an external catalog like Apache Iceberg does. It also allows for time travel by querying a table at previous points in time.

Each snapshot file has a number of fields, but the important ones regarding the metadata organization are:

  • Version: the table version of this snapshot. 

  • SchemaId: Points to a schema file that contains the set of fields, primary key columns, and partitioning columns.

  • baseManifestList: A manifest-list file that contains the manifest files of the previous snapshot.

  • deltaManifestList: A manifest-list file that contains the new manifest files written in this snapshot.

  • indexManifest: Contains a list of the current index files which includes bloom filters, bucket mapping and deletion vectors (which we’ll cover later).

  • commitKind: The type of commit such as Append, Compact, Overwrite. In this post we’re focusing on:

    • Append commits, which can be appending insert/update/delete row operations.

    • Compact commits, which rewrite data files to form fewer, larger files at higher levels in the LSM trees.

The base manifest list of snapshot S, is the merged base + delta manifest lists of snapshot S-1.

Fig 2. The base manifest list files. In reality, manifest files also get compacted to reduce the number of metadata files, which is not reflected here.

Each manifest file can point to one or more data files and include statistics about those data files that compute engines can leverage to prune data files from queries. The use of two manifest list files makes it trivial for streaming readers to only read the latest changes to data pointed to by the delta manifest list. A standard batch-based query needs to combine the two manifest list files of a snapshot to get a global view of the table (at that version number).

When a new snapshot is created, it merges the base + delta manifest list of the previous snapshot, and also compacts the manifest files into fewer, larger files. This compaction is not the same as the data file compaction, but just house cleaning that is carried out during commits.

The data layer

Any given table is stored as a set of data files such as Parquet or ORC. These data files are organized into one or more partitions, and each partition can be further sub-divided into one or more buckets. Partitioning is defined based on a set of partitioning columns. Buckets can be fixed in number or be dynamically created as data is written.

Partitioning allows for more efficient querying by allowing compute engines to prune whole partitions that are not relevant to a query. Commonly, partitions are time-based, such as the date. A query that filters by date can avoid loading huge numbers of data files simply due to the partitioning.

Fig 3. A table of two partitions (by date) and four buckets per partition.

Within each partition, data is distributed over the buckets according to the bucket key (or primary key in the case of no bucket key) with data ordered in the files by the primary key. Each bucket is organized into its own LSM Tree. With fixed-number buckets, data is routed to a bucket based on a hash of the bucket key (or primary key), and for dynamic buckets, which get created on demand, a mapping of bucket key to bucket is maintained in a global bucket index (just like with Apache Hudi and its file slices).

Partitions and buckets serve different needs. Partitions allow for more efficient queries by allowing compute engines to more effectively prune data files when reading. Buckets on the other hand exist to provide more parallelism to reads and writes. We’ll go into more detail about concurrency, parallelism, and buckets later on.

The data and metadata files are organized into a set of directories (prefixes in object stores without a directory abstraction). Data files are written into sub-directories according to the partition and bucket. Metadata files are written to directories outside of the partition and bucket directories.

LSM tree storage (one LSM tree per bucket)

The generic LSM tree

An LSM tree is typically formed of an in-memory memtable buffer, a write-ahead-log (WAL), and a set of sorted-run files which are labelled with level numbers. Paimon doesn’t use a WAL as it is designed to fallback on Flink’s checkpointing feature. This means it can recover from any failure that causes the loss of in-memory data.

A sorted run is a set of one or more Sorted String Table (SST) files that is sorted internally by a key but whose key-range can overlap with other sorted runs. A write will hit the in-memory memtable buffer and the WAL (if it exists), then asynchronously, the in-memory memtable buffer is flushed as a sorted run. Also asynchronously, compaction jobs merge sorted-runs into fewer, larger sorted files in deeper levels for read efficiency. 

Unlike a B-Tree, the data of a given key might exist in any of the sorted-run files. A read of a given key will hit the in-memory memtable buffer first and if no match is found, then traverse the sorted runs of each level in level order (0-N). Assuming the read wants the latest version of a key, it stops traversing the sorted-runs on the first match (as this will be the most recent). Any given key can exist in multiple sorted-run files (with the older versions of the key existing in older sorted-runs). Over time, sorted-run files are compacted into larger files in higher levels. Older versions of rows can be expired by compaction by not rewriting them into the new larger sorted-run files. An expiry strategy is employed to only keep the last V versions of data, in order to avoid continued disk growth.

Paimon’s LSM tree approach

Paimon takes some of this classic LSM tree approach and it reminds me a lot of ClickHouse, except that ClickHouse does not sub-divide partitions any further. They both share a number of similarities:

  • Data is buffered in-memory and flushed as level 0 sorted-runs, though no write-ahead-log is used.

  • Compaction is time-aligned, that is, only a set of sorted runs that cover a contiguous time-range can be compacted.

  • While the data files may be tagged with levels, these levels do not affect the read path like a normal LSM tree engine such as RocksDB. The read path does not traverse levels (from 0 to N) to pick the latest value of any given key. Instead, all data files that intersect with the queried key range must be read, and a merge strategy must be employed to do things such as deduplication.

  • Different merge engines are supported. Paimon defaults to the deduplicate merge engine (equivalent to ClickHouse’s ReplacingMergeTree), but it also has an aggregating merge engine (equivalent to ClickHouse’s AggregatingMergeTree) and others. Paimon also has the partial-update merge engine which I’ll cover further down.

Fig 6. Time-aligned compaction. Higher levels are further back in time. Time-based expiry can be very efficient as it can mean just deleting entire files as they fall off the end of the retention period.

In Paimon, each sorted run can be formed of multiple sorted data files. Within a sorted run, data is spread across data files with no overlap of key range. However, data across sorted runs can overlap in key-range, so data of a given key can exist in multiple sorted runs within the bucket.

Each data file contains a number of rows and each row has a row kind (denoted by a letter) which tags the row as either an insert (+I), update (+U), or delete (-D). Updates can support update-before (-U) and update-after (+U) for change log producers. I won’t go into changelog producers in this post to keep this as concise as possible but know that Paimon bakes in changelogs into the spec.

Fig 7. A sequence of insert, update and delete operations translated into rows across data files.

On top of deduplicating and aggregating merge engines, Paimon includes a partial-update merge engine. This allows a writer to only include a subset of the columns of a row in an update op. The merge engine can then apply the set of partial updates of each primary key, ordered by the sequence number to form a final result of that row.

Merge-on-read

It is the job of the reader to merge the data of the multiple sorted runs into a final result (one row per primary key for the deduplicating merge engine). In order to correctly merge the data, the reader must know the order in which to merge the results, and this is accomplished by a sequence number. Each writer maintains a counter which it uses to set the sequence number. It is also possible to use a sequence field instead of a sequence number, such as using a timestamp field of the table.

The Paimon writer can do a full compaction immediately after writing a sorted run, or every Nth sorted run. Alternatively, a dedicated compaction job running in a separate thread can do compactions asynchronously. The size of the compacted files depends on the table size, the number of partitions and the number of buckets per partition. The docs recommend buckets of 200MB-1GB which are pretty small considering that other analytics systems can end up with data files of many GB in size. One reason for the relatively small bucket size is that parallelism can be limited based on the number of buckets. We’ll go into this aspect of Paimon in more detail in the concurrency section. But by default, parallelism is limited to one reader/writer per bucket. Having a small number of large buckets could limit the number of reader workers and impact performance.

Deletion vectors

Deletion vectors were introduced to improve the performance of reads while not overly impacting the performance of writes. The idea is to invalidate specific rows in data files that are now stale after a delete or update operation.

Fig 8. The deletion vector file of the bucket has 5 deletion vectors invalidating 5 rows in 2 data files. Level 0 files are never invalidated.

Without deletion vectors, the number of readers is limited by the number of buckets (maximum one reader per bucket). The reason why reads over a single bucket can’t be parallelized without deletion vectors is that a reader must read all the sorted runs of the bucket into memory, and then perform a merge operation based on the row sequence numbers. A reader cannot know if any given row is valid if it has read only a subset of the data files. This also means that predicates cannot be pushed down to the low-level reader level, as the merge logic must see all rows. The entire bucket needs to be read and merged by a single reader applying the merge logic. For any given primary key, the row with the highest sequence number will added to the query result (when using the deduplication merge engine). You can read more about the multiway merge algorithm here.

In reality there are optimizations regarding this merging. Only data files with a key range that overlaps with the query (if it has a key-based predicate) need to be read and merged for example.

Deletion vectors allow the reading of a bucket to be spread across multiple readers by replacing the sequence number-based merge with a filter that is based on positional deletes (like Iceberg, Delta and Hudi). It is also a more efficient merge operation, as it is not a multi-way merge between data files, just one file at a time with data skipping based on the deletion vectors. 

Deletion vector maintenance

Deletion vectors are not updated during a write operation but instead are maintained by compactions. Writers continue to write to level 0 as usual. Because of this, deletion vectors shouldn’t normally add any overhead to a write, the additional cost is paid during compactions. However, there are two cases where it can impact the performance of writes:

  1. The writer performs compactions itself.

  2. The dedicated compaction job runs too slowly and applies back pressure to the writer (as the number of small files exceeds a configured threshold).

Deletion vectors are never generated for level 0; writers write to level 0 and compactions write to higher levels. This means that readers must skip level 0 when deletion vectors are enabled; otherwise, they would need to apply the sequence number merge operation on top of the positional delete filter (losing the benefits of parallel readers per bucket). Waiting for a row to reach level 1 could add some additional latency between a write and a read of a given row update, according to the frequency of compactions.

The deletion vectors are stored in a single DV file per bucket and maintained by copy-on-write operations, i.e. the file is rewritten after any compaction that adds or removes deletion vectors.

When a compaction job runs, it selects a set of levels to compact and performs an operation to rewrite the source files into fewer larger files. When the compactor includes level 0, it must perform a regular sequence number-based merge of the source data files. Stale and deleted rows get removed, and the result gets written as a new set of larger data files at a higher level.

Now for the part where deletion vectors are generated or removed. Primary keys with +U or -D rows in the compacted data files, may also have rows in data files of higher levels (above the compacted levels). These rows, outside of the levels of the compaction, need to be invalidated with deletion vectors.

Fig 9. Before compaction, two valid rows exist for “Jack”, one in level 0 and one in level 3. After the compaction, the row in level 3 has been invalidated by a deletion vector entry in the bucket deletion vector file.

We can go through a more detailed scenario, using the example of multiple users and their points from before:

Fig 10. Deletion vectors are added during compactions when a row in a higher level (than that of the levels being compacted) is invalidated by a row in the compacted levels. Deletion vectors are removed when they point to a file that gets logically deleted. DV files themselves only get deleted when they ,and the other data files of their version, get expired.

The guarantee is that in levels 1 and above, a primary key can have at most, one row unmarked by deletion vectors. Level 0 is uncompacted and has no deletion vectors associated with it.

A reader must load the deletion vectors first and then when it reads the sorted-run data files of level 1 and above, it skips the rows indicated in the deletion vector files. Essentially, the stale rows have been masked. This allows for parallelism when reading from a single bucket, as each reader can read a portion of the data files and use the masking of the deletion files to filter out stale rows. It also means that predicates can be pushed down to lower levels of the reader logic, as the merge logic is not obligated to read all rows of a bucket. The final result can then be combined from the various readers. Just remember that readers skip level 0, so may have to wait for the very latest changes, though compaction should be frequent.

Support for Copy-on-write (COW) and merge-on-read (MOR)

Iceberg, Delta, and Hudi support copy-on-write (COW) and merge-on-read (MOR) tables, it is up to the user to decide which they want.

COW means that any update or delete on a given row means that a new version of the affected data file must be rewritten. For example, 1 GB data file which has one row updated would get completely rewritten. This is inefficient for mutation-heavy workloads as it causes a large amount of write amplification, though it is great for read performance. 

MOR takes a different approach where mutations cause additional small files to be written, and the reader must merge these small files with the larger base file to get a result. For example, when the 1 GB data file has a single row updated, the 1 GB file remains unchanged. If it is a delete, then the row in the 1 GB file gets invalidated by an entry in a deletion file. Iceberg uses the term delete file and Delta uses the term deletion vector like Paimon. An update operation will write the new value to a new data file and the old value will be invalidated in the same way as a delete operation, via a delete file. The reader must merge the original 1 GB file with these new small files. This is great for write performance but adds some overhead to reads (read amplification due to having to read more files).

Paimon is naturally a merge-on-read design as the reader must merge the rows of multiple sorted-runs to return a result. Deletion vectors make this merge process more efficient but it is still classed as a MOR approach. However, COW can be emulated to a certain degree by performing a full-compaction after every write, which will reduce the amount of merging required. So Paimon offers some methods to control the read vs write amplification.

Next

Now we’ve built a mental model of the basic mechanics, in part 2, we’ll dive into the consistency model.

Analysis parts:

Share