Verifying Kafka transactions - Diary entry 1 - A first step — Jack Vanlightly

Verifying Kafka transactions - Diary entry 1 - A first step

A few days ago I started my work of formally verifying Apache Kafka transactions in both TLA+ and Fizzbee. I’ve decided to write a kind of diary of my progress so that others who are interested in formal verification of complex systems can read along and see how I go about it.

Rather than specifying a proposed system, I am reverse engineering the Apache Kafka transactions implementation into a TLA+ specification and a Fizzbee specification. This will likely result in a messier spec than if I were specifying something that was just a proposal, as I may wish to tie the spec to some aspects of the implementation.

The goals are the following:

  1. By going through the process, I will learn how Kafka transactions work at a deep level. That will be useful if/when I get involved in transactions from an engineering point of view, but also when speaking to customers and the broader market.

  2. The specs themselves will be useful for the future for people learning how they work, and for when future KIPs propose changes. We can verify the correctness of proposals before implementation.

  3. The specs may find a previously unknown protocol bug. This doesn’t always happen, but it happens quite often.

  4. Compare the experience of writing a non-trivial TLA+ specification and with that of writing a Fizzbee specification, as well as any differences in capabilities between the two.

I spent the first day reading this design doc to understand the various actors involved. The document describes in a good amount of detail:

  • The external semantics.

  • Who the actors are.

  • The message exchanges that occur.

  • The process from beginning to end of starting and completing/aborting a transaction.

  • Various mechanisms that protect consistency under various adverse conditions.

If you’re interested in that then I recommend reading the Motivation, Summary of Guarantees and Key Concepts sections of the design doc linked above.

Fig 1. A diagram from the design doc that describes principal actors with numbered references to the message exchanges.

To get started, I decided to model only the first step of a transaction: a producer obtaining a ProducerId from the Transaction Coordinator (TC) via an exhange of InitPidRequest and InitPidResponse. This first step only requires a subset of the actors:

  • Producers.

  • Transaction coordinators (one per broker).

  • Transaction log (a Kafka topic with multiple partitions).

I found some of the wording in the design doc ambiguous, so before embarking on starting the specification writing. I read the code of the transaction coordinator that pertains to handling InitPidRequests.

A simple first model

In this simple first iteration of the spec, I model the following exchanges:

Fig 2. The basic happy path and unhappy path of obtaining a PID.

The process:

  1. The producer sends an InitPidRequest with its Transaction ID (TID) to a broker. In reality, the first step is for the producer to find out which TC owns the partition that its TID maps to. But this spec omits that, allowing any producer to send an InitPidRequest to any broker. This is useful as it reduces the complexity of the spec, while also ensuring it covers cases where the InitPidRequest is sent to the wrong broker (which can happen in certain scenarios).

  2. The TC receives an InitPidRequest.

    1. Determine which txn log partition the TID maps to.

    2. If the TC is not responsible for that txn log partition, then it returns a NotCoordinator error.

    3. If the request TID already has a pending txn state transition in progress (a transition has been written to the txn log, but not committed yet), then the TC returns a ConcurrentTransactions error.

    4. Else if this TID has no current metadata, then create new metadata, with a new unique ProducerID (PID) and a producer epoch of -1, and cache it. Else it just uses the current metadata (and existing PID) that is cached.

  3. The TC creates a new metadata transition, with an incremented producer epoch. It writes this to the txn log and records that there is a transition in progress.

  4. Once the txn log acks the transition record, the TC sets the txn metadata to match the committed transition and replies to the producer with OK, the PID and the producer epoch.

Each client is modeled as a finite state machine:

Fig 3. A client modeled as a finite state machine. The circles represent the states and the arrows are the transitions.

This small part of the protocol doesn’t include producers starting a transaction, adding partitions to transactions, producers writing data records to partitions, consumers, the consumer group coordinator and so on. This small model, also does not include the following that could impact this small piece of the protocol:

  • Txn log partition leadership moving around. Leadership will be determined in the initial state and remain unchanged.

  • Txn log writes failing (i.e. writes to the txn log always eventually get committed).

  • Producer epochs do not get exhausted. A producer epoch is a short, and can wrap around. The implementation handles this by issuing a new PID when that happens. This initial model omits that logic.

We want to start simple and gradually add more of the protocol iteratively. In the next diary entry, I’ll go over the TLA+ spec of this first small model.

WARNING! I AM VERY EARLY ON IN THE PROCESS AND MAY HAVE A NUMBER OF MISCONCEPTIONS ABOUT THE PROTOCOL THAT WILL BECOME EVIDENT AS I PROGRESS. DO NOT TREAT THIS DIARY AS AN OFFICIAL GUIDE OF HOW THE PROTOCOL WORKS.

Share