Verifying Kafka transactions - Diary entry 5 - AddPartitionsToTxn in TLA+ — Jack Vanlightly


Note that I am reverse engineering a TLA+ specification from the Kafka codebase, and during this process I may make mistakes or misunderstand some aspects of the protocol, which will be corrected later. I am also mixing up KIPs, which I will tease apart later (when I have a deeper understanding of everything).

I have written the second iteration of the TLA+ specification which now includes:

  • The AddPartitionsToTxn request/response.

  • Transaction coordinator (TC) elections so that leadership of a transaction log partition can now move between brokers.

  • Producer fencing and some of the abort transaction logic.

Note: Messages, entries and records are all basically the same thing. I will tend to use the term record.

The AddPartitionsToTxn request tells the TC that one or more regular topic partitions (not txn log partitions) have been enlisted in a transaction and that, upon transaction commit or abort, a transaction marker record should be written to each of those partitions. The marker records form part of the record visibility logic. One thing to remember about Kafka is that it is optimized for high throughput, so batches of records get written to the log whether the transaction commits or aborts. The transaction controls the visibility of these records, rather than controlling whether they get written to the log or not.

For any given client starting a transaction, that gives us the current happy path:

Fig 1. The happy path for a producer obtaining a PID and adding a first set of partitions to a transaction.

Once a client has added all topic partitions, it stops doing anything further (as no other logic is modeled yet).

Of course, things can go wrong. After this happy path, a different producer could send an InitPidRequest for the same Transaction Id (TID), causing the original producer to get fenced and its open transaction aborted.

Fig 2. The path followed when another producer obtains a PID using the same Transaction Id as the first producer (from the happy path).

There are other unhappy paths such as a txn log partition election causing NotCoordinator errors. This only causes a hiccup as a client with an open transaction only needs to find the new coordinator and resume its transactional activities using that new TC.

The specification has already grown a lot bigger, with the next state formula as follows:

Next ==
    \/ \E c \in Clients :
        \* InitPidRequest -----
        \/ SendInitPidRequest(c)
        \/ \E b \in Brokers :
            \/ ReceiveInitPidRequest(b, c)
            \/ ReceiveInitPidResponse(c, b)
        \* AddPartitionsToTxnRequest -----
        \/ SendAddPartitionsToTxnRequest(c)
        \/ \E b \in Brokers :
            \/ ReceiveAddPartitionsToTxnRequest(b, c)
            \/ ReceiveAddPartitionsToTxnResponse(c, b)
    \/ \E b \in Brokers, p \in TxnLogPartitions :
        \* Txn log entries get committed, and callbacks executed -----
        \/ CommitTxnLogAppend(b, p)
        \* TC elections -----
        \/ ElectLeader(b, p)
        \/ BecomeFollower(b, p)
    \/ \E b \in Brokers, tid \in TransactionIds :
        \* Newly elected leaders (TCs) complete partially executed transactions -----
        CompletePartialTxn(b, tid)

We now have:

  • InitPidRequest/response

    • Includes producer fencing via producer epoch incrementing.

    • Aborting the transaction of a fenced producer.

  • AddPartitionsToTxnRequest/response

    • Includes rejecting fenced producers.

  • Txn log partition elections

    • Elects a TC as the new leader of a txn log partition.

    • Makes the current leader a follower, causing error responses to be sent to clients who had uncommitted entries in progress with that TC.

The ReceiveInitPidRequest action is starting to get large, and will get larger, as it can kick off a number of actions related to producer fencing and aborting transactions in progress.

Decisions made in this iteration

  1. How do I model callbacks?

  2. How to model complex subsystems? For example, each txn log partition exercises the Kafka replication protocol. I don’t want to include the replication protocol in this spec. TC elections are controlled by the KRaft controller, and I don’t want to model Raft in this spec.

  3. How do I design the spec to support effective liveness properties while also limiting the state space?

  4. What safety properties exist at this early stage (given that it doesn’t yet include producers writing records)?

How do I model callbacks/futures?

In the Scala code, when a TC writes a log entry, it defines the code that should be executed once the log entry has been committed (such as sending a reply, updating the transaction metadata of that TID). This code is executed only after the entry gets committed or fails, and likely by a different thread. There are a number of things that could have changed by the time that the log entry has been committed or failed to commit. It could have succeeded, or the TC may have lost leadership of that txn log partition, or the producer might have been fenced, and so on. There are also a number of different actions that might need to be carried out: send a response to the client, update txn metadata, write a txn marker, append another entry to the txn log partition, etc.

In programming, we can write code that will execute later, based on the results of some asynchronous work, via callbacks, promises/futures, completable futures, coroutines, and so on. TLA+ has none of that. So I decided to write enough data into each txn log entry that the callback logic, defined in a separate action, could use it to carry out the necessary work, such as sending replies to clients or mutating state. Some of this data is the success or fail response to be sent to a producer by the callback.

With that in mind, this spec has the action CommitTxnLogAppend, which replicates a log entry to follower replicas, advances the HWM and executes the associated callback logic of the committed entry. CommitTxnLogAppend has sub-formulas for:

  • HandleInitPid

  • HandleAddPartitions

  • HandlePrepareAbortOrCommit

  • HandleCompleteAbortOrCommit

Therefore we can expect CommitTxnLogAppend to grow quite large due to the number of different callbacks to be included in it. The ReceiveInitPidRequest action will also be large, as it can trigger a sizeable perturbation (aborting transactions and such).
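
To make the "callback as data" idea concrete, here is a minimal sketch. It is not taken from the actual spec: the module name, the variables txnLog, hwm and msgs, the entry fields, the MaxEntries bound and the single-TC, single-entry-type simplification are all assumptions made for illustration. The point is only that the entry itself carries the response, so the commit action has everything it needs.

```tla
---------------------------- MODULE CallbackSketch ----------------------------
\* Hypothetical illustration only: one TC, one entry type, no elections.
EXTENDS Naturals, Sequences

CONSTANTS Clients,     \* set of client ids
          MaxEntries   \* bound on the log length, keeps the state space finite

VARIABLES txnLog,      \* sequence of entries appended by the TC
          hwm,         \* number of committed entries (high watermark)
          msgs         \* set of responses sent to clients so far

vars == <<txnLog, hwm, msgs>>

Init ==
    /\ txnLog = <<>>
    /\ hwm = 0
    /\ msgs = {}

\* The TC appends an entry and stores, inside the entry itself, what the
\* "callback" will need later: here, just the response to send on commit.
AppendInitPid(c) ==
    /\ Len(txnLog) < MaxEntries
    /\ txnLog' = Append(txnLog, [type     |-> "InitPid",
                                 client   |-> c,
                                 response |-> [type |-> "InitPidResponse",
                                               dest |-> c,
                                               code |-> "None"]])
    /\ UNCHANGED <<hwm, msgs>>

\* Callback logic for a committed InitPid entry: send the stored response.
HandleInitPid(entry) ==
    msgs' = msgs \union {entry.response}

\* Committing the next entry advances the HWM and runs the callback logic.
CommitTxnLogAppend ==
    /\ hwm < Len(txnLog)
    /\ hwm' = hwm + 1
    /\ HandleInitPid(txnLog[hwm + 1])
    /\ UNCHANGED txnLog

Next ==
    \/ \E c \in Clients : AppendInitPid(c)
    \/ CommitTxnLogAppend

Spec == Init /\ [][Next]_vars

\* Invariant: a response is only ever sent for a committed entry.
ResponsesOnlyForCommitted ==
    \A m \in msgs : \E i \in 1..hwm : txnLog[i].response = m
=============================================================================
```

With more entry types, CommitTxnLogAppend would dispatch on entry.type to the matching Handle sub-formula. Because the sketch is bounded, TLC reaches states with no enabled action, so it should be run with deadlock checking turned off (for example CHECK_DEADLOCK FALSE in the config file).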

How to model complex subsystems?

One thing that is nice about TLA+ is that I can choose my level of abstraction. The abstraction level I have chosen for the txn log partitions is one of a single array (known as a sequence in TLA+) per TC. I know from my Kafka replication protocol specification that a partition has the following key properties:

  • Temporal ordering.

  • Supports MinISR-1 failures without data loss of committed entries.

  • The (effective) HWM is monotonic.

I can simply make this replicated array behave in a simple way in accordance with those properties. With that in mind, the txn log partitions are modeled as follows:

  • When the TC needs to write a log entry, it writes the entry to its local partition array.

  • The CommitTxnLogAppend action atomically replicates one entry of a local txn log partition to all follower replicas, and advances the HWM.

  • Leader elections just happen. Leaders who become followers truncate their local log (a txn log partition) to the HWM. This is safe in this super simple abstraction as the HWM is monotonic in this spec.

It is extremely simplified. I may make it more sophisticated later, if I see a reason to do so. But for now, I am keeping it as simple as possible.

Likewise, the KRaft controller is massively abstracted. It is not a replicated component in the spec and simply behaves in a way such that any state mutations it is responsible for are consistent. In this case, it can elect any TC that acts as a txn log partition follower, as the new leader. It can do this because replication and HWM advancement are atomic, so it can never choose the wrong TC as a partition leader.
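
The abstraction described above can be sketched roughly as follows, for a single txn log partition. This is one possible encoding under my own assumptions, not the encoding used in the spec: the committed prefix is kept as one shared sequence (which is what atomic replication amounts to), each broker only keeps its own uncommitted tail, and every name and bound in the module (committed, pending, role, ctrlLeader, ctrlEpoch, Values, MaxEntries, MaxElections) is hypothetical.

```tla
------------------------ MODULE TxnLogPartitionSketch ------------------------
\* Hypothetical illustration only: one txn log partition, abstract controller.
EXTENDS Naturals, Sequences

CONSTANTS Brokers,       \* set of broker ids (each hosts a TC)
          Values,        \* set of entry payloads
          MaxEntries,    \* bound on entries, keeps the state space finite
          MaxElections   \* bound on elections, keeps the state space finite

VARIABLES committed,     \* entries below the HWM, identical on every replica
          pending,       \* pending[b]: b's local entries above the HWM
          role,          \* role[b] \in {"leader", "follower"}
          epoch,         \* epoch[b]: the leader epoch that b knows about
          ctrlLeader,    \* the controller's current choice of leader
          ctrlEpoch      \* the controller's current leader epoch

vars == <<committed, pending, role, epoch, ctrlLeader, ctrlEpoch>>

Init ==
    /\ committed = <<>>
    /\ pending = [b \in Brokers |-> <<>>]
    /\ ctrlLeader \in Brokers
    /\ ctrlEpoch = 1
    /\ role = [b \in Brokers |-> IF b = ctrlLeader THEN "leader" ELSE "follower"]
    /\ epoch = [b \in Brokers |-> 1]

\* A broker that believes it is the leader appends to its local log only.
AppendEntry(b, v) ==
    /\ role[b] = "leader"
    /\ Len(committed) + Len(pending[b]) < MaxEntries
    /\ pending' = [pending EXCEPT ![b] = Append(@, v)]
    /\ UNCHANGED <<committed, role, epoch, ctrlLeader, ctrlEpoch>>

\* Replication and HWM advancement happen in one atomic step, and only the
\* leader of the controller's current epoch can commit.
CommitTxnLogAppend(b) ==
    /\ role[b] = "leader"
    /\ epoch[b] = ctrlEpoch
    /\ pending[b] # <<>>
    /\ committed' = Append(committed, Head(pending[b]))
    /\ pending' = [pending EXCEPT ![b] = Tail(@)]
    /\ UNCHANGED <<role, epoch, ctrlLeader, ctrlEpoch>>

\* The controller may elect any current follower. This is safe here because
\* every replica already holds the full committed prefix.
ElectLeader(b) ==
    /\ ctrlEpoch <= MaxElections      \* ctrlEpoch starts at 1
    /\ role[b] = "follower"
    /\ ctrlLeader' = b
    /\ ctrlEpoch' = ctrlEpoch + 1
    /\ role' = [role EXCEPT ![b] = "leader"]
    /\ epoch' = [epoch EXCEPT ![b] = ctrlEpoch + 1]
    /\ UNCHANGED <<committed, pending>>

\* A stale leader learns of the newer epoch, steps down, and truncates its
\* local log to the HWM by dropping its uncommitted tail.
BecomeFollower(b) ==
    /\ role[b] = "leader"
    /\ epoch[b] < ctrlEpoch
    /\ role' = [role EXCEPT ![b] = "follower"]
    /\ epoch' = [epoch EXCEPT ![b] = ctrlEpoch]
    /\ pending' = [pending EXCEPT ![b] = <<>>]
    /\ UNCHANGED <<committed, ctrlLeader, ctrlEpoch>>

Next ==
    \E b \in Brokers :
        \/ \E v \in Values : AppendEntry(b, v)
        \/ CommitTxnLogAppend(b)
        \/ ElectLeader(b)
        \/ BecomeFollower(b)

Spec == Init /\ [][Next]_vars

\* Action property: the HWM is monotonic and committed entries never change.
CommittedIsAppendOnly ==
    [][/\ Len(committed') >= Len(committed)
       /\ SubSeq(committed', 1, Len(committed)) = committed]_vars
=============================================================================
```

In this encoding the HWM is simply Len(committed), and a stale leader can keep appending locally but can never commit, which mirrors the NotCoordinator hiccup described earlier. As with any bounded model, TLC should be run with deadlock checking disabled.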

How do I design the spec to support effective liveness properties?

I wrote about the nuances of liveness in my two-part blog post.

In part 2, I wrote about how to constrain an infinite state space without breaking liveness properties. My general approach is “Control the stones (perturbations), not the ripples (recovery)”. The way I generally limit a state space is by limiting the number of times certain actions can occur. For example, in a Raft spec, how many elections can be triggered, and how many entries can be written. However, in the case of the former, the number of elections, this can actually cause liveness properties to break. For example, if a Raft cluster is functional but a number of nodes trigger elections at the same time which ends in a draw, then another election is *required* in order for the cluster to make progress. If you limit the number of election triggerings, then liveness properties can get violated.

My approach to this problem is to only limit actions that cause perturbations, and never actions that cause recovery. In the case of Raft, an election is a form of recovery. Therefore the way I modeled Raft was to limit the number of times a node could crash, or the number of times visibility of a node could be lost, and then let the resulting elections run until a new leader was elected.

Coming back to the Kafka txns spec. Without controls, it has an infinite state space for two reasons:

  1. Each time a txn log partition election occurs, the leader epoch (aka the coordinator epoch) of the partition gets incremented. It can be incremented infinitely via infinite elections.

  2. When more than one client is modeled, the two clients can end up in a battle where they constantly cause each other to get fenced, causing the producer epoch to grow infinitely.

I avoid this with the following controls:

  • I limit the number of txn log partition elections. I don’t yet model a TC crashing or dying, so elections are not recovery, but perturbations.

  • When a client receives an error response with the ProducerFenced code, it stops forever. This prevents an infinite client battle. Clients can still retry after other types of error, such as ConcurrentTransactions or NotCoordinator, as these can be classified as recovery. This still allows for testing multiple clients attempting to execute transactions concurrently: just include enough clients in the Clients constant for the number of txn attempts that you want.

With these controls, I have the following liveness properties:

  1. Every client eventually gets a PID.

  2. At least one client eventually adds all topic partitions to a transaction. Some clients may eventually terminate early due to fencing, but at least one client should manage to successfully add all the partitions to the transaction.
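
As a rough illustration of the fencing control and the two liveness properties, here is a heavily simplified sketch. It assumes a single TID, collapses each request/response pair into one atomic step, and leaves out elections entirely. The module, variable and action names are hypothetical and do not come from the actual spec.

```tla
----------------------------- MODULE FencingSketch -----------------------------
\* Hypothetical illustration only: one TID, atomic requests, no elections.
EXTENDS Naturals, FiniteSets

CONSTANTS Clients,           \* one client per transaction attempt
          TopicPartitions    \* partitions each client wants to enlist

ASSUME Clients # {} /\ TopicPartitions # {}

VARIABLES tcEpoch,       \* producer epoch held by the TC for the single TID
          tcPartitions,  \* partitions enlisted in the TID's open transaction
          state,         \* state[c] \in {"init", "has_pid", "done", "fenced"}
          epoch          \* epoch[c]: producer epoch given to client c (0 = none)

vars == <<tcEpoch, tcPartitions, state, epoch>>

Init ==
    /\ tcEpoch = 0
    /\ tcPartitions = {}
    /\ state = [c \in Clients |-> "init"]
    /\ epoch = [c \in Clients |-> 0]

\* Perturbation: obtaining a PID bumps the producer epoch, which fences any
\* earlier producer and aborts its open transaction. Each client does it once.
InitPid(c) ==
    /\ state[c] = "init"
    /\ tcEpoch' = tcEpoch + 1
    /\ tcPartitions' = {}
    /\ state' = [state EXCEPT ![c] = "has_pid"]
    /\ epoch' = [epoch EXCEPT ![c] = tcEpoch + 1]

\* Only the producer holding the current epoch can add partitions.
AddPartition(c, p) ==
    /\ state[c] = "has_pid"
    /\ epoch[c] = tcEpoch
    /\ p \notin tcPartitions
    /\ tcPartitions' = tcPartitions \union {p}
    /\ state' = [state EXCEPT ![c] =
                    IF tcPartitions \union {p} = TopicPartitions
                    THEN "done" ELSE @]
    /\ UNCHANGED <<tcEpoch, epoch>>

\* A producer with a stale epoch is told ProducerFenced and stops forever,
\* which keeps the producer epoch (and so the state space) bounded.
ReceiveProducerFenced(c) ==
    /\ state[c] = "has_pid"
    /\ epoch[c] < tcEpoch
    /\ state' = [state EXCEPT ![c] = "fenced"]
    /\ UNCHANGED <<tcEpoch, tcPartitions, epoch>>

\* Stutter once every client has finished, so TLC reports no deadlock.
Terminated ==
    /\ \A c \in Clients : state[c] \in {"done", "fenced"}
    /\ UNCHANGED vars

Next ==
    \/ \E c \in Clients :
        \/ InitPid(c)
        \/ ReceiveProducerFenced(c)
        \/ \E p \in TopicPartitions : AddPartition(c, p)
    \/ Terminated

Spec == Init /\ [][Next]_vars /\ WF_vars(Next)

\* Liveness 1: every client eventually gets a PID.
EveryClientGetsPid == \A c \in Clients : <>(epoch[c] > 0)

\* Liveness 2: at least one client eventually adds all topic partitions.
SomeClientAddsAllPartitions == <>(\E c \in Clients : state[c] = "done")

\* Safety: the epoch is bounded because fenced clients never retry.
BoundedEpoch == tcEpoch <= Cardinality(Clients)
=============================================================================
```

Weak fairness on Next is enough in this sketch because every non-stuttering step makes irreversible progress. A spec with retries and message passing would likely need fairness on individual actions instead. Note that the only bound here comes from clients stopping when fenced; no counter is placed on a recovery action.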

What safety properties can be modeled at this early stage?

Producers do not yet write any records to topic partitions. But we can verify the following properties:

  • No two TCs can be the leader of the same txn log partition with the same coordinator epoch. Two separate TCs can both believe they are the leader of the same txn log partition, but they must have different epochs.

  • No two clients can obtain the same PID and producer epoch (only the same PID and different epochs).
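
These two properties can be written as ordinary state invariants. The sketch below shows their shape on a deliberately tiny model with one txn log partition and one TID (so every client shares a single PID and only the producer epoch distinguishes them). All names are hypothetical, and using 0 to mean "no epoch" is a modeling convenience rather than Kafka's numbering.

```tla
----------------------------- MODULE SafetySketch -----------------------------
\* Hypothetical illustration only: one txn log partition, one TID, one PID.
EXTENDS Naturals

CONSTANTS Brokers, Clients, MaxElections

VARIABLES ctrlEpoch,        \* latest coordinator epoch issued by the controller
          leaderAt,         \* leaderAt[b]: epoch at which b believes it leads
                            \* (0 = b is a follower)
          tcProducerEpoch,  \* latest producer epoch issued for the single PID
          producerEpoch     \* producerEpoch[c]: epoch held by c (0 = no PID yet)

vars == <<ctrlEpoch, leaderAt, tcProducerEpoch, producerEpoch>>

Init ==
    /\ ctrlEpoch = 0
    /\ leaderAt = [b \in Brokers |-> 0]
    /\ tcProducerEpoch = 0
    /\ producerEpoch = [c \in Clients |-> 0]

\* The controller elects a follower under a fresh, strictly larger epoch.
ElectLeader(b) ==
    /\ ctrlEpoch < MaxElections
    /\ leaderAt[b] = 0
    /\ ctrlEpoch' = ctrlEpoch + 1
    /\ leaderAt' = [leaderAt EXCEPT ![b] = ctrlEpoch + 1]
    /\ UNCHANGED <<tcProducerEpoch, producerEpoch>>

\* A stale leader eventually learns of the newer epoch and steps down.
BecomeFollower(b) ==
    /\ leaderAt[b] # 0
    /\ leaderAt[b] < ctrlEpoch
    /\ leaderAt' = [leaderAt EXCEPT ![b] = 0]
    /\ UNCHANGED <<ctrlEpoch, tcProducerEpoch, producerEpoch>>

\* Each InitPid hands out the next producer epoch for the shared PID.
InitPid(c) ==
    /\ producerEpoch[c] = 0
    /\ tcProducerEpoch' = tcProducerEpoch + 1
    /\ producerEpoch' = [producerEpoch EXCEPT ![c] = tcProducerEpoch + 1]
    /\ UNCHANGED <<ctrlEpoch, leaderAt>>

Next ==
    \/ \E b \in Brokers : ElectLeader(b) \/ BecomeFollower(b)
    \/ \E c \in Clients : InitPid(c)

Spec == Init /\ [][Next]_vars

\* Two TCs may both believe they lead, but never with the same epoch.
OneLeaderPerEpoch ==
    \A b1, b2 \in Brokers :
        (b1 # b2 /\ leaderAt[b1] # 0) => leaderAt[b1] # leaderAt[b2]

\* No two clients hold the same PID and producer epoch.
UniqueProducerEpoch ==
    \A c1, c2 \in Clients :
        (c1 # c2 /\ producerEpoch[c1] # 0)
            => producerEpoch[c1] # producerEpoch[c2]
=============================================================================
```

Both invariants hold here for the same reason: each epoch is handed out exactly once from a single counter. In a fuller spec the interesting part is checking that they still hold once messages, retries and elections interleave. Because this model is bounded, TLC should be run with deadlock checking disabled.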

What’s next?

Next I need to implement the same logic in the Fizzbee spec.

After that, I think I will add the following protocol pieces:

  • Producers (clients) will be able to write records to topic partitions.

  • The Last Stable Offset (LSO), which controls record visibility of a topic partition.

  • Transaction markers, which control record visibility.

I will leave out clients ending transactions, and only implement the abort transaction when fencing a producer.

The TLA+ specification for this diary entry: kafka_transactions.tla
