Understanding Apache Hudi's Consistency Model Part 3 — Jack Vanlightly

Understanding Apache Hudi's Consistency Model Part 3

In part 1 we built a logical model for how copy-on-write tables work in Apache Hudi, and posed a number of questions regarding consistency with regard to types of concurrency control, timestamp monotonicity and more. In part 2 we studied timestamp collisions, their probabilities and how to avoid them (and be conformant to the Hudi spec). In part 3, we’ll be focusing on the results of model checking the TLA+ specification, and answering those questions.

This TLA+ specification only models the logic I have explained so far:

  • Commit action types only.

  • COW tables.

  • Uses a fixed-size pool of file groups, with lazy assignment. 

  • No table services (cleaning, compaction, clustering etc).

  • Only single primary key operations. Instead of rows, the data model is based on simple KV pairs, where the key represents the primary key and the value represents the non-PK column values.

The specification has the following parameters:

  • Writers. E.g. {w1, w2}.

  • Keys. E.g. {k1, k2}

  • Values. E.g. {A, B}

  • MonotonicTs

    • True = Monotonic ts provider: such as ZK.

    • False = Non-deterministic local clock: issues values between 1 and a monotonic value.

  • ConcurrencyControl

    • 0 = None

    • 1 = Optimistic

    • 2 = Pessimistic

  • PutIfAbsentSupport.

    • True = Instant and file slice files cannot be overwritten (and will cause the writer to abort on trying).

    • False = Instant and file slice files can be silently overwritten.

  • Primary key conflict check. 

    • True = Primary key conflicts that would result in duplicates are detected at index update time (before commit).

    • False = No primary key conflict detection is performed.

  • Salt

    • Yes. All instant and file slice identifiers include a unique salt to avoid file overwriting.

    • No.

The specification has one important invariant, ConsistentRead, that checks each committed KV pair op (insert/update/delete) is forever readable with the value associated to that commit (at a compatible timestamp).
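To make the invariant concrete, here is a minimal Python sketch of the same idea. It is not taken from the TLA+ specification: the names consistent_read and make_reader are hypothetical, and I am assuming that "a compatible timestamp" means reading at the timestamp of the commit itself.

```python
from typing import Callable, Dict, List, Optional, Tuple

# One committed operation: (key, value, commit_ts). A delete is modeled as value=None.
CommittedOp = Tuple[str, Optional[str], int]


def consistent_read(committed_ops: List[CommittedOp],
                    read: Callable[[str, int], Optional[str]]) -> bool:
    """Every committed op must be readable, at its own timestamp, with its own value."""
    return all(read(key, ts) == value for key, value, ts in committed_ops)


def make_reader(versions: Dict[str, Dict[int, Optional[str]]]):
    """Toy reader (an assumption): returns the newest version of a key with ts <= read ts."""
    def read(key: str, ts: int) -> Optional[str]:
        visible = {t: v for t, v in versions.get(key, {}).items() if t <= ts}
        return visible[max(visible)] if visible else None
    return read


committed = [("k1", "A", 1), ("k1", "B", 2)]

# A healthy table still holds both versions, so the invariant holds.
healthy = make_reader({"k1": {1: "A", 2: "B"}})

# A table that lost the ts=1 version violates the invariant.
lost_write = make_reader({"k1": {2: "B"}})
```

With these inputs, consistent_read(committed, healthy) is True and consistent_read(committed, lost_write) is False, because k1=A can no longer be read at ts=1.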

I have summarized the results into two categories:

  • V5 spec conformant configurations

  • V5 spec non-conformant configurations

Hudi v5 spec conformant configurations

The Hudi spec makes it clear that timestamps must be monotonic, so all the configurations below use monotonic timestamps. Locking is recommended for multi-writer scenarios, so the configurations include optimistic and pessimistic concurrency control. Finally, the primary key conflict detection to avoid duplicates is optional, so I have configurations with and without it.

  Id     Locking     Timestamps   PK conflict check     Consistent    
  ---- ------------- ------------ ------------------- ---------------- 
   1   Optimistic    Monotonic    YES                 OK              
   2   Optimistic    Monotonic    NO                  Duplicate rows  
   3   Pessimistic   Monotonic    YES                 OK              
   4   Pessimistic   Monotonic    NO                  Duplicate rows 

These results correlate with the guarantees listed in the Hudi docs.

There were no surprises, which was good to see. 

ACID guarantees in compliant implementations

As we covered in part 1, atomicity and durability are trivially true. Model checking now gives us the results to determine whether Hudi supports consistency and isolation too.

When optional primary key conflict detection is implemented and enabled, full ACID guarantees are delivered. However, without primary key conflict detection, we get a failure of isolation resulting in primary key duplication across file groups. It only occurs when two or more concurrent operations insert the same primary key in different file groups. The last write to the primary-key-to-file-group mapping index wins. In an OLTP system this kind of isolation issue might only result in a lost write/update, but in Hudi it results in a consistency problem as the orphaned row can still be read in the wrong file group. Using the primary key conflict check in multi-writer scenarios solves the problem.

Hudi v5 spec non-conformant configurations

The following configurations do not conform to the Hudi v5 spec (all bets are off). However, countermeasures exist that can make non-conformant configurations safe, namely PutIfAbsent storage support and the use of salts in instant and file slice file names. For completeness, we’ll look at safe and unsafe non-conformant configurations.

  Id   PutIfAbsent   Salt       CC         Timestamps     PK check              Consistent             
 ---- ------------- ------ ------------- --------------- ---------- ---------------------------------- 
   5   Any           Any    No locking    Any             Any        Fail (lost write)                 
   6   NO            NO     Optimistic    Non-monotonic   YES        Fail (lost write - ts collision)  
   7   NO            NO     Pessimistic   Non-monotonic   YES        Fail (lost write - ts collision)  
   8   YES           NO     Optimistic    Non-monotonic   YES        OK                                
   9   YES           NO     Pessimistic   Non-monotonic   YES        OK                                
  10   NO            YES    Optimistic    Non-monotonic   YES        OK                                
  11   NO            YES    Pessimistic   Non-monotonic   YES        OK                                

The only cases of data loss are related to non-conformant configurations. We also see that we can get away with non-monotonic timestamps if we use storage that supports PutIfAbsent or use salts. However, not doing concurrency control with multiple writers is never safe.
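As a compact restatement of the two tables, the following Python sketch maps a configuration to the result I reported for it. The function name reported_result and its parameters are hypothetical, and it deliberately returns None for combinations that do not appear as a row in either table rather than guessing.

```python
from typing import Optional


def reported_result(cc: str, monotonic_ts: bool, pk_check: bool,
                    put_if_absent: bool, salt: bool) -> Optional[str]:
    """Look up the model checking result for a configuration.

    cc is one of "none", "optimistic", "pessimistic".
    Returns None when the combination is not listed in the tables.
    """
    if cc == "none":
        return "Fail (lost write)"                       # row 5
    if monotonic_ts:
        return "OK" if pk_check else "Duplicate rows"    # rows 1-4
    if not pk_check:
        return None                                      # not listed
    if put_if_absent != salt:
        return "OK"                                      # rows 8-11
    if not put_if_absent and not salt:
        return "Fail (lost write - ts collision)"        # rows 6-7
    return None                                          # both countermeasures: not listed


# Row 1: conformant and safe.
row1 = reported_result("optimistic", True, True, False, False)
# Row 6: non-monotonic timestamps with no countermeasure.
row6 = reported_result("optimistic", False, True, False, False)
# Row 10: non-monotonic timestamps made safe by salts.
row10 = reported_result("optimistic", False, True, False, True)
```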

If you want to look at some traces of safe and unsafe configurations, then read on, else feel free to jump to the end. 

Let’s dive into some of these scenarios to understand why each is safe or unsafe.

Some pass/fail scenarios

Case 1 - No concurrency control  (Non-conformant and unsafe)

Parameters:

  • Writers={w1, w2}

  • Keys={k1, k2}

  • Values={A, B}

  • FileGroupCount=1

  • MonotonicTs=TRUE

  • ConcurrencyControl=0 (none)

  • KeyConflictCheck=TRUE

  • PutIfAbsentSupported = any (set to TRUE or FALSE)

This configuration is guaranteed to avoid timestamp collisions but experiences a lost write.

Fig 1. The issue is that concurrent operations on different primary keys were mapped to the same file group, and both writers read the timeline at the same time, not finding any existing file slice. This led to the second operation not merging the contents of the first operation, resulting in a lost write of primary key k1.
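The interleaving in this trace can be replayed with a few lines of Python. This is a toy sketch under my own assumptions, not the TLA+ model: file slices are a dict keyed by (file_id, ts), the timeline is a list of completed timestamps, and the helper latest_slice is hypothetical.

```python
file_slices = {}   # (file_id, ts) -> {key: value}
timeline = []      # timestamps of completed commits


def latest_slice(file_id, max_ts):
    """Contents of the newest committed file slice of a file group with ts <= max_ts."""
    candidates = [ts for (fid, ts) in file_slices
                  if fid == file_id and ts <= max_ts and ts in timeline]
    return dict(file_slices[(file_id, max(candidates))]) if candidates else {}


# Both writers scan the timeline before either has committed,
# so neither finds a merge target in file group 1.
w1_base = latest_slice(1, 1)
w2_base = latest_slice(1, 2)

# w1 upserts k1=A at ts=1 and commits.
file_slices[(1, 1)] = {**w1_base, "k1": "A"}
timeline.append(1)

# w2 upserts k2=B at ts=2 on top of its stale (empty) base and commits.
# With no concurrency control, nothing stops it.
file_slices[(1, 2)] = {**w2_base, "k2": "B"}
timeline.append(2)

# A read at ts=2 uses the newest file slice, which no longer contains k1.
read_at_2 = latest_slice(1, 2)
```

After this runs, read_at_2 holds only k2=B: the committed write of k1=A has been lost.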

Case 2 - OCC, two keys, PK conflict detection enabled (Conformant & safe)

Parameters:

  • Writers={w1, w2}

  • Keys={k1, k2}

  • Values={A, B}

  • FileGroupCount=1

  • MonotonicTs=TRUE

  • ConcurrencyControl=1 (optimistic)

  • KeyConflictCheck=TRUE

  • PutIfAbsentSupported = any

This is the same as case 1, except we use optimistic concurrency control. This time, key actions are wrapped in a lock, leading to the second operation failing its OCC check.

Fig 2. The concurrency control check of w2 scanned the timeline and found the completed instant of w1, which touched the same file group as the operation of w2. Writer w2 had no merge target for its upsert, so used a timestamp of 0 for the check. The completed instant of w1 had a higher timestamp than 0, so a conflict was detected.
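The check described in this caption can be sketched as a small predicate. This is an illustration under my own assumptions, not Hudi code: CompletedInstant and occ_check_passes are hypothetical names, and a completed instant is reduced to its timestamp plus the set of file groups it touched.

```python
from dataclasses import dataclass
from typing import FrozenSet, Iterable


@dataclass(frozen=True)
class CompletedInstant:
    ts: int
    file_ids: FrozenSet[int]   # file groups touched by this commit


def occ_check_passes(completed: Iterable[CompletedInstant],
                     file_id: int, merge_target_ts: int) -> bool:
    """Pass only if no completed instant touched the same file group
    with a timestamp higher than the merge target's timestamp.
    A writer with no merge target uses merge_target_ts=0."""
    return not any(file_id in instant.file_ids and instant.ts > merge_target_ts
                   for instant in completed)


# Case 2: w1 has completed ts=1 on file group 1; w2 had no merge target.
case2 = occ_check_passes([CompletedInstant(1, frozenset({1}))],
                         file_id=1, merge_target_ts=0)

# A writer touching a different file group is unaffected.
disjoint = occ_check_passes([CompletedInstant(1, frozenset({1}))],
                            file_id=2, merge_target_ts=0)
```

Here case2 is False (w2 must abort) and disjoint is True.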

Case 3 - OCC, one key, PK conflict detection disabled (Conformant - duplicate keys)

Parameters:

  • Writers={w1, w2}

  • Keys={k1}

  • Values={A}

  • FileGroupCount=2

  • MonotonicTs=TRUE

  • ConcurrencyControl=1 (optimistic)

  • KeyConflictCheck=FALSE

  • PutIfAbsentSupported = any

Without the PK conflict detection, two concurrent inserts of the same key by different writers can cause the same key to be written to two separate file groups, despite OCC.

Fig 3. If PK conflict detection had been used, w2 would have seen that a mapping now existed for key k1, which conflicted with its own assignment, and it would have failed the check and aborted. Because it didn’t do that, it overwrote the mapping of w1 and orphaned the row in file group 1.

When a duplicate of a primary key exists in a file group that doesn’t correspond to the index, it is still readable, as long as its file slice remains referenced from the timeline. The interesting question is how does such an orphaned row, which is still readable, eventually get filtered out? Presumably merges of the file slice into new file slices would preserve the row.
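The Case 3 trace can be replayed in Python to show both outcomes. This is a simplified sketch with hypothetical names (run_case3 and its data structures), assuming w1 is lazily assigned file group 1 and w2 file group 2, and reducing the OCC check to "has any completed commit touched my file group?".

```python
def run_case3(key_conflict_check: bool):
    index = {}         # primary key -> file group
    file_slices = {}   # (file_id, ts) -> {key: value}
    completed = {}     # ts -> file group touched by the completed commit

    # Both writers look up k1 before either commits, find no mapping,
    # and are assigned different file groups.
    w1_fg, w2_fg = 1, 2

    # w1 (ts=1): writes its file slice, updates the index, commits.
    file_slices[(w1_fg, 1)] = {"k1": "A"}
    index["k1"] = w1_fg
    completed[1] = w1_fg

    # w2 (ts=2): writes its file slice, then runs its checks before commit.
    file_slices[(w2_fg, 2)] = {"k1": "A"}
    occ_ok = all(fid != w2_fg for fid in completed.values())   # nothing touched fg 2
    pk_ok = index.get("k1", w2_fg) == w2_fg                    # mapping still compatible?
    if occ_ok and (pk_ok or not key_conflict_check):
        index["k1"] = w2_fg          # last write to the index wins
        completed[2] = w2_fg

    # A table scan reads every committed file slice, whatever the index says.
    rows = [(fid, key, value)
            for (fid, ts), contents in sorted(file_slices.items())
            if ts in completed
            for key, value in contents.items()]
    return index, rows


index_without_check, rows_without_check = run_case3(key_conflict_check=False)
index_with_check, rows_with_check = run_case3(key_conflict_check=True)
```

Without the check, the scan returns k1 twice (once per file group) while the index points only at file group 2. With the check, w2 aborts and a single row remains.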

Case 4 - OCC, non-monotonic timestamps, PutIfAbsentSupport=false (Non-conformant, unsafe)

Parameters:

  • Writers={w1, w2}

  • Keys={k1}

  • Values={A, B}

  • FileGroupCount=1

  • MonotonicTs=FALSE

  • ConcurrencyControl=1 (optimistic)

  • KeyConflictCheck=TRUE

  • PutIfAbsentSupported = FALSE

In the TLA+ specification, a non-monotonic timestamp is issued non-deterministically with any value between 1 and a monotonic value (which includes duplicate timestamps that would collide). When doing brute force checking, the model checker actually explores all timestamp values between 1 and the lowest monotonic value for each operation.

Fig 4. Both writers chose timestamp ts=1. While the OCC check prevented the second operation from completing, it did not prevent the file slice of the first op being overwritten with the file slice of the second op (as the file names were exactly the same).
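A toy Python replay of this trace shows why the OCC check arrives too late. The storage dict, the put helper and the file naming in slice_path are all hypothetical stand-ins, chosen only so that two file slices with the same file group and timestamp get the same name.

```python
storage = {}              # path -> content; a plain put, so overwrites are silent
completed_instants = []   # timestamps of completed commits on file group 1


def put(path, content):
    storage[path] = content


def slice_path(file_id, ts):
    # Hypothetical naming: same file group + same timestamp => same path.
    return f"fg{file_id}_ts{ts}"


# w1 picks ts=1, writes its file slice with k1=A and completes its commit.
put(slice_path(1, 1), {"k1": "A"})
completed_instants.append(1)

# w2 also picks ts=1 and writes its file slice: this silently replaces w1's file.
put(slice_path(1, 1), {"k1": "B"})

# w2's OCC check (no merge target, so it compares against ts=0) detects the conflict...
w2_merge_target_ts = 0
w2_conflict = any(ts > w2_merge_target_ts for ts in completed_instants)

# ...and w2 aborts, but the committed instant ts=1 now points at w2's data.
visible_at_ts1 = storage[slice_path(1, 1)]
```

After this runs, w2_conflict is True, yet visible_at_ts1 contains k1=B rather than the committed k1=A.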

Case 5 - OCC, non-monotonic timestamps, PutIfAbsentSupport=true (Non-conformant, safe)

Parameters:

  • Writers={w1, w2}

  • Keys={k1}

  • Values={A, B}

  • FileGroupCount=1

  • MonotonicTs=FALSE

  • ConcurrencyControl=1 (optimistic)

  • KeyConflictCheck=TRUE

  • PutIfAbsentSupported = TRUE

The second write to 1.commit.requested failed as it already existed, and w2 aborted early.

Fig 5. Writer w2 aborts on its put-if-absent of the instant 1.commit.requested.
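The same collision with put-if-absent semantics can be sketched as follows. PutIfAbsentStore and write_op are hypothetical names, the timeline is reduced to a requested file and a completed file, and the file slice naming is an assumption of mine.

```python
class PutIfAbsentStore:
    """Toy object store whose writes fail if the path already exists."""

    def __init__(self):
        self.objects = {}

    def put_if_absent(self, path, content):
        if path in self.objects:
            raise FileExistsError(path)
        self.objects[path] = content


def write_op(store, writer, ts, key, value):
    """Run a simplified commit: requested instant, file slice, completed instant."""
    try:
        store.put_if_absent(f"{ts}.commit.requested", writer)
    except FileExistsError:
        return "aborted"   # another writer already owns this timestamp
    store.put_if_absent(f"fg1_ts{ts}", {key: value})
    store.put_if_absent(f"{ts}.commit", writer)
    return "committed"


store = PutIfAbsentStore()
w1_result = write_op(store, "w1", 1, "k1", "A")
w2_result = write_op(store, "w2", 1, "k1", "B")   # collides on ts=1
```

Here w1_result is "committed", w2_result is "aborted", and the file slice still holds k1=A because w2 never got as far as writing it.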

On timeline order not matching insertion order

Going back to the beginning of this analysis in part 1, I said I wasn’t sure if the v5 Hudi spec talking about monotonic timestamps meant insertion time or issuance time. Having gone through the process of modeling Hudi in TLA+, from a correctness perspective the most important thing is that timestamps shouldn’t collide, at least when using a storage service that does not support PutIfAbsent.

However, what happens if two writers obtain timestamps that are monotonic at the time of issuance but the operations are executed out of order? 

The answer is that everything is ok, as long as you have chosen one of the compliant, safe configurations. 

Example: Out of order, same primary key

The following operations are carried out, note the insertion order does not match the ts order:

  • Op 1, k1=A, ts=1, insertion order=1

  • Op 2, k1=B, ts=2, insertion order=3

  • Op 3, k1=C, ts=3, insertion order=2

Behavior:

  1. Op 1 is carried out to completion, writing file slice [file_id=1, ts=1] with k1=A.

  2. Op 2 obtains ts=2.

  3. Op 3 obtains ts=3.

  4. Op 2 pauses for a while.

  5. Op 3 goes ahead and the operation succeeds, writing file slice [file_id=1, ts=3] containing k1=C. 

  6. Op 2 resumes. It scans the timeline and determines that the merge commit timestamp is 3 which is higher than its own timestamp, so it aborts early. If the operations had interleaved in a different way, with Op 3 still completing first, then the OCC check of Op 2 would have detected the conflict, and aborted.

Where two overlapping operations are executed out of timestamp order, only one succeeds. File slices can only be committed in timestamp order when OCC is used. From a performance perspective, this means that operations that are executed in a monotonic timestamp order will have better performance due to fewer conflicts.

Example: Out of order, different primary keys mapping to different file groups

  • Op 1, k1=A, ts=1, fg=1, insertion order=1

  • Op 2, k2=X, ts=2, fg=2, insertion order=2

  • Op 3, k1=B, ts=3, fg=1, insertion order=4

  • Op 4, k2=Y, ts=4, fg=2, insertion order=3

First, op 1 and 2 perform upserts:

  • k1=A at ts=1 to file slice [file_id=1, ts=1]

  • k2=X at ts=2 to file slice [file_id=2, ts=2]

Then op 3 and op 4 execute.

  1. Op 3 obtains ts=3.

  2. Op 4 obtains ts=4.

  3. Op 3 pauses for a while.

  4. Op 4 goes ahead and succeeds, writing file slice [file_id=2,ts=4] containing k2=Y. 

  5. Op 3 resumes. It scans the timeline and determines that the merge commit timestamp is 2. It looks for a file slice merge target of file_id=1 with a timestamp <= 2. It finds [file_id=1,ts=1] with k1=A. It writes a new file slice, [file_id=1, ts=3] with k1=B. Its concurrency control check passes as there is no completed instant in the timeline that touches the same file group with a ts > 1. Op 3 succeeds.

Where two disjoint operations are executed out of order, both succeed. 

But what about consistency across keys? If a client had been repeatedly retrieving all keys the whole time at ts=3 or at ts=4 would the results have been consistent?

At ts=3, a reader would have seen the following results as it repeated its query over and over:

  1. k1=null, k2=null

  2. k1=A, k2=null

  3. k1=A, k2=X

  4. k1=B, k2=X

At ts=4, a reader would have seen the following results as it repeated its query over and over:

  1. k1=null, k2=null

  2. k1=A, k2=null

  3. k1=A, k2=X

  4. k1=A, k2=Y <- sees k2=Y of ts=4 before k1=B of ts=3.

  5. k1=B, k2=Y

In the ts=4 case, a reader saw k2=Y before k1=B. This is ok because the two operations overlapped and therefore any chosen total order of materializing these operations is valid (which is what we are seeing here). Multiple clients reading at the same timestamp would have seen the same total order.
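The two sequences of reader results can be reproduced with a short Python sketch. It assumes a simplified read rule of my own (a reader at a given timestamp sees, per key, the value of the completed commit with the highest timestamp not exceeding its read timestamp), and the names read and observed_views are hypothetical.

```python
# Commits in the order they completed: (key, value, ts).
# Op 4 (ts=4) completes before op 3 (ts=3).
completion_order = [("k1", "A", 1), ("k2", "X", 2), ("k2", "Y", 4), ("k1", "B", 3)]


def read(completed, key, read_ts):
    """Value of the newest completed commit for key with ts <= read_ts, else None."""
    visible = [(ts, value) for (k, value, ts) in completed
               if k == key and ts <= read_ts]
    return max(visible)[1] if visible else None


def observed_views(read_ts):
    """Distinct (k1, k2) results seen by a reader polling at read_ts
    while the commits complete one by one."""
    views = []
    for n in range(len(completion_order) + 1):
        completed = completion_order[:n]
        view = (read(completed, "k1", read_ts), read(completed, "k2", read_ts))
        if not views or views[-1] != view:
            views.append(view)
    return views


views_at_3 = observed_views(3)
views_at_4 = observed_views(4)
```

views_at_3 comes out as (None, None), (A, None), (A, X), (B, X), and views_at_4 as (None, None), (A, None), (A, X), (A, Y), (B, Y), matching the two lists above.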

Conclusions

This analysis has been of limited scope, but so far the results of model checking the TLA+ specification correspond to the guarantees discussed in the multi-writer section of the Apache Hudi concurrency control documentation. ACID is supported with Hudi spec compliant configurations when primary key conflict detection is used in multi-writer scenarios.

I’ve focused a lot on multi-writer scenarios in this analysis because I’m a distributed systems engineer and I wanted to understand how Hudi handles this. Yet the single writer setup is the more common case.

Regarding multi-writer scenarios, the Apache Hudi v5 spec clearly states that timestamps should be monotonic. From my analysis, the most important thing is that timestamps should not collide and there are multiple options for doing this. If you use a storage service that supports PutIfAbsent, then it's a solved problem. Otherwise, if you’re on S3, then you’ll need a source of monotonic timestamps. Given that distributed locking is definitely needed for correctness in a multi-writer setup anyway, something like DynamoDB or ZooKeeper can do locks and monotonic counters. The performance impact of using such a system for timestamps and locking should be minimal, given that the number of ops per second should be much lower than, say, a Kafka topic or an OLTP database table. The cost of loading the timeline, reading and writing Parquet files should massively outweigh the costs of obtaining timestamps and acquiring/releasing locks.

Delta Lake and Apache Hudi are very similar on this point, with both taking the write-ahead-log (WAL) approach, and both requiring monotonic identifiers for WAL entries. Databricks uses a lightweight coordination (presumably locking) service when using S3 to ensure that id conflicts do not occur (due to its lack of PutIfAbsent support). Databricks notes that load on this coordination service is low due to the relatively low write rate on lakehouse tables.

The next steps, if I dedicate more time to this, would be to model Merge-On-Read (MOR) tables and also table services (compaction, clustering, cleaning etc). There are definitely more data consistency questions I have about how these additional features work safely. So far, my results correlate with the guarantees in the Hudi documentation, so I have no reason to expect I would find issues. Even so, it would be a useful exercise to understand Hudi internals further.

I’ll finish by just saying that I find TLA+ an excellent tool for learning existing systems. Going through the modeling process really makes things concrete for me, and I end up with a far deeper understanding of the target systems afterwards. TLA+ isn’t just for design time.
