Understanding Apache Paimon's Consistency Model Part 3

In this final part of the Apache Paimon series, I’ll go over the formal verification with Fizzbee.

Fizzbee specification and model checking results

Normally I use TLA+ for formal verification, but this time I decided to try out Fizzbee, a language and model checker that maps closely to TLA+ semantics but uses a subset of Python called Starlark. Fizzbee is still relatively immature, but it shows a lot of potential. I’ll be writing about my experiences with Fizzbee in a future blog post.

The specification models:

  • A three-column primary key table.

  • One or more writers (with co-located dedicated compactors), over a configurable number of partitions and buckets. 

  • Deletion vectors enabled or disabled.

  • PutIfAbsent enabled or disabled. When disabled, you can enable a lock.

  • You can choose different topologies such as one writer/compactor/committer per bucket, or multiple writers per bucket.
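
To make the configuration concrete, the model is driven by a handful of constants. The sketch below is illustrative: NUM_WRITERS, NUM_COMPACTORS, NUM_BUCKETS and NUM_PARTITIONS are the names used later in this post, while the remaining names are hypothetical stand-ins for the deletion vector, PutIfAbsent and lock switches.

# Illustrative model configuration. Only NUM_WRITERS, NUM_COMPACTORS,
# NUM_BUCKETS and NUM_PARTITIONS are referenced later in this post; the other
# names are hypothetical stand-ins for the switches described above.
NUM_WRITERS = 1                  # writers, each with a co-located dedicated compactor
NUM_COMPACTORS = 0               # compactors in addition to the writers' co-located ones
NUM_BUCKETS = 1
NUM_PARTITIONS = 1
DELETION_VECTORS_ENABLED = True  # deletion vectors on or off
PUT_IF_ABSENT_ENABLED = True     # conditional writes; when False, a lock can be enabled
LOCK_ENABLED = False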

It has the following safety properties:

  • SequentialSnapshotIds: Snapshot versions are sequential (no gaps).

  • NoDanglingDeletionVectors: Simply, no dangling deletion vectors - that is, no deletion vector references a data file that is no longer part of the current table version.

  • DVEnabledOnlyOneValidRowPerPkInLevels1Plus: With deletion vectors enabled, for any given PK, at most one row in levels 1 and above can be valid (i.e. not marked as deleted by a deletion vector).

  • ConsistentRead: No data loss, no lost updates. A linearized history variable keeps track of the expected read value of each column of each primary key at each table version. If a read of a column of a given PK at a given version differs from the expected read, this property is violated.
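
To give a feel for how ConsistentRead is checked, here is a minimal sketch of the idea in plain Python (illustrative, not the spec’s actual code): a history variable records, per committed table version, the value each column of each primary key should have, and every modelled read is compared against it.

# Minimal sketch of the ConsistentRead idea (illustrative, not the spec's code).
# expected[version][pk][column] is the value a read at that version must return.
expected = {0: {}}  # version 0 is the empty table
def record_commit(version, prev_version, upserts):
    # On each successful commit, extend the linearized history by applying the
    # committed upserts on top of the previous version's expected state.
    snapshot = {pk: dict(cols) for pk, cols in expected[prev_version].items()}
    for pk, cols in upserts.items():
        snapshot.setdefault(pk, {}).update(cols)
    expected[version] = snapshot
def consistent_read(version, pk, column, value_read):
    # Invariant: a read of a column of a given PK at a given version must
    # match the expected value; otherwise we have data loss or a lost update.
    return expected[version].get(pk, {}).get(column) == value_read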

I wrote a readme with a more detailed description of the spec.

The good thing about Fizzbee was that writing the specification in Python allowed me to model more features of Paimon than I might have done with TLA+. Anyone can read Python code, so I felt more comfortable adding more complexity. 

The downside of Fizzbee right now is that the model checker does not spill to disk, so it only supports state spaces that fit in memory. Even with 100 GB available on my workstation, Fizzbee ran out of memory on anything but trivially small models (model size here referring to the configured number of writers, keys, buckets, operations and so on). This means that I haven’t yet been able to run the spec through the kinds of model sizes that I would routinely run with TLA+.

Multiple writers, one writer per bucket (valid topology)

Given the caveat about the restrictions on model size, I did not find any surprises. Multiple writers can safely attempt concurrent commits, either when PutIfAbsent storage is used or when a lock is used to perform a manual snapshot conflict check.
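
As a rough illustration of why this is safe with PutIfAbsent (a sketch of the general pattern, not Paimon’s actual code): each committer conditionally creates the next snapshot file, only one can win a given snapshot id, and the loser reloads the latest snapshot, re-runs its conflict checks and retries with the following id.

# Illustrative PutIfAbsent commit loop (general pattern, not Paimon's code).
# A real object store provides the conditional create atomically; a plain dict
# stands in for it here.
object_store = {}
def put_if_absent(key, value):
    if key in object_store:
        return False  # another committer already won this snapshot id
    object_store[key] = value
    return True
def commit(snapshot_contents, latest_snapshot_id, recheck_conflicts):
    while True:
        next_id = latest_snapshot_id + 1
        if put_if_absent("snapshot-%d" % next_id, snapshot_contents):
            return next_id  # committed as snapshot next_id
        # Lost the race: treat next_id as the new latest snapshot, re-run the
        # conflict checks against it, then try again with the following id.
        latest_snapshot_id = next_id
        recheck_conflicts(latest_snapshot_id)

With PutIfAbsent disabled, the lock plays the same role by serialising the manual snapshot conflict check and the snapshot file write.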

Invalid topologies

I also model checked some topologies that Paimon is not designed for, which should result in data anomalies. I’ve included these partly to show off Fizzbee, but also for completeness. I should reiterate that Paimon was not designed for the following topologies.

Multiple writers per bucket

As expected, Fizzbee found consistency violations due to sequence numbers reordering the merges. I did not get around to modeling a sequence field, which would be the solution to this issue. Set NUM_WRITERS = 2, NUM_COMPACTORS = 0, NUM_BUCKETS = 1, NUM_PARTITIONS = 1 to reproduce.

One writer, multiple compactors in a single bucket

This topology aims to check whether the file deletion check is enough to prevent multiple concurrent compactions in the same bucket from conflicting. To avoid multiple writers causing issues, set NUM_WRITERS = 1 and NUM_COMPACTORS = 2, with one bucket and one partition.

Unfortunately, the state space is too large for Fizzbee to reach a dangling deletion vector on my workstation (due to the out-of-memory issue). So I cheated: I reproduced the dangling deletion vector issue by reducing the state space with some custom guard clauses (enabling conditions in TLA+ parlance), which allowed it to fit into 40 GB of memory (I documented this in the readme).
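
To give a flavour of what such a guard clause looks like, here is an illustrative example in plain Python (not the spec’s actual guards, which are documented in the readme): the action simply refuses to run unless the state matches the scenario of interest, so the model checker never explores the uninteresting branches.

# Illustrative guard clause for state-space pruning (not the spec's actual guards).
# This hypothetical guard pins each compactor to one level, mirroring the history
# below, instead of letting either compactor target any level.
ALLOWED_LEVEL = {1: 1, 2: 0}  # compactor 1 -> level 1, compactor 2 -> level 0
def start_compaction(compactor_id, target_level):
    if target_level != ALLOWED_LEVEL.get(compactor_id):
        return False  # guard not satisfied: the action is disabled in this state
    # ... the normal compaction logic would run here ...
    return True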

This produced two invariant violations (NoDanglingDeletionVectors and DVEnabledOnlyOneValidRowPerPkInLevels1Plus), caused by histories such as:

  1. Writer writes row [+I, Jack, Red, 1] to data file A in level 0, and commits.

  2. Compactor 1 compacts level 0, rewriting data file A to data file B in level 1, and commits.

  3. Writer writes row [+U, Jack, Blue, 1] to a data file C in level 0, and commits.

  4. Compactors 1 and 2 start a compaction at the same time, based on table version 3.

    • Compactor 1 targets level 1.

    • Compactor 2 targets level 0.

  5. Compactor 1 rewrites data file B of level 1 with row [+I, Jack, Red, A] to data file D in level 2. It marks data file B as (logically) deleted.

  6. Compactor 2 rewrites data file C of level 0 with row [+U, Jack, Blue, A] to data file E in level 1. It adds a DV for data file B, position 0. It marks data file C as logically deleted.

  7. Compactor 1 performs its commit for table version 4.

  8. Compactor 2 tries to commit. 

    • It loads the metadata of table version 4.

    • First it checks whether any of the files it wants to logically delete have already been logically deleted. It wants to logically delete data file C, and sees that C is still valid (compactor 1 deleted data file B, not C).

    • It then attempts the snapshot file rename but hits a snapshot conflict. It goes back to the metadata file step and commits again, this time successfully committing the DV file with a deletion vector pointing to data file B, which is no longer part of the current table version - a dangling deletion vector.
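
To spell out the gap in step 8: on retry, the conflict check only looks at the files the committer wants to logically delete; the data files referenced by its new deletion vectors are never checked (an illustrative sketch, not Paimon’s actual code).

# Illustrative sketch of the conflict check on commit retry (not Paimon's code).
def conflict_check_on_retry(latest_snapshot_files, files_to_delete, new_deletion_vectors):
    # Only the files this commit wants to logically delete are checked...
    for data_file_id in files_to_delete:
        if data_file_id not in latest_snapshot_files:
            return False  # someone else already deleted it: abort
    # ...the files targeted by the new deletion vectors are never looked at, so
    # a DV pointing at data file B (already compacted away) slips through.
    return True
# Step 8: the latest snapshot holds D and C; compactor 2 deletes C and adds a DV for B.
assert conflict_check_on_retry({"D", "C"}, files_to_delete={"C"}, new_deletion_vectors={"B": {0}})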

The following is the output from the violated DVEnabledOnlyOneValidRowPerPkInLevels1Plus property. See how only two data files are part of the snapshot, and neither matches the one referenced in the DV file.

INVARIANT VIOLATION! Two valid rows of the same primary key!
Version: 5
File ids of versions: ["p-0_b-0_data-17", "p-0_b-0_data-25"]
DV file: record(dvectors = {"p-0_b-0_data-5": set([0])})
Rows:
PK: jack, data file: p-0_b-0_data-17, level: 2, pos: 0
PK: jack, data file: p-0_b-0_data-25, level: 1, pos: 0

Because deletion vector files are written during the write phase, their contents do not get included in the metadata conflict checks. The data conflict check I discussed in part 2, which is pretty trivial to implement, would solve this issue.
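
To make that concrete, here is a minimal sketch of such a data conflict check (illustrative, not Paimon’s implementation): before committing, verify that every data file referenced by the commit’s new deletion vectors is still part of the latest table version, and abort or restart the compaction if any is not.

# Minimal sketch of the data conflict check from part 2 (illustrative only).
def data_conflict_check(latest_snapshot_files, new_deletion_vectors):
    for data_file_id in new_deletion_vectors:
        if data_file_id not in latest_snapshot_files:
            # The file this deletion vector points at has already been removed by
            # a concurrent compaction; committing now would leave a dangling
            # deletion vector, so the commit must abort.
            return False
    return True
# The history above would be caught: data file B is no longer in the latest
# snapshot (which holds D and C) when compactor 2 retries its commit.
assert data_conflict_check({"D", "C"}, {"B": {0}}) is False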

Model checking summary

Fizzbee found no issues with “single writer/compactor per bucket” topologies. Concurrent commits also passed.

As expected, unsupported topologies and configurations failed. Don’t focus too much on these issues; Paimon, as used today, shouldn’t see any of them.

The author of Fizzbee is working on some improvements as we speak that will allow larger state spaces, so I’ll be rerunning the Paimon spec on larger models soon. I’ll be doing a more thorough write-up of Fizzbee in the coming weeks.

Conclusions

Apache Paimon is a really interesting project and worth a look. It has been designed primarily around streaming use cases and is still very much a Flink project, even though it has spun out as a top-level project. It will be interesting to see if it gains more adoption with support from other compute engines.

A standout design point for me is its use of partitions for data pruning and single-writer buckets for parallelism, allowing it to scale out without introducing costly data conflicts. The tradeoff is that scaling this way means getting the number of buckets right. Too many, and data is spread too thinly, resulting in many small files. Too few, and the parallelism of readers and writers is limited.

I hope this series goes some way to helping people who want to understand how Apache Paimon works, at a logical level at least. Next up is the final post of this table format series on consistency, where I will examine Apache Iceberg (v2).

Finally, on Fizzbee: check out the Paimon specification. A motivated reader should be able to combine these blog posts with the specification and understand it. TLA+ can be somewhat more brutal to the uninitiated, so I’m keen to know if Fizzbee is more approachable.
