Understanding Apache Iceberg’s Consistency Model Part 3

In this final part of my series on Apache Iceberg's consistency model, I'll cover the formal specification I wrote for it and the results of model checking.

The Apache Iceberg formal specification (Fizzbee)

Just like I did with Apache Paimon, I chose Fizzbee for the formal verification instead of TLA+. Despite the immaturity of the tooling around Fizzbee, I enjoyed writing Python and found it easier to express much of the table format logic in Python than in TLA+. I love TLA+, especially for higher-level specifications, but for reverse engineering Paimon and Iceberg, which involve many levels of indirection, Python was a little more comfortable. The great thing is that Fizzbee gives me most of the semantics of TLA+, plus an exhaustive model checker (though with some serious limitations at the time of writing).

The specification models:

  • A three-column Iceberg table where the first column acts as an id column. The workload models a primary key table by constraining INSERT operations to only happen once per id column value. The possible column values are controlled by three sets representing the three columns, and the model checker will only explore histories with those column values.

  • Three roles:

    • One or more writers.

    • An object store.

    • A catalog.

  • Serializable or Snapshot Isolation; the chosen level determines which validation checks are enabled, matching how Apache Spark configures them.

  • Four operations:

    • MergeAppend.

    • OverwriteFiles (copy-on-write).

    • RowDelta (merge-on-read).

    • RewriteFiles (compaction).

  • All the validation checks that were described in part 2, as configured by Apache Spark.

  • Each time metadata files (manifest lists, manifest files, etc.) get written, the writer performs a full manifest file merge, resulting in one manifest file for DATA and one manifest file for DELETES. Model checking involves a very small number of operations and rows, so keeping the number of manifests small via full manifest compaction in each op made sense (see the sketch below).
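
As a rough illustration of that merge, here is a minimal Python sketch of the idea; the function and field names are mine, purely illustrative, and not taken from the actual Fizzbee spec:

# Illustrative sketch only: collapse all existing manifests plus the newly
# written files into exactly one DATA manifest and one DELETES manifest,
# which is what the spec does on every metadata write.
def full_manifest_merge(existing_manifests, added_data_files, added_delete_files):
    data_entries, delete_entries = [], []
    for manifest in existing_manifests:
        target = data_entries if manifest["content"] == "DATA" else delete_entries
        for entry in manifest["entries"]:
            if entry["status"] != "DELETED":  # drop entries for removed files
                target.append({**entry, "status": "EXISTING"})
    data_entries += [{"file": f, "status": "ADDED"} for f in added_data_files]
    delete_entries += [{"file": f, "status": "ADDED"} for f in added_delete_files]
    return (
        {"content": "DATA", "entries": data_entries},
        {"content": "DELETES", "entries": delete_entries},
    )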

The following safety properties (invariants) are modeled:

  • SequentialSequenceNumbers: Sequence numbers across the snapshots are sequential (no gaps). 

  • NoDanglingDeleteFiles: No delete file of a committed snapshot references a data file not present in that snapshot.

  • ConsistentRead: A linearized history variable keeps track of the expected read value of each column of each id column value at each table version. If a read of a column of a given id column value at a given version differs from the expected read, this property is violated.
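
To make the first two properties concrete, here is roughly what they assert, written as plain Python predicates over a simplified snapshot structure; the names are illustrative rather than the spec's actual state variables:

# Illustrative predicates over a simplified snapshot structure.
def sequential_sequence_numbers(snapshots):
    # Sequence numbers across committed snapshots are ascending with no gaps.
    seqs = sorted(s["sequence_number"] for s in snapshots)
    return all(b - a == 1 for a, b in zip(seqs, seqs[1:]))

def no_dangling_delete_files(snapshot):
    # Every data file referenced by a delete file is present in the snapshot.
    data_paths = {f["path"] for f in snapshot["data_files"]}
    return all(ref in data_paths
               for delete_file in snapshot["delete_files"]
               for ref in delete_file["referenced_data_files"])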

Each id column value, acting like a primary key, maintains a linearized history for each column of its row. On each successful commit, the committed data changes are pushed onto their respective column histories. An insert pushes every column value of the inserted row onto its respective history. An update pushes only the SET column's value onto its history. A delete pushes None onto every column's history.

Fig 1. How a series of operations is registered as a set of column value histories.

This correlates to the following results of a table scan at each table version:

  • v1: ["jack", "apple", "red"]

  • v2: ["jack", "apple", "red"], ["sarah", "plum", "blue"]

  • v3: ["jack", "apple", "blue"], ["sarah", "plum", "blue"]

  • v4: ["sarah", "plum", "blue"]

  • v5: ["sarah", "plum", "green"]
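
The history bookkeeping behind this example can be sketched in plain Python; the structure and names below are mine and simplified relative to the actual spec:

# Illustrative sketch: one linearized history per (id value, column).
# Each entry records the table version at which the value was committed,
# so the expected value at any earlier version can be reconstructed later.
histories = {}  # (id_value, column) -> list of (version, value)

def record_insert(version, id_value, row):
    for column, value in row.items():
        histories.setdefault((id_value, column), []).append((version, value))

def record_update(version, id_value, set_column, new_value):
    histories.setdefault((id_value, set_column), []).append((version, new_value))

def record_delete(version, id_value, columns):
    for column in columns:
        histories.setdefault((id_value, column), []).append((version, None))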

The ConsistentRead invariant uses this to compare table reads against the recorded history to ensure consistency.

  • The ConsistentRead invariant, in each step, checks that a read of each possible id column value, at each committed table version so far, returns the correct data. If a row with a given id value has not been written yet at a given version, then both the history and the table read should return no rows.

  • If the ConsistentRead invariant detects divergence between a table read and the linearized history, the model checker stops and prints an error trace.
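
Continuing the sketch above, the check itself amounts to something like the following (again illustrative, not the spec's actual code):

def expected_value(id_value, column, version):
    # Latest committed value for this column at or before the given version;
    # None means the row did not exist (or had been deleted) at that version.
    entries = histories.get((id_value, column), [])
    applicable = [value for (ver, value) in entries if ver <= version]
    return applicable[-1] if applicable else None

def consistent_read(table_read, id_value, columns, version):
    # table_read: dict of column -> value returned by scanning the table at
    # the given version for this id value, or None if no row matched.
    for column in columns:
        expected = expected_value(id_value, column, version)
        actual = None if table_read is None else table_read.get(column)
        if actual != expected:
            return False  # divergence: the model checker stops with a trace
    return True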

The specification also includes the following liveness checks:

  • OperationsEventuallyCommitOrAbort: Once an operation begins, either it commits or aborts. It does not get stuck.

  • AllSnapshotsCommitted: The specification limits the number of write and compaction operations to bound the state space. Eventually, the number of committed snapshots equals the write op limit plus the compaction op limit, minus the number of aborted operations. This ensures that all operations that should have been able to commit do so.

Liveness checks are as important for detecting bugs in the specification as they are for detecting liveness bugs in the target protocol.
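
As a rough illustration, the condition that AllSnapshotsCommitted eventually requires boils down to a simple count (illustrative Python, not the spec itself):

def all_snapshots_committed(committed_snapshots, aborted_ops,
                            max_write_ops, max_compaction_ops):
    # Every operation that was not aborted should eventually produce a snapshot.
    return len(committed_snapshots) == (
        max_write_ops + max_compaction_ops - len(aborted_ops))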

The specification has a number of limitations:

  • It does not model partitions.

  • It does not model data conflict filters.

  • It does not model equality deletes.

  • It does not model any delete-by-filter (not covered in this series).

Implementing partitions and data conflict filters might be worth it, as a mistake in the filtering logic could cause a false-negative validation result. Equality deletes would also be worth modeling. I'll leave both for future work.

Model checking results

Model checking flagged one specific configuration, which only needed a single row:

UPDATE_MODE = TableMode.MERGE_ON_READ
DELETE_MODE = TableMode.MERGE_ON_READ
CONFIGURED_ISOLATION = Isolation.SNAPSHOT_ISOLATION
NUM_WRITERS = 2
ALLOW_UPDATES = True    # Include update queries
ALLOW_DELETES = True   # Include delete queries
MAX_WRITE_OPS = 3      # The max number of write operations
MAX_COMPACTION_OPS = 0 # The max number of compactions
Col1Values = ['jack']
Col2Values = ['red', 'blue']
Col3Values = ['A']

It detected an “undelete”, rooted in how Apache Spark enables validation checks for RowDelta operations. Currently, Spark only enables the “no new delete files” validation check for UPDATE and MERGE operations; DELETE operations do not enable this check.
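
As a rough summary of the configuration described here and in part 2, the checks involved in a RowDelta commit can be sketched like this; this is a simplification in Python, not Spark's actual code (which is Java and structured differently):

# Rough summary (not Spark's actual code) of which RowDelta validation checks
# get enabled per SQL command, based on the behavior described in this series.
def row_delta_validations(command, isolation):
    checks = {
        "data_files_exist": True,      # referenced data files must still exist
        "no_new_delete_files": False,  # conflicting delete files since the scan
        "no_new_data_files": False,    # conflicting data files since the scan
    }
    if command in ("UPDATE", "MERGE"):
        checks["no_new_delete_files"] = True  # not enabled for DELETE
    if isolation == "SERIALIZABLE":
        checks["no_new_data_files"] = True    # the added data files validation
    return checks

# Under snapshot isolation, a DELETE skips the conflicting delete files check,
# which is the gap the model checker exploited:
# row_delta_validations("DELETE", "SNAPSHOT_ISOLATION")
#   -> {"data_files_exist": True, "no_new_delete_files": False, "no_new_data_files": False}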

That results in the following counterexample:

  1. Writer-0: A single row ['jack', 'red', 'A'] is added to data-1 in an AppendFiles op. Run through to commit of metadata-1 with snapshot-1.
  2. Writer-0: Step=StartUpdateOperation.
    • The operation begins, but no scan has been kicked off yet.
    • SET Color = “blue” WHERE Name = “jack”.
    • RowDelta op with scan snapshot id = snapshot-1.
  3. Writer-1: Step=StartDeleteOperation.
    • The operation begins, but no scan has been kicked off yet.
    • WHERE Name = “jack”.
    • RowDelta op with scan snapshot id = snapshot-1.
  4. Writer-0: Step=ReadDataFiles.
    • Performs a scan, finds row ['jack', 'red', 'A'] in data-1 to update.
    • Determines that data-1 pos 0 must be invalidated with a delete file.
  5. Writer-1: Step=ReadDataFiles.
    • Performs a scan, finds row ['jack', 'red', 'A'] in data-1 to delete.
    • Determines that data-1 pos 0 must be invalidated with a delete file.
  6. Writer-0: Step=WriteDataFiles.
    • Writes row ['jack', 'blue', 'A'] to data-2.
    • Writes [data-1, pos 0] to delete file delete-1.
  7. Writer-1: Step=WriteDataFiles.
    • Writes [data-1, pos 0] to delete file delete-2.
    • This means data-1 has two delete files referencing it, though neither is committed yet at this point.
  8. Writer-0: Step=WriteMetadataFiles.
    • Refresh metadata. Remains at metadata-1.
    • Perform validation - passes.
    • Write a new data manifest manifest-2 with data-1 EXISTING and data-2 ADDED.
    • Write a new delete manifest manifest-3 with delete-1 ADDED.
    • Write a new manifest list manifest-list-2 with manifest-2 and manifest-3.
    • Write a new metadata file metadata-2 with snapshot-2 containing manifest-list-2.
  9. Writer-0: Commit.
    • Swaps metadata-1 for metadata-2.
  10. Writer-1: Step=WriteMetadataFiles.
    • Refresh metadata to metadata-2.
    • Perform validation. Checks that the data file data-1 has not been deleted since snapshot-1 was written. It has not. Does not check for conflicting delete files, as that check is only enabled in Spark for UPDATE and MERGE commands.
    • Write a new data manifest manifest-4 with data-1 EXISTING and data-2 EXISTING.
    • Write a new delete manifest manifest-5 with delete-1 EXISTING and delete-2 ADDED.
    • Write a new manifest list manifest-list-3 with manifest-4 and manifest-5.
    • Write a new metadata file metadata-3 with snapshot-3 containing manifest-list-3.
  11. Writer-1: Commit.
    • Swaps metadata-2 for metadata-3.

At this point, the table files look as follows:

Fig 2. A simplified view of the files of the table at the end of the counterexample.

A read of snapshot-3 would return one row: ['jack', 'blue', 'A']. However, whether we evaluate the total order as UPDATE then DELETE or as DELETE then UPDATE, no row matching Name = "jack" should be present in snapshot-3.
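
To see why that row survives, here is a minimal sketch of how a reader applies positional delete files, using the files from the counterexample (heavily simplified; a real reader applies deletes per data file with far more machinery):

# Simplified read of snapshot-3: apply positional deletes to the data files.
data_files = {
    "data-1": [("jack", "red", "A")],   # the original row
    "data-2": [("jack", "blue", "A")],  # the row written by the UPDATE
}
delete_files = {
    "delete-1": [("data-1", 0)],  # written by the UPDATE (writer-0)
    "delete-2": [("data-1", 0)],  # written by the DELETE (writer-1)
}
deleted_refs = {ref for refs in delete_files.values() for ref in refs}
live_rows = [row
             for file_name, rows in data_files.items()
             for pos, row in enumerate(rows)
             if (file_name, pos) not in deleted_refs]
print(live_rows)  # [('jack', 'blue', 'A')]
# Both deletes reference data-1, so the updated copy in data-2 survives:
# this is the "undelete" the invariant flagged.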

This violates snapshot isolation: two operations based on the same table version were both able to commit changes to the same row. Snapshot isolation should have prevented the second commit from succeeding.

If the “No new delete files validation” had been enabled, writer-1 would have seen delete-1 and aborted. Also, Serializable isolation would have detected the conflict in this case, due to the “Added data files validation”, though that check is not guaranteed to prevent this kind of anomaly.

I have confirmed the behavior using some throwaway unit tests in the Core module of Apache Iceberg, but the Spark tests do not offer a way to set a custom scan snapshot (the snapshot used for validation), which is needed to simulate concurrent operations. I did manage to verify that Spark SQL DELETE commands reach the code path where the necessary validation checks get enabled, but that those checks are only switched on there for UPDATE and MERGE commands, not DELETE. So I'm reasonably confident that this is a real omission in the code. I have raised an issue in the GitHub repo.

Conclusions

Apache Iceberg passes model checking when deletes enable the necessary validation checks. I am still limited by the very small state spaces that Fizzbee is capable of checking, but hopefully that limitation will be fixed soon and I can rerun the spec with larger state spaces to gain more confidence.

Iceberg includes all the necessary checks; it's up to the compute engine to enable them correctly and to use data conflict filters properly to reduce false-positive conflicts in multi-writer topologies. The safety violation found by Fizzbee shows that Iceberg's approach of leaving validation checks to be enabled by the compute engine is a little risky. It provides more flexibility, but it also places some of the burden of correctness for the Iceberg layer on the compute engine.

This concludes the table format consistency series, in which I have covered four table formats (Apache Iceberg, Apache Hudi, Apache Paimon and Delta Lake). While they all do things a little differently, many of the core mechanics can be generalized to the same or similar behaviors. Paimon is probably the most different, as it targets a narrower use case built around streaming with Apache Flink.

I hope this series helps people attain a deeper understanding of how these table formats work and also lowers the barrier to entry for new code contributors to these projects.

Series links: