In this post, we will explore Apache Iceberg's concurrency control and data conflict checks that provide compute engines mechanisms for offering transactions with Serializable and Snapshot Isolation. We will focus on multi-writer scenarios, as we have throughout this table-format series.
Iceberg correctness in multi-writer scenarios
Iceberg supports multiple writers concurrently writing to the same table (and across tables, though that is out of scope for this post). The simplified model of a write follows a number of steps:
1. Read/write phase: scan the table based on the current snapshot, then read and write data files as required by the operation.
2. Refresh the table metadata.
3. Run the data conflict check against any snapshots committed after the scan snapshot.
4. Write the new metadata files (manifests, manifest-list and metadata file), based on the refreshed metadata.
5. Commit the new metadata file to the catalog.
The specifics of these steps depend on the type of write operation being carried out, and there are a few different operation types.
Data conflict check + metadata file commit
The foundation of Iceberg consistency lies in the data conflict check and the atomic metadata file commit.
The metadata file commit is the compare-and-swap operation where a writer provides the current metadata location and the new location of the metadata file it has just written. The catalog must perform an atomic CAS for the commit to be safe. If the current metadata file location provided by the writer does not match the current location known by the catalog, the commit is rejected. This ensures that commits can only be successful if they have written their new metadata based on the current metadata, and not a stale version. If a writer based its metadata on a stale metadata file and was able to commit, any committed changes that occurred in the actual current metadata version would be lost.
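To make the compare-and-swap concrete, here is a minimal sketch of a catalog-side commit check, assuming a simple in-memory map from table name to current metadata location. The class and method names are hypothetical; real catalogs (Hive Metastore, JDBC, REST, etc.) apply the same rule against their own backing store.

```java
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical catalog-side commit logic, shown only to illustrate the
// compare-and-swap rule described above; this is not Iceberg's catalog SPI.
public class IllustrativeCatalog {
  private final Map<String, String> currentMetadataLocation = new ConcurrentHashMap<>();

  // Returns true if the commit succeeded, false if it was rejected as stale.
  public synchronized boolean commit(String table, String expectedLocation, String newLocation) {
    String actual = currentMetadataLocation.get(table);
    if (!Objects.equals(actual, expectedLocation)) {
      return false; // the writer's view of the current metadata file is stale: reject
    }
    currentMetadataLocation.put(table, newLocation); // swap the pointer to the new metadata file
    return true;
  }
}
```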
The key thing to understand here is that the version of the metadata used when performing a scan of the table and the version used as the basis for writing the new metadata can be different. While the operation is in the reading and writing phase, another operation could commit a new snapshot - this snapshot will be loaded in step 2 above. This doesn’t always lead to a conflict, as we’ll see next.
However, in a different interleaving, writer 2 refreshes its metadata before writer 1 commits; by the time writer 2 attempts its catalog commit, writer 1 has already committed, so writer 2’s metadata is stale and its commit is rejected.
In the case of rejection, the writer does not need to abort the operation; it can retry the commit as long as none of the data changes of its operation conflict with any snapshots committed after its scan snapshot.
When a writer experiences a metadata commit rejection, it retries the commit by performing the following steps (sketched in code after this list):
Refreshing its metadata.
Rerunning the data conflict check.
Writing the metadata files again, based on the refreshed metadata.
Performing the catalog commit again, using the refreshed metadata location as the current location.
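Putting those retry steps together, the writer-side behaviour looks roughly like the sketch below. This is hypothetical pseudocode; in practice the loop lives inside the Iceberg library's commit path, and the helper steps simply mirror the list above.

```java
// A simplified, hypothetical sketch of the commit-retry loop; the real logic
// lives inside the Iceberg library's commit path, not in engine code.
interface WriterSteps {
  String refreshMetadata();                        // step 2: load the current metadata location
  void checkDataConflicts(String baseMetadata);    // step 3: throws if a data conflict is found
  String writeMetadataFiles(String baseMetadata);  // step 4: write manifests, manifest list, metadata file
  boolean catalogCompareAndSwap(String expectedLocation, String newLocation); // step 5: atomic CAS
}

final class CommitLoop {
  static void commitWithRetries(WriterSteps steps, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      String base = steps.refreshMetadata();
      steps.checkDataConflicts(base);
      String updated = steps.writeMetadataFiles(base);
      if (steps.catalogCompareAndSwap(base, updated)) {
        return; // commit succeeded
      }
      // CAS rejected: another writer committed first, so refresh and try again.
    }
    throw new IllegalStateException("Commit failed after " + maxAttempts + " attempts");
  }
}
```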
But what happens if there is a data conflict?
The data conflict check (which is a suite of metadata-based validation checks) attempts to determine whether:
there are definite file conflicts, such as two operations attempting to logically delete the same data file.
there are potential data conflicts, based on row filters (more on that soon).
Going back to the two writers' sequences, let’s imagine that the operations of writer 1 and 2 do conflict. In this interleaving, writer 2 does not detect a conflict the first time around, but after the metadata commit rejection, the second data conflict check (based on the refreshed metadata of snapshot-2) fails.
This high-level process is followed for all operation types, but the details of the steps vary by operation (which we’ll look at soon).
Data conflict filters
The data conflict check is a set of metadata-based validation checks (to use the term from the codebase). Many of the validation checks can include an optional data conflict filter, which is a row-based predicate. Without this filter, some checks are overly broad and detect conflicts where none have actually occurred. For example, one of the validation checks looks for added files that could conflict with the operation to commit. Without a data conflict filter, the validation will fail if any data file has been added in a post-scan snapshot. With data conflict filters in place, two operations that touch disjoint sets of data can pass validation, avoiding the need for one operation to abort.
To my knowledge, data conflict filters are unique to Apache Iceberg. They work by filtering out manifest entries that cannot contain data matched by the filter. Compute engines can push some query predicates down into Iceberg itself so that the Iceberg planning phase can prune data files, and the engine can set these same query filters as data conflict filters.
Validation checks typically read the manifests of each post-scan snapshot, looking for entries (data/delete files) that could violate the validation rule in question. This manifest entry read process applies the data conflict filters to reduce the possible manifest entries to evaluate against the rule.
Manifest entries have two important pieces of information that are used when evaluating them against the filter:
The partition spec.
The min/max column statistics for each column.
The manifest entry filtering is applied as follows:
If the data conflict filter contains all of the partition spec columns, then the manifest file itself is skipped if its partition values cannot match the filter. For example, if a manifest entry belongs to the partition “color=red” but the filter is color == “blue”, then the entry is skipped.
Each manifest entry (of a manifest not skipped based on partitions) is evaluated by comparing the filter predicate to the entry’s column statistics. If the predicate could match values within the lower and upper bounds recorded in the column stats, then the manifest entry is a match. If not a match, the entry is skipped.
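To make that two-step pruning concrete, here is a rough sketch with hypothetical types; a real implementation works on Iceberg's manifest entry structures and full expression trees, not a single equality predicate.

```java
import java.util.Map;

// Illustrative pseudocode only: hypothetical types standing in for a manifest
// entry and a single-column equality filter, to show the two pruning steps.
final class ConflictFilterPruningSketch {
  record ColumnBounds(String lower, String upper) {}
  record ManifestEntry(Map<String, String> partitionValues,
                       Map<String, ColumnBounds> columnBounds) {}
  record EqualityFilter(String column, String literal) {}

  static boolean couldMatch(ManifestEntry entry, EqualityFilter filter) {
    // 1. Partition pruning: if the filter column is a partition column and the
    //    partition value differs from the literal, the entry cannot match.
    String partitionValue = entry.partitionValues().get(filter.column());
    if (partitionValue != null && !partitionValue.equals(filter.literal())) {
      return false;
    }
    // 2. Column statistics pruning: the entry can only match if the literal
    //    falls within the column's lower/upper bounds.
    ColumnBounds bounds = entry.columnBounds().get(filter.column());
    if (bounds != null
        && (filter.literal().compareTo(bounds.lower()) < 0
            || filter.literal().compareTo(bounds.upper()) > 0)) {
      return false;
    }
    return true; // the entry might contain matching rows, so it is evaluated against the rule
  }
}
```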
The same process also applies to delete files, though with some differences:
Manifest entries of delete files do not store column statistics of the table columns like data files do. Therefore only partition spec-based predicates can be applied to these manifest entries.
Manifest entries of delete files store the data file path and position in column stats for “delete file path” and “position” columns, but only if the delete file references a single data file. I haven’t yet seen any compute engines make use of these column stats.
When compute engines set a data conflict filter, they reduce the probability of data conflicts being raised, though the effectiveness is very much dependent on partitioning and good data clustering. Apache Spark sets the data conflict filter as all predicates that can be pushed down into Iceberg.
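As a concrete sketch of what that looks like against the Iceberg Java API, an engine can build an Expression from its pushed-down predicates and pass it to the operation as the conflict detection filter. I believe validateFromSnapshot() and conflictDetectionFilter() are the relevant methods on these interfaces, but verify the exact calls against your Iceberg version.

```java
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class ConflictFilterUsageSketch {
  // Sketch: scope an overwrite's conflict checks to the predicate the engine
  // pushed down into its Iceberg scan (here, color = 'blue').
  public static void commitOverwrite(Table table, long scanSnapshotId) {
    Expression pushedDownPredicate = Expressions.equal("color", "blue");

    OverwriteFiles overwrite = table.newOverwrite()
        .validateFromSnapshot(scanSnapshotId)          // only validate against post-scan snapshots
        .conflictDetectionFilter(pushedDownPredicate)  // the data conflict filter
        .validateNoConflictingData()
        .validateNoConflictingDeletes();

    // addFile(...) / deleteFile(...) registration of the added-set and
    // deleted-set is omitted; the operation sections below cover it.
    overwrite.commit();
  }
}
```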
Operations and their data conflict checks
In part 1, we covered copy-on-write, merge-on-read, and compactions. In this part, we’ll do the same, but refer to them by the names of the interfaces of these operations in the Iceberg API module.
There are four operation types in Iceberg:
APPEND: The operation only adds data files.
OVERWRITE: The operation performs row-level updates/deletes. OVERWRITE can be COW or MOR.
REPLACE: A compaction operation that replaces a set of files with new ones (logically the same data, just rewritten as a set of fewer files, possibly clustered).
DELETE: The operation only deletes files.
These operation types map to operations in the Iceberg API module (there are a couple more than I cover here, but I want to limit the scope a little). For this analysis, I focus on the following four:
AppendFiles (API module)
APPEND type.
Implementations: FastAppend and MergeAppend in the Core module. MergeAppend performs the extra work of merging manifest files to keep the number of manifest files in check, avoiding the need for manifest compactions.
OverwriteFiles (API module)
OVERWRITE type.
Copy-on-write row level update/delete.
Implementation: BaseOverwriteFiles (Core module).
RowDelta (API module)
OVERWRITE type.
Merge-on-read row level update/delete.
Implementation: BaseRowDelta (Core module).
RewriteFiles (API module)
REPLACE type.
Implementation: BaseRewriteFiles (Core module).
Each operation implements data conflict checks differently, but they all share the common high-level logic for the commit process that I covered at the top.
These operations allow compute engines to register the files they have added and logically deleted as part of the read/write phases and finally invoke the commit() method (where steps 2-5 are carried out within the Iceberg library).
AppendFiles operation
With some simplification, the compute engine calls the appendFile(DataFile file) method for each data file it has written, then calls commit().
This is the simplest operation as it only involves adding new data files and no data conflict checks are performed.
Iceberg does not provide primary keys and makes no guarantees against duplication. For example, if two writers executed the same INSERT INTO Favourites(Name, FavColor, FavLetter) VALUES(‘jack’, ‘red’, ‘A’) command, the table would end up with two identical rows. Because Iceberg does not support primary keys natively, append operations do not check for conflicts; it is assumed that no other operation can conflict with an append. The opposite is not necessarily true: other operations can still consider that an append operation conflicts with them.
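Putting the above together, a minimal sketch of an append against the Iceberg Java API might look like this (assuming the Table handle and DataFile objects already exist):

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;

public class AppendSketch {
  // Minimal sketch: register each data file the engine has written, then commit.
  public static void appendAll(Table table, Iterable<DataFile> writtenFiles) {
    AppendFiles append = table.newAppend();
    for (DataFile file : writtenFiles) {
      append.appendFile(file);
    }
    // No data conflict checks run for appends; the commit only needs the
    // catalog's compare-and-swap to succeed (retrying on a stale metadata pointer).
    append.commit();
  }
}
```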
OverwriteFiles operation (copy-on-write)
The OverwriteFiles operation allows the compute engine to add and logically delete data files. With some simplification, the compute engine can call:
addFile(DataFile file) for each data file it has written and wants to be added to the new snapshot (we’ll call this the added-set).
deleteFile(DataFile file) for each data file it wants to logically delete in the new snapshot (we’ll call this the deleted-set).
Finally, the engine calls commit() where the added-set and the deleted-set are validated and committed.
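A minimal sketch of that flow against the Iceberg Java API (assuming the engine has already written the new data files and identified the files to logically delete):

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;

public class OverwriteSketch {
  // Minimal copy-on-write sketch: register the added-set and deleted-set, then
  // commit. The validation options are covered later in this section.
  public static void commitCopyOnWrite(Table table,
                                       Iterable<DataFile> addedSet,
                                       Iterable<DataFile> deletedSet) {
    OverwriteFiles overwrite = table.newOverwrite();
    for (DataFile added : addedSet) {
      overwrite.addFile(added);      // the new version of the rewritten data
    }
    for (DataFile deleted : deletedSet) {
      overwrite.deleteFile(deleted); // logically delete the replaced data file
    }
    overwrite.commit();
  }
}
```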
There are a number of possible data conflicts between concurrent writers:
Two OverwriteFiles operations are updating a row in the same data file. Both operations logically delete the same file, but each writes a different new version of that file. This conflict is also possible between an OverwriteFiles and a RewriteFiles (compaction).
An OverwriteFiles operation doing an update or delete conflicts with a RowDelta operation doing an update or delete. The RowDelta commits first, then the OverwriteFiles operation deletes the data file that the RowDelta’s delete file references.
Let’s look at the first example. The scene below describes how snapshot-1 was the current snapshot when the compute engine kicked off two concurrent OverwriteFiles operations carrying out UPDATE commands (operations A and B). Both commands update different rows of data file data-1 using copy-on-write. Operation A commits, and Operation B is about to commit.
If Operation B performed no data conflict checks at all and, therefore, committed successfully, it would result in an inconsistent manipulation of the metadata files. Data-1 would not be a member of the dataset while data-2 and data-3 would both be members of the dataset, creating duplication and conflicting values for ‘Jack’ and ‘Sarah’.
The reason is that in this interleaving, operation B refreshed its metadata after operation A committed, so it could see snapshot-2 and based its new metadata on it, a snapshot in which data-1 had already been deleted. When Operation B wrote its metadata, it filtered out data-1 entirely (as it was already deleted in snapshot-2) and created a new manifest file for data-3. Its manifest-list file would include manifest files listing data-2 and data-3.
If this is not clear then Part 1 goes into great detail about how manifest files and manifest-list files are manipulated.
The conflict detection of OverwriteFiles would detect and prevent this and other conflicts. OverwriteFiles offers three data conflict checks to the compute engine:
Fail missing delete paths validation, enabled if either of the following two checks is enabled.
No new deletes for data files validation, via method validateNoConflictingDeletes().
Added data file validation, via method validateNoConflictingData().
Fail missing delete paths validation
I’ll start with this check as it is the primary defense against OverwriteFiles/OverwriteFiles conflicts and OverwriteFiles/RewriteFiles conflicts. It is enabled when either of the other two validation checks is enabled.
This check is performed in the metadata writing phase (after the main validation phase), upon reading the physical manifests and converting them to logical manifests. As part of this physical-to-logical read process, any manifest entry with the status ADDED or EXISTING that is a member of the operation's deleted-set is converted to DELETED. Once the logical manifests have been loaded, the check ensures that every data file of the deleted-set appears as an entry with the DELETED status in the logical manifest files; if any does not, the validation check fails.
Going back to our operation A/B example, this check prevents Operation B from committing. Operation B has a deleted-set of [data-1]; however, when it reads the logical manifests, data-1 is filtered out as it was already deleted in snapshot-2. The check fails because data-1 does not appear as a DELETED entry in any logical manifest, and an inconsistent commit is avoided.
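As a rough sketch of the rule itself, with hypothetical names rather than the actual Core module code: after the logical manifests have been built, every file in the deleted-set must have been found and marked DELETED.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative pseudocode of the rule only; the real check lives in Iceberg's
// Core module and operates on its own manifest entry types.
final class MissingDeletePathsSketch {
  enum Status { ADDED, EXISTING, DELETED }
  record LogicalEntry(String path, Status status) {}

  static void validate(Set<String> deletedSetPaths, List<LogicalEntry> logicalEntries) {
    Set<String> markedDeleted = new HashSet<>();
    for (LogicalEntry entry : logicalEntries) {
      if (entry.status() == Status.DELETED) {
        markedDeleted.add(entry.path());
      }
    }
    // Any file in the deleted-set that never appeared as DELETED was already
    // removed by a concurrent commit, so the validation fails.
    for (String path : deletedSetPaths) {
      if (!markedDeleted.contains(path)) {
        throw new IllegalStateException("Missing required file to delete: " + path);
      }
    }
  }
}
```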
No new deletes for data files validation
This check iterates over the post-scan snapshots and checks whether any delete files match all of the following:
Delete file sequence number > scan snapshot sequence number (i.e., added after the scan snapshot).
Added as part of a DELETE or OVERWRITE type of operation.
Matches an optional data conflict filter.
References a data file that was logically deleted by this operation.
The data conflict filter is only effective for delete files if the filter includes the partition spec columns. This allows for delete files of partitions that do not match the filter to be excluded.
An OverwriteFiles operation can (logically) delete data files, but it does not add delete files. Another operation could add new delete files if it is a RowDelta operation (which is merge-on-read). Remember that compute engines such as Apache Spark allow UPDATE, DELETE and MERGE commands to be configured separately to use COW or MOR. This check prevents a COW row-level change (OverwriteFiles) from conflicting with a committed MOR operation (RowDelta).
Added data file validation
This check iterates over the post-scan snapshots and checks whether any manifest entries match all of the following:
Status=ADDED
Added as part of an APPEND or OVERWRITE type of operation (but not REPLACE). Remember that REPLACE is only used by compactions, which do not change the data logically; they only optimize the physical storage.
Match any (optional) data conflict filters.
If any manifest entries match, a conflict is detected, and the operation must abort. The data conflict filter should be used with this check; otherwise, any added file will cause this check to fail. Spark only enables this check for Serializable isolation to avoid concurrent operations making changes to data that match the same query filters.
Enabling these checks
The compute engine can enable these checks via the following methods on the OverwriteFiles interface:
validateNoConflictingData()
validateNoConflictingDeletes()
Apache Spark enables both with Serializable isolation but only validateNoConflictingDeletes() with Snapshot Isolation. Only one of these is required to enable the “Fail missing delete paths validation” check, which is critical for OverwriteFiles to avoid conflicts with other OverwriteFiles or RewriteFiles (compaction). validateNoConflictingDeletes() is critical for OverwriteFiles to avoid conflicts with RowDelta operations.
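In code, that Spark-style choice looks roughly like the sketch below; the boolean flag and the wiring are mine, only the validation methods come from the OverwriteFiles interface.

```java
import org.apache.iceberg.OverwriteFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;

public class OverwriteIsolationSketch {
  // Sketch of wiring an engine's isolation level to OverwriteFiles validations,
  // mirroring the Spark behaviour described above.
  public static void commitWithIsolation(Table table,
                                         long scanSnapshotId,
                                         Expression conflictFilter,
                                         boolean serializable) {
    OverwriteFiles overwrite = table.newOverwrite()
        .validateFromSnapshot(scanSnapshotId)
        .conflictDetectionFilter(conflictFilter)
        .validateNoConflictingDeletes();   // enabled for both Snapshot Isolation and Serializable

    if (serializable) {
      overwrite.validateNoConflictingData(); // the extra check under Serializable
    }

    // addFile(...) / deleteFile(...) registration omitted.
    overwrite.commit();
  }
}
```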
RowDelta operation (merge-on-read)
The RowDelta operation allows the compute engine to add data files and add delete files (position or equality deletes). With some simplification, the compute engine can call:
addRows(DataFile file) for each data file it has written and wants to be added to the new snapshot (we’ll call this the added-set).
addDeletes(DeleteFile deletes) for each delete file it wants to add in the new snapshot (we’ll call this the delete-set).
Finally, the engine calls commit() where the added-set and the delete-set are validated and committed.
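A minimal merge-on-read sketch against the Iceberg Java API (assuming the data files and delete files have already been written):

```java
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

public class RowDeltaSketch {
  // Minimal merge-on-read sketch: register the new data files and delete files,
  // then commit. The validation options are covered below.
  public static void commitMergeOnRead(Table table,
                                       Iterable<DataFile> addedSet,
                                       Iterable<DeleteFile> deleteSet) {
    RowDelta rowDelta = table.newRowDelta();
    for (DataFile dataFile : addedSet) {
      rowDelta.addRows(dataFile);      // e.g. the rewritten rows of an UPDATE
    }
    for (DeleteFile deleteFile : deleteSet) {
      rowDelta.addDeletes(deleteFile); // position or equality deletes
    }
    rowDelta.commit();
  }
}
```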
There are a number of possible data conflicts between concurrent writers:
This operation has created a delete file that references a data file that another operation, such as an OverwriteFiles or a RewriteFiles (compaction), has logically deleted.
Two RowDelta operations modify the same row. One deletes the row and the other updates the row. We end up with two delete files pointing to the same original row and one new data file with the updated row. The delete operation is effectively a no-op.
Let’s look at the first conflict, where DELETEs are configured to use COW and UPDATEs are configured to use MOR.
If operation B did not perform any data conflict checks then the table would end up as follows:
But there are data conflict checks to prevent this and other conflicts from happening. RowDelta offers three data conflict checks to the compute engine:
Data files exist validation, always enabled.
No new delete files validation, via method validateNoConflictingDeleteFiles().
Added data files validation, via method validateNoConflictingDataFiles().
Data files exist validation
This check iterates over the post-scan snapshots and checks whether any data files match all of the following:
Status=DELETED
Deleted as part of a REPLACE or OVERWRITE type of operation.
If the compute engine calls validateDeletedFiles() then it also includes manifests added by a DELETE type of operation.
Is referenced by a delete file being added by this operation.
If any manifest entry matches the above, then the validation check fails.
This validation check would fail for our operation A/B example above. Operation B references data-1 in its delete file, but data-1 is not a member of the dataset in snapshot-2, therefore the check would fail.
No new delete files validation
This check iterates over the post-scan snapshots and checks whether any delete files (position or equality deletes) match all of the following:
Status=ADDED
Delete file sequence number > scan snapshot sequence number (i.e., added after the scan snapshot). We only care about new deletes, but sequence numbers place bounds on which rows an equality delete applies to—an equality delete with a higher sequence number than the sequence number of the scan snapshot may invalidate the results of the read phase.
Added as part of an OVERWRITE or DELETE type of operation.
Match any (optional) data conflict filters.
The data conflict filter is only effective for delete files if the filter includes the partition spec columns. This allows for delete files of partitions that do not match the filter to be excluded.
If any delete files match the above, then the validation check fails. Without a data conflict filter, this validation will fail if any other RowDelta operation has committed a delete file in a post-scan snapshot. This seriously limits the concurrency of RowDelta operations. The data conflict filter can only filter delete files based on the partition spec, so getting partitioning right for a multi-writer topology is important.
Without this check, the “undelete” case I listed earlier becomes possible: two RowDelta operations, one performing a position delete (though it also applies to equality deletes) and the other an update, both target the same row, with the following interleaving:
Writer 1 starts an update, UPDATE Fruits SET FavFruit = ‘banana’ WHERE Name = ‘jack’.
Writer 2 starts a delete DELETE FROM Fruits WHERE Name = ‘jack’.
Writer 1 adds delete-1 which invalidates the existing “jack” row in data-1. It also adds data-2 with [“jack”, “banana”].
Writer 2 adds delete-2 which invalidates the existing “jack” row in data-1.
Writer 1 refreshes metadata, performs the conflict check, writes metadata and commits metadata-2 with snapshot-2.
Writer 2 refreshes metadata and sees snapshot-2. It does not perform the No new delete files validation check and sees no conflict. It commits snapshot-3.
At this point, a reader could still read [“jack”, “banana”] from data-2, despite the delete having supposedly deleted all rows WHERE Name = ‘jack’. Whether we consider the serial ordering of writer 1 then writer 2, or writer 2 then writer 1, no “jack” row should exist in the final table version.
Added data file validation
This is exactly the same check that OverwriteFiles can call.
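Pulling the RowDelta pieces together, here is a sketch with all of the checks discussed above enabled and scoped by a conflict detection filter. The validation methods are the ones named in this section; as before, treat validateFromSnapshot() and conflictDetectionFilter() as my assumption about the interface.

```java
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expression;

public class RowDeltaValidationSketch {
  // Sketch: enable the RowDelta validations discussed above, scoped where
  // possible by the engine's pushed-down predicate.
  public static void commitValidated(Table table, long scanSnapshotId, Expression conflictFilter) {
    RowDelta rowDelta = table.newRowDelta()
        .validateFromSnapshot(scanSnapshotId)
        .conflictDetectionFilter(conflictFilter)
        .validateDeletedFiles()              // also consider DELETE-type snapshots in "data files exist"
        .validateNoConflictingDataFiles()    // added data file validation
        .validateNoConflictingDeleteFiles(); // no new delete files validation

    // addRows(...) / addDeletes(...) registration omitted.
    rowDelta.commit();
  }
}
```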
RewriteFiles operation (compaction)
Two validation checks are performed, which we have already covered:
Fail missing delete paths validation, always enabled.
No new deletes for data files validation, always enabled.
This prevents conflicts with OverwriteFiles, RowDelta and other RewriteFiles operations.
Summary
This post has not been 100% comprehensive; there are a couple more operations that I have not covered, as well as some additional ways of using the operations I have covered (which include additional validations). However, covering everything would be a lot of work, and I think we’ve covered a lot already.
Apache Iceberg offers the validation checks (data conflict checks) necessary for Snapshot Isolation and Serializable transaction isolation levels. These isolation levels do not necessarily apply to append operations, as these operations have no data conflict checks at all - that is, Append/Append conflicts cannot happen as Iceberg does not natively model primary key semantics. The isolation levels apply to copy-on-write, merge-on-read, and compactions.
If we use Apache Spark as a reference:
for Snapshot Isolation (SI) it enables all validation checks except Added data files validation.
for Serializable, it includes all SI checks and also enables the Added data files validation check, to ensure that a concurrent operation A cannot add data that matches the pushed-down query predicate of operation B.
I haven’t benchmarked Iceberg or any of the other table formats, though it's on my to-do list. But from a theoretical point of view, partitioning and clustering will be important for avoiding unnecessary triggering of validation (aka data conflict) errors.
Added data file validation (Serializable isolation) will trigger if any data file that matches the data conflict filters gets added by a concurrent operation. Therefore, aligning writers to partitions and including all partition columns in the data conflict filters will be beneficial in multi-writer topologies, so that writers cannot conflict with each other. This is only effective if the chosen partitioning scheme is not purely time-based. If it is time-based only, then using a Spark/Flink topology where data is shuffled by one or more sort keys will greatly improve clustering, and data conflict filters that align with the clustering can then be effective when evaluated against column statistics.
No new delete files validation (Snapshot isolation, merge-on-read) will trigger if any new delete files have been added that match the data conflict filter. The filter is only effective for delete files if it contains the partition spec columns, so aligning writers to partitions is again important. Clustering should have no benefit for the concurrency of this validation check. However, given that delete file manifest entries store the referenced data file path in their column statistics (when the delete file references only a single data file), perhaps this could be used when partitioning is purely time-based. Delete files would need to reference only a single data file until compaction removes them. This way, Iceberg could use validation that ensures delete files committed in a post-scan snapshot don’t touch the same files, and potentially the same rows, as the current RowDelta operation.
Next
Next up we’ll take a look at the formal verification of Iceberg and what potential issues it discovered.
Part 2 (this post) covers the details of the concurrency control and data conflict checks that allow Apache Iceberg to handle multiple concurrent writers correctly.