Understanding Apache Paimon's Consistency Model Part 2

In part 1, we built a mental model for the basic mechanics of primary key tables in Apache Paimon. Now we’ll dig into the consistency model which allows Paimon to support concurrency and parallelism.

Concurrency and consistency

Paimon concurrency is heavily influenced by Apache Flink, which is no surprise given that Paimon was born as the Flink Table Store. Understanding concurrency and parallelism in Paimon without taking Flink into account is not straightforward. I want to highlight that Paimon is not like Apache Iceberg, Delta Lake or Apache Hudi with regard to concurrency. We should first learn how Paimon is used with Flink before exploring how other types of compute engine may use it.

Paimon and Flink

The Paimon design imposes different concurrency constraints in the data layer and the metadata layer. We can think of a write operation (or compaction) as being a two-phase commit:

  • Write phase: This involves writing to the data layer, where we find partitions, buckets, data files (forming sorted runs), and some index files (deletion vectors). Writer concurrency is generally limited to one writer per bucket, and reader concurrency depends on whether deletion vectors are used.

  • Commit phase: This involves writing to the metadata layer, which is where we find manifest files, index manifest files, manifest-list files, and snapshot files. Writer concurrency is possible using an optimistic concurrency model. Flink does not parallelize the commit phase and uses a “concurrent write, serial commit” topology.

A typical Flink job topology shuffles data to the sink tasks (Paimon writers) by partition and bucket, ensuring that each bucket has only a single writer. This means there is no concurrency per bucket, but work is parallelized overall by dividing the data and work over the buckets. These tasks emit “committables”, which are the sets of data and index files created during the write phase. The topology fans in to a single Committer task, which receives the committables and performs the commit phase serially over a linearized sequence of committables (one committable per operation) from the upstream write-phase tasks.

Fig 11. Simplified Flink topology for a Paimon sink.

This topology ensures that regular writes to each LSM tree (bucket) are serial (by a single writer) and writes to the metadata are also serial. This avoids any contention between multiple writers trying to commit snapshots concurrently. This topology uses Flink's capabilities to shuffle data to avoid concurrency at the Paimon level while still offering high amounts of parallelism at the Flink level. 
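
To make the shape of this topology concrete, here is a minimal sketch in plain Java. It is not Paimon's or Flink's actual API: Committable, BucketWriter and Committer are hypothetical names, used only to illustrate how each record is routed to exactly one writer per bucket and how the committables fan in to one serial committer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the "concurrent write, serial commit" shape. None of these
// classes exist in Paimon or Flink; they only illustrate the topology.
public class SerialCommitTopology {

    // A committable: the data/index files one writer produced for one bucket.
    record Committable(int bucket, List<String> newDataFiles, List<String> newIndexFiles) {}

    // One writer per bucket; only this writer appends level-0 files for its bucket.
    static class BucketWriter {
        final int bucket;
        final List<String> pendingFiles = new ArrayList<>();

        BucketWriter(int bucket) { this.bucket = bucket; }

        void write(String primaryKey, String row) {
            // In reality rows are buffered and spilled as sorted runs; here each
            // write just pretends to have produced a new level-0 file.
            pendingFiles.add("bucket-" + bucket + "/data-" + pendingFiles.size() + ".parquet");
        }

        Committable prepareCommit() {
            Committable c = new Committable(bucket, List.copyOf(pendingFiles), List.of());
            pendingFiles.clear();
            return c;
        }
    }

    // A single committer turns the committables into one snapshot, serially.
    static class Committer {
        long snapshotId = 0;

        void commit(List<Committable> committables) {
            // Manifests, manifest lists and the snapshot file would be written here.
            // Only one instance of this task exists, so commits are never concurrent.
            snapshotId++;
            System.out.println("committed snapshot-" + snapshotId
                    + " covering " + committables.size() + " committables");
        }
    }

    public static void main(String[] args) {
        int numBuckets = 4;
        Map<Integer, BucketWriter> writers = new ConcurrentHashMap<>();

        // "Shuffle": every record is routed to exactly one writer via its bucket.
        for (String pk : List.of("jack", "jill", "joe")) {
            int bucket = Math.floorMod(pk.hashCode(), numBuckets);
            writers.computeIfAbsent(bucket, BucketWriter::new).write(pk, "row for " + pk);
        }

        // Fan-in: collect one committable per writer and commit them in one serial step.
        List<Committable> committables =
                writers.values().stream().map(BucketWriter::prepareCommit).toList();
        new Committer().commit(committables);
    }
}
```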

Given that all write and commit operations are serial, data consistency seems trivially assured. But what about writes and compactions in the same bucket? These can run concurrently.

In the Flink topology, a single bucket will have one regular writer (that does not perform compactions) and one dedicated compactor (running in a separate thread). The regular writer adds new files to level 0 only and the compactor reads existing data files (of all levels) and writes new data files only to higher levels of the LSM tree. Therefore there is no scope for conflict between writers and compactors as they write to different levels. Writers may wish to wait for compaction to finish if the number of small files has grown overly large. Basically, while compactions are asynchronous, they can still apply back pressure to the writer.
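
A rough sketch of that division of labour, assuming a hypothetical Bucket class (this is not Paimon code): the writer only appends level-0 sorted runs, the compactor only writes merged output to higher levels, and the writer blocks when too many level-0 runs accumulate, which is the back pressure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of one bucket's LSM levels: the writer touches only level 0,
// the compactor writes merged output only to higher levels, and a full level 0
// applies back pressure to the writer. Only level-0 compaction inputs are modelled.
class Bucket {
    static final int MAX_LEVEL0_RUNS = 5;                  // back-pressure threshold
    final List<List<String>> levels = new ArrayList<>();   // levels.get(0) = level-0 sorted runs

    Bucket(int numLevels) {
        for (int i = 0; i < numLevels; i++) levels.add(new ArrayList<>());
    }

    // Regular writer: appends new sorted runs to level 0 only.
    synchronized void writeSortedRun(String file) throws InterruptedException {
        while (levels.get(0).size() >= MAX_LEVEL0_RUNS) {
            wait(); // too many small files: wait for the compactor to drain level 0
        }
        levels.get(0).add(file);
    }

    // Compactor (a separate thread): merges existing runs and writes the result
    // to a higher level, so it never adds files to the writer's level 0.
    synchronized void compact(List<String> inputRuns, int targetLevel, String mergedFile) {
        levels.get(0).removeAll(inputRuns);        // the merged runs are logically deleted
        levels.get(targetLevel).add(mergedFile);   // merged output lands in a higher level
        notifyAll();                               // release a writer waiting on back pressure
    }
}
```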

But what about non-Flink topologies? Paimon does include some concurrency controls for handling concurrent commits and compactions. Let’s look at these concurrency controls and some example alternative topologies next.

Digging deeper into the Paimon consistency model

The following is an exploration of Paimon’s consistency model regarding things like multiple writers per bucket and having multiple writers doing commits concurrently.

There are a number of invalid topologies of writers and committers where Paimon will produce data anomalies. There are a few places to trip up:

  • Multiple writers to the same bucket: Sequence numbers are used to determine merge order. Even when deletion vectors are used, the compaction job still merges level 0 data files based on these sequence numbers. Sequence numbers are generated from per-writer counters, so having multiple writers write to the same bucket can reorder operations during merges: the counter inside each writer can advance at a different rate, causing the merge order to not match the temporal order (see the sketch after this list). A sequence field such as a timestamp can be used in place of sequence numbers, though this is a best-effort approach (but probably good enough for most people).

  • Multiple compactors over the same bucket: When using deletion vectors, there is one deletion vector file per bucket. When a compaction job needs to modify the deletion vectors, the existing deletion vector file must be modified and then written as a new DV file (copy-on-write). Two concurrent compactions of the same bucket could overwrite each other's changes. Also, two compactions targeting different levels can cause dangling deletion vectors, vectors that point to a data file no longer present in the current table version. Paimon has a conflict check that is relevant here, though there is still room for conflicts. 

  • Concurrent updates to the dynamic bucket index: The global PK-to-bucket index used in dynamic bucket mode must be updated serially. Without controls, concurrent writers could overwrite each other's changes to the bucket mapping index, causing duplicates (two rows with the same primary key written to two different buckets). For the same reason, Apache Hudi requires a lock to protect its global PK->file slice index.
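
As a worked example of the first pitfall, here is a small hypothetical illustration (not Paimon code) of two writers with independent counters appending versions of the same key to one bucket. A merge that keeps the highest sequence number keeps the wrong row; a merge on a sequence field such as event time keeps the right one.

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of merge disorder with per-writer sequence counters.
public class MergeDisorder {
    record Row(String key, String value, long writerSeq, long eventTimeMs) {}

    public static void main(String[] args) {
        // Writer A has been running for a while; its counter is already at 1000.
        Row older = new Row("jack", "FavColor=Yellow", 1000, 1_000 /* written first */);
        // Writer B just started; its counter is at 1, but its write happened later.
        Row newer = new Row("jack", "FavColor=Blue", 1, 6_000 /* written last */);

        // Deduplicate-style merge: keep the row with the highest sequence number.
        Row bySeq = List.of(older, newer).stream()
                .max(Comparator.comparingLong(Row::writerSeq)).get();
        // Keeps FavColor=Yellow even though Blue was written later -> wrong order.
        System.out.println("merge by sequence number keeps: " + bySeq.value());

        // With a sequence field (e.g. an event-time column), the later write wins.
        Row byTime = List.of(older, newer).stream()
                .max(Comparator.comparingLong(Row::eventTimeMs)).get();
        System.out.println("merge by sequence field keeps: " + byTime.value());
    }
}
```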

We’ll look at the following non-Flink topologies; the first is OK and the second is not:

  • Topology 1 (streaming or batch): Multiple writers, one per bucket, where each writer does its own commits. This differs from the Flink topology above in that it has multiple committers, possibly trying to commit snapshots concurrently.

  • Topology 2 (microservices): Multiple writers, any writer can write to any bucket, and each writer does its own commits. Additionally, each writer may do synchronous compactions or have a dedicated compaction thread.

Fig 12. Alternative writer and committer topologies.

Topology 1

In this topology, writers cannot conflict in the data layer as each bucket can only be written to by a single writer (and its compactor). However, multiple writers (and compactors) can conflict with each other when writing the metadata. 

A write operation is carried out in 3 to 4 steps. At this high level, the steps for writers and compactors really only change in step 1, where data and deletion vector files are written. 

When deletion vectors are enabled, compaction jobs will need to load the current deletion vector file. A streaming job can cache the deletion vector files in memory on start-up. A batch job that couldn’t cache would need to load the deletion vectors again on each compaction operation in order to modify them. 

Writers need to set their sequence number for writing rows to level 0. Therefore, the max committed sequence number needs to be determined on start-up (or before each operation with batch) to ensure monotonic sequence numbers. This can be obtained from the metadata of the latest snapshot.
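
In code, that start-up step could look something like the sketch below, assuming a hypothetical SnapshotMetadata type that exposes the highest committed sequence number (Paimon's real classes are different).

```java
// Hypothetical sketch: seed a writer's sequence counter from the latest snapshot so
// that new level-0 rows always sort after everything already committed to the table.
record SnapshotMetadata(long snapshotId, long maxSequenceNumber) {}

class SequenceCounter {
    private long next;

    SequenceCounter(SnapshotMetadata latestSnapshot) {
        // Carry on from the highest committed sequence number.
        this.next = latestSnapshot.maxSequenceNumber() + 1;
    }

    long nextSequenceNumber() {
        return next++;
    }
}
```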

The write and commit steps (sketched in code below):

  1. Write data files:

    • Writer:

      1. Write the new data files (new sorted runs) to level 0.

      2. Pass the list of data files written to the next step.

    • Compactor:

      1. Read data files of target levels.

      2. Perform a merge.

      3. Write the new data files to the chosen level.

      4. Write a new deletion vector file, if needed, to replace the last. This is a copy-on-write operation that merges the existing deletion vectors with the new/removed vectors.

      5. Pass the list of data and DV files written and deleted to the next step.

  2. Read the latest snapshot; this is used to build a new snapshot, including its version number (which will be the latest version + 1).

    • Write the new manifest files. 

    • Write a new base manifest list and the delta manifest list.

    • Write the new index manifest with the list of index files. 

    • Write the new snapshot with a temporary name. 

  3. Potentially acquire a lock if PutIfAbsent storage is not available.

    • While holding the lock, check that a conflicting snapshot has not already been created.

  4. Rename the snapshot file to its final name, following the pattern “snapshot-<versionNo>”.

Fig 13. The steps performed by writer-committers. In a Flink topology, step 1 would be run by the writers and the rest run by the single committer task.
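
The commit phase (steps 2 to 4) could be sketched roughly as follows. All of the names are illustrative rather than Paimon's actual API, and the PutIfAbsent-style rename is modelled as a single boolean call; on stores without PutIfAbsent it would be a lock plus an existence check.

```java
import java.util.List;

// Hypothetical sketch of steps 2-4 of the commit phase. Everything here is
// illustrative; Paimon's real metadata classes and file layout differ.
class CommitAttempt {

    record Snapshot(long id, String baseManifestList, String deltaManifestList, String indexManifest) {}

    interface MetadataStore {
        Snapshot latestSnapshot();                                       // null if the table is empty
        String writeManifests(List<String> added, List<String> deleted);
        String writeBaseManifestList(Snapshot previous, String newManifests); // full table state
        String writeDeltaManifestList(String newManifests);                   // just this commit's changes
        String writeIndexManifest(List<String> indexFiles);
        void writeTempSnapshot(Snapshot snapshot, String tempName);
        boolean renameIfAbsent(String tempName, String finalName);      // atomic PutIfAbsent-style claim
    }

    // Returns true if this attempt won "snapshot-<newId>", false on a snapshot conflict.
    boolean tryCommit(MetadataStore store, List<String> addedFiles, List<String> deletedFiles,
                      List<String> indexFiles) {
        // Step 2: read the latest snapshot and build the next version on top of it.
        Snapshot latest = store.latestSnapshot();
        long newId = (latest == null) ? 1 : latest.id() + 1;

        String manifests = store.writeManifests(addedFiles, deletedFiles);
        String baseList = store.writeBaseManifestList(latest, manifests);
        String deltaList = store.writeDeltaManifestList(manifests);
        String indexManifest = store.writeIndexManifest(indexFiles);

        Snapshot next = new Snapshot(newId, baseList, deltaList, indexManifest);
        store.writeTempSnapshot(next, "snapshot-" + newId + ".tmp");

        // Steps 3-4: atomically claim the final name "snapshot-<versionNo>".
        return store.renameIfAbsent("snapshot-" + newId + ".tmp", "snapshot-" + newId);
    }
}
```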

It is possible for two or more writers to attempt to rename their new snapshot file to the same target name. Snapshot file names are suffixed with the table version number, such as snapshot-12 for example. If the object store supports PutIfAbsent, then the second writer to attempt a rename to snapshot-12 would fail - this is a snapshot conflict. For object stores that do not support PutIfAbsent, such as S3 at the time of writing, a lock is acquired and the existence of a conflicting snapshot is checked before doing the rename.

When a snapshot conflict occurs, Paimon simply retries the commit, including rewriting all the metadata files of the commit. The list of new data and DV files of the bucket(s) will need to be merged with the latest snapshot (that conflicted). The actual data and DV files in the bucket(s) remain unchanged - we’re only merging the list of new files needed for metadata, not modifying the data and DV files themselves. These retries will continue until the commit is successful.
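
Reusing the hypothetical CommitAttempt sketch from above, the retry behaviour amounts to a loop like the following (again, a sketch rather than Paimon's implementation).

```java
// Hypothetical sketch: keep retrying the commit after snapshot conflicts. The data
// and DV files in `addedFiles` are never rewritten; only the metadata files
// (manifests, manifest lists, snapshot) are produced again on each attempt.
class CommitLoop {
    void commitWithRetry(CommitAttempt attempt, CommitAttempt.MetadataStore store,
                         java.util.List<String> addedFiles, java.util.List<String> deletedFiles,
                         java.util.List<String> indexFiles) {
        boolean committed = false;
        while (!committed) {
            // On a conflict, the next attempt re-reads the new latest snapshot and
            // merges this commit's file list with it before trying again.
            committed = attempt.tryCommit(store, addedFiles, deletedFiles, indexFiles);
        }
    }
}
```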

The fact that Paimon doesn’t perform a data conflict check after a snapshot conflict is a large divergence from the other table formats. Delta Lake and Apache Hudi perform a data conflict check after a Delta Log or Timeline conflict (their equivalent of a snapshot conflict). The Iceberg spec also indicates that a retry should have conditions and those conditions determine the isolation level. Delta Lake treats writes to different partitions as conflict-free and Hudi treats writes to different file slices as conflict-free. However, should the metadata indicate a write to the same partition/file slice as the writer’s own transaction, then the data conflict check fails and the Iceberg/Delta/Hudi writer must abort (and clean up the data files that were written). With a large number of writers, this data conflict check can be quite disruptive (impacting performance), unless writers can be aligned to partitions/file slices (which can be difficult).

It might sound like a problem that Paimon doesn’t have this check, but it isn’t. When a Paimon topology of writers maps each bucket to a single writer (and its compactor), it cannot have a data conflict as rows and DV files are mutated serially - hence no need for a data conflict check. This lack of a data conflict check will come into play in topology 2 though.

Topology 2 - Microservices using a Paimon table as the source of truth

The workload that Paimon is not designed for (at the time of writing) is multiple independent writers doing read, modify, write operations on the table, treating the table as the source of truth - think multiple applications over an OLTP database.

So far, we’ve looked at Paimon from the perspective of some kind of compute engine (such as Flink), writing to a Paimon table as a data destination only. The Flink job itself may be the source of truth, such as some kind of windowed aggregation, or the source of truth could be an upstream database and Flink is part of the CDC stream. 

Topology 2 issue #1 - Merge disorder due to sequence numbers

The most obvious issue with this “multiple writers per bucket” workload is its use of per-writer sequence number counters. Sequence numbers determine the merge order per primary key. The counter of each writer would advance at different rates and so the sequence numbers applied to any given key would not match temporal ordering. A sequence field based on a timestamp would have to be configured to take the place of the per-writer sequence number. Though this is not a water-tight solution to consistency, it is likely good enough for analytics.

Topology 2 issue #2 - Lost updates

Apache Iceberg, Apache Hudi, and Delta Lake can support this workload primarily because all of them (Iceberg optionally, it seems) include a data conflict check after a metadata file/Delta Log/Timeline conflict (their equivalent of a snapshot conflict). This data conflict check ensures that any data written by a transaction does not modify any data written by other concurrent (conflicting) transactions. A general-purpose table format needs such a check in order to offer snapshot isolation, or else anomalies such as lost updates can occur.

For example, if two concurrent writers were to both update the same row, based on the same version, then a lost update would occur if both transactions were allowed to complete successfully - the second transaction could overwrite the changes of the first.

Fig 14. At the top, two transactions start at the same time, and attempt to modify the row {‘Jack’, ‘Yellow’, ‘Hiking’}. The first transaction changes FavColor to ‘Blue’ and commits successfully. The second transaction only wants to change FavHobby to ‘Cycling’. If transaction 2 is allowed to commit, it overwrites the commit of the first, resulting in {‘Jack’, ‘Yellow’, ‘Cycling’}, which is a Lost Update anomaly. On the bottom, two clients each want to add one point to Jack. They each begin a transaction, then obtain the current number of points (1), then write an update row with points=2. If the second transaction is allowed to go through, then again, we have an anomaly violating snapshot isolation.
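
The bottom half of the figure is a classic read-modify-write race. Stripped of any table format details, the anomaly looks like this (purely illustrative Java, no Paimon involved).

```java
// Hypothetical illustration of a lost update: two clients increment the same
// counter based on the same starting version of the row.
public class LostUpdate {
    public static void main(String[] args) {
        int committedPoints = 1;            // the row as of the snapshot both clients read

        int client1Read = committedPoints;  // both clients read points = 1
        int client2Read = committedPoints;

        committedPoints = client1Read + 1;  // client 1 commits points = 2
        committedPoints = client2Read + 1;  // client 2 commits points = 2, overwriting client 1

        // Expected 3 after two increments, but the table says 2: client 1's update is lost.
        System.out.println("points = " + committedPoints);
    }
}
```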

Lost updates could occur in the following scenarios:

  • Multiple writers doing concurrent UPDATE queries on the same row.

  • Multiple compactors doing concurrent modifications to the deletion vector file of a bucket.

Partial-update engine note

The partial-update engine could solve the first example, of favorite color and hobby, as +U rows would only need to contain values for the columns that are being set. This way, the two operations don’t overlap. However, the partial-update engine would not solve the second case, which writes to the same column.
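
A small sketch of that argument, with the partial-update behaviour modelled as a column-by-column map merge (Paimon's real merge engine is configured via table options; this hand-rolled version is only illustrative).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical partial-update style merge: later +U rows carry only the columns
// they set, and those columns overwrite the previous row column by column.
public class PartialUpdateMerge {

    static Map<String, String> merge(List<Map<String, String>> updatesInOrder) {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> update : updatesInOrder) {
            merged.putAll(update); // only the columns present in the update are overwritten
        }
        return merged;
    }

    public static void main(String[] args) {
        // Case 1: the two updates touch different columns, so order does not matter.
        System.out.println(merge(List.of(
                Map.of("FavColor", "Blue"),
                Map.of("FavHobby", "Cycling"))));  // {FavColor=Blue, FavHobby=Cycling}

        // Case 2: both updates write the same column (Points), so one increment is
        // still lost; partial update does not help here.
        System.out.println(merge(List.of(
                Map.of("Points", "2"),    // client 1: read 1, wrote 2
                Map.of("Points", "2")))); // client 2: read 1, wrote 2
    }
}
```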

Topology 2 issue #3 - Dangling deletion vectors

Paimon does include a deleted file check that can prevent some concurrent compaction jobs from conflicting with each other. Before committing, a compactor checks the latest snapshot and, if it detects that one or more of the files it has logically deleted were already logically deleted, it aborts. This prevents two concurrent compactions that share one or more input levels from conflicting.

However, it does not prevent a level 0 job that adds a deletion vector, and a level 3 job that removes a deletion vector, from overwriting each other’s DV file changes. Another DV anomaly is that of a dangling deletion vector. This is possible when two compactions targeting different levels run concurrently and a newly created deletion vector points to a data file that the other compaction logically deleted and rewrote as a new file in a higher level.

Dangling deletion vectors can violate the guarantee that any given PK will only have one unmarked row in levels 1 and above. This could affect topologies that use parallel readers over the same bucket.

Data conflict check to the rescue

To avoid these anomalies, Paimon would need the data conflict check that would be carried out after a snapshot conflict. The writer would check the metadata files of any conflicting snapshot, to see if any writes were made to the same bucket(s) as its own operation. If so, then the writer/compactor would have to abort its operation to avoid the possibility of lost updates and dangling deletion vectors.
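
Such a check could be as simple as the following sketch, which is hypothetical (Paimon does not ship it today): before retrying, compare the buckets touched by the conflicting snapshots against the buckets written by this operation, and abort on any overlap.

```java
import java.util.List;
import java.util.Set;

// Hypothetical sketch of a bucket-level data conflict check after a snapshot conflict.
class DataConflictCheck {

    record CommittedDelta(long snapshotId, Set<Integer> bucketsTouched) {}

    /**
     * @param myBuckets             buckets written by this (not yet committed) operation
     * @param concurrentlyCommitted deltas committed between our base snapshot and now
     * @return true if it is safe to retry the commit; false means abort and clean up
     */
    boolean canRetry(Set<Integer> myBuckets, List<CommittedDelta> concurrentlyCommitted) {
        for (CommittedDelta delta : concurrentlyCommitted) {
            for (int bucket : delta.bucketsTouched()) {
                if (myBuckets.contains(bucket)) {
                    return false; // someone else modified a bucket we also wrote: possible lost update
                }
            }
        }
        return true;
    }
}
```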

Implementing a data conflict check wouldn’t be a lot of work, so I’m sure Paimon can be made to support this workload in the future; Paimon PMC members tell me that this is not a workload that is currently being asked for, but if it were, it wouldn’t take much to support it. One thing to note about Paimon, however, is that part of its strength is the lack of a need for a data conflict check when using the right topology. Data conflicts are the worst for performance, and being able to avoid them is a big advantage. The tradeoff is that you must get the number of buckets right, balancing the number of files against the amount of parallelism that the bucket count gives you.

Consistency summary

To summarize how Paimon is consistent with multiple writer/committers with one writer per bucket:

  • Writes to each bucket are performed serially:

    • A single sequence number counter is used per bucket, ensuring correct merge order. When a writer starts, it obtains the last sequence number used and carries on from there.

  • Compactions to each bucket are performed serially:

    • Deletion vector files are read then rewritten by the compactor thread, serially, so no conflicts are possible.

    • Also, compactions cannot conflict with writers, as they write to different levels.

  • Snapshots are committed serially either due to the Flink topology of serial commits or due to optimistic concurrency control when concurrent commits are allowed:

    • A commit merges the changes with the latest snapshot.

    • A snapshot conflict causes the whole commit process to be retried, which includes merging the changes with the newly detected latest snapshot (but leaving the data files unchanged).

While Paimon doesn’t support “a table as a source of truth” with multiple independent writers, it could easily do so in the future, should there be demand for it.

Next in part 3, we’ll go over the formal specification and the results of model checking.
