Verifying Kafka transactions - Diary entry 2 - Writing an initial TLA+ spec

Strap in and fire up your nerd engines—this one's gonna be intense!

In my previous diary entry, I explained I would begin the formal modelling of Kafka transactions by implementing a tiny initial piece of the protocol - how a producer obtains a producer id. In this post I will discuss some aspects of the TLA+ specification I have written of this initial model.

Note: I will assume you know the basics of TLA+ in this post. If you don’t know much (or anything) about it, then stop reading this and go and read the following first (or this post will make no sense to you):

This is a diary of what I have had to consider: decisions about how to model the actors and communication, how to specify liveness properties, and so on. It doesn’t explain or show a lot of the TLA+ itself; you can read the spec for that.

I won’t lie, it gets a bit involved already. But if you can follow this entry then you’re in a great position to follow things as they get more complex. This post is laying a lot of foundations for what will come later.

First key decisions

TLA+ represents the world as a set of variables, and a sequence of steps that mutate those variables.

Given the simple model I want to implement, I have some key decisions to make:

  • How to represent communication between actors? I decided on explicit message passing, where the network is represented as a set of inflight messages. Being a set it is unordered, which will place greater demands on the protocol design.

  • How do I represent the transaction log? Remember that the transaction log is a Kafka topic. I wrote the Kafka replication protocol spec, which is large and complex. Do we want to extend that spec to include transactions? No. It would be too big, too complex, and the state space would be impractical. Instead, I will use the powers of TLA+ to choose my abstractions. I decided to represent each transaction log partition as a simplified abstraction of an unreplicated log (a TLA+ sequence) that mimics the externally visible properties of a Kafka topic: durable, ordered, with a monotonic high watermark (HWM). See the sketch after this list.

  • Should the TCs communicate with the transaction log also using message passing? I decided no, as the Kafka replication protocol spec already does this. I will keep the transaction log simple. TCs can directly append to the log. In the same atomic action, the HWM advances monotonically and the associated TC takes corresponding action.

  • Should I make the entire processing of a PidInitRequest one atomic action? I decided to break it into two atomic pieces: 1) Reject the request (sending an InitPidResponse) if it doesn’t meet conditions, and write to the transaction log if it does. 2) The transaction log partition HWM advances, and the TC sends the InitPidResponse. Much of the logic of the transaction coordinator is based on this two step transaction state change commit process, so it seems like a good idea to replicate that.
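
To make the log abstraction above concrete, here is a minimal sketch of what appending to the log and advancing the HWM could look like (assuming the Sequences module is extended; the operator names are mine, not necessarily the spec's):

\* Illustrative only. txn_log and txn_log_hwm are the per-partition
\* variables described in the next section.
AppendToTxnLog(p, entry) ==
    txn_log' = [txn_log EXCEPT ![p] = Append(@, entry)]

\* The HWM only ever moves forward, like a commit index.
AdvanceHwm(p) ==
    /\ txn_log_hwm[p] < Len(txn_log[p])
    /\ txn_log_hwm' = [txn_log_hwm EXCEPT ![p] = @ + 1]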

What variables should I create?

The initial spec maintains the following key variables, where each is a map (known as a function in TLA+) whose keys (domain in TLA+) are the identities of the actors.

  • client: Stores per-client local state.

  • tc_txn_metadata: Stores (per TC) the current committed transaction metadata.

  • tc_txn_transition: Stores (per TC) transaction metadata that is pending commit (i.e. it was written to the txn log but not committed yet).

  • tc_part_metadata: Stores (per TC) the metadata of the txn log partitions “owned” by the TC.

  • txn_log: The data of each transaction log partition.

  • txn_log_hwm: The high watermark of each txn log partition. Like the commit index in a Raft log.

Fig 1. The variables client, tc_txn_metadata, tc_txn_transition, tc_part_metadata, txn_log, txn_log_hwm.
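
As a rough illustration of their shapes (the record fields and the Ready and None values below are placeholders of mine for readability, not necessarily what the spec uses):

VARIABLES client, tc_txn_metadata, tc_txn_transition,
          tc_part_metadata, txn_log, txn_log_hwm

\* Illustrative only: every variable is a function keyed by actor identity.
Init ==
    /\ client = [c \in Clients |-> [state |-> Ready, pid |-> None, epoch |-> -1]]
    /\ tc_txn_metadata = [b \in Brokers |-> [t \in TransactionIds |-> None]]
    /\ tc_txn_transition = [b \in Brokers |-> [t \in TransactionIds |-> None]]
    /\ tc_part_metadata = [b \in Brokers |-> {}]
    /\ txn_log = [p \in TxnLogPartitions |-> <<>>]
    /\ txn_log_hwm = [p \in TxnLogPartitions |-> 0]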

I broke down the logic into 4 actions:

  1. SendInitPidRequest(client_id, broker_id)

  2. ReceiveInitPidRequest(broker_id, client_id)

  3. CompleteInitPidRequest(broker_id, partition)

  4. ReceiveInitPidResponse(client_id)

We can map those actions onto our sequence diagram. In the diagram below I model a Send action as delivering a message to a network buffer on the destination, and a Receive action as pulling a message from that buffer to be processed.

Fig 2. The four atomic actions and how they map to the sequence diagram of the last diary entry.
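
A rough sketch of that send/receive convention (assuming the inflight messages live in a set variable called messages and, as described later in this post, processed messages get moved to messages_discard):

\* The network is an unordered set of inflight messages.
Send(m) ==
    messages' = messages \union {m}

\* Processing a message removes it from the network and keeps it in a
\* discard set, which makes traces easier to follow.
Discard(m) ==
    /\ messages' = messages \ {m}
    /\ messages_discard' = messages_discard \union {m}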

I could have split ReceiveInitPidRequest and CompleteInitPidRequest into smaller atomic steps, but this specification is already going to be a beast and I think this is enough.

There are a number of model parameter constants that determine how many of each type of actor are modeled:

  • TxnLogPartitions: the set of txn log partitions. For example, {p0}

  • Brokers: the set of brokers. For example, {b1, b2}

  • Clients: the set of clients. For example, {c1, c2}

  • TransactionIds: the set of TIDs. For example, {tid1}.

The Next state formula below says that, at each step, one of these four actions occurs, involving some combination of clients, brokers and partitions:

Next ==
    \/ \E c \in Clients :
        \/ ReceiveInitPidResponse(c)
        \/ \E b \in Brokers :
            \/ SendInitPidRequest(c, b)
            \/ ReceiveInitPidRequest(b, c)
    \/ \E b \in Brokers, p \in TxnLogPartitions:
        \/ CompleteInitPidRequest(b, p)

TLC, the model checker, can explore all aspects of this initial piece of the transactions puzzle with very small model parameters, such as Clients = {c1, c2}, Brokers = {b1, b2}, TxnLogPartitions = {p1}, TransactionIds = {tid1}. That would be enough for:

  • Sending an InitPidRequest to the wrong broker (two brokers, one TID).

  • Sending an InitPidRequest to the right broker:

    • CASE 1 - No existing TID metadata exists. The TC creates new TID metadata (generates a PID, increments the producer epoch from -1 to 0) and sends an OK InitPidResponse.

    • CASE 2 - Existing TID metadata exists, and no pending transition exists. It reuses the existing PID and increments the producer epoch and sends an OK InitPidResponse.

    • CASE 3 - Existing TID metadata exists and there is a pending transition in progress (because there are two clients and one has already sent a request). The TC replies with ConcurrentTransactions. (See the sketch after this list.)
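
Sketching the TC side of those three cases (the helper operators OwnsPartitionFor, HasPendingTransition, Reply and WriteTransitionToTxnLog are placeholders of mine; the actual action in the spec is structured differently in detail):

ReceiveInitPidRequest(b, c) ==
    \E m \in messages :
        /\ m.type = InitPidRequest
        /\ m.dest = b
        /\ m.source = c
        /\ \/ /\ ~OwnsPartitionFor(b, m.tid)      \* wrong TC
              /\ Reply(m, NotCoordinator)
           \/ /\ OwnsPartitionFor(b, m.tid)
              /\ HasPendingTransition(b, m.tid)   \* CASE 3
              /\ Reply(m, ConcurrentTransactions)
           \/ /\ OwnsPartitionFor(b, m.tid)
              /\ ~HasPendingTransition(b, m.tid)  \* CASE 1 and 2
              /\ WriteTransitionToTxnLog(b, m)    \* the OK response is sent later,
              /\ Discard(m)                       \* by CompleteInitPidRequest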

You can run the spec in VS Code by:

  1. Installing the VS Code TLA+ extension.

  2. Configuring the model parameters in the cfg file.

  3. Commenting out the liveness checking section in the cfg, and uncommenting the “without liveness” section. See the cfg file.

  4. Checking the model with TLC. You must use the -deadlock parameter because clients stop doing anything once they have a PID, which TLC would otherwise interpret as a deadlock.

Example arguments are: -workers 4 -deadlock -fpmem 0.75

This tells TLC to use 4 dedicated worker threads and 75% of available memory, and that a "deadlock" should not trigger a counterexample.

Next we need to look at liveness.

Liveness

Because we are keeping things so simple, we can set a simple liveness property: “Eventually, every client will enter the HasPid state”. But liveness can be tricky, so it’s worth covering it in some more detail.

The “Eventually, every client will enter the HasPid state” property should be true because a client in the Ready state will send an InitPidRequest to a random broker. Upon a successful response, the client will transition to HasPid (and then do nothing forever). But upon an error response, the client transitions back to Ready, and from there it will send an InitPidRequest to another random broker. This should repeat until the client succeeds in obtaining a PID. Even if a different client already obtained a PID using the same TID, the later client will simply get an incremented epoch, which fences the earlier client. (Fencing is not included in this initial model though.)

Fig 3. A history where a client must retry its InitPidRequest, ultimately resulting in both clients obtaining a PID, though one of them ends up with a now-fenced epoch.
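
A sketch of what the client side of this retry loop might look like inside ReceiveInitPidResponse (the message fields and the None value are placeholders of mine; the actual action in the spec will differ in detail):

ReceiveInitPidResponse(c) ==
    \E m \in messages :
        /\ m.type = InitPidResponse
        /\ m.dest = c
        /\ IF m.error = None
           THEN client' = [client EXCEPT ![c].state = HasPid,
                                         ![c].pid = m.pid,
                                         ![c].epoch = m.epoch]
           ELSE client' = [client EXCEPT ![c].state = Ready]  \* will retry with
                                                              \* another broker
        /\ Discard(m)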

We express this liveness property as follows:

EventuallyAllClientsGetPid ==
    <>[](\A c \in Clients : client[c].state = HasPid)

<> means eventually and [] means always; in other words, what is in the parentheses eventually becomes true and remains true forever afterwards. In the parentheses we say that every client is in the HasPid state.

If the model checker finds a history where a client never obtains a PID, then the liveness property is violated. This is great because, initially, specs usually have a number of bugs that prevent the model checker from reaching states that should be reachable. This was the case with this spec: I had a few silly bugs that prevented clients from reaching the HasPid state. The lesson here is that liveness properties make for good tests.

So far so good. Now we get to the tricky part: fairness.

Fairness places constraints on how a system makes progress. Without fairness, it is valid for a system that can make progress to do nothing. The model checker explores the entire state space, and in that state space there are behaviours where a TC never receives a message that a client sent it. The message wasn’t lost; the TC simply never got around to processing it. In other words, just because something can happen doesn’t mean it will happen. If we don’t specify fairness, then the model checker will fail our liveness property.

Let’s specify no fairness, and use the smallest possible model parameters:

  • TxnLogPartitions = {p1}

  • Brokers = {b1}

  • Clients = {c1}

  • TransactionIds = {tid1}

We run the model checker, and it says our EventuallyAllClientsGetPid property is violated, with the following trace:

  1. Initial state

  2. SendInitPidRequest(c1, b1)

  3. ReceiveInitPidRequest(b1, c1)

  4. CompleteInitPidRequest(b1, p1)

At this point, there is a message inflight, destined for client c1, but it never processes it.

So let’s set some fairness. There are two types:

  • Weak fairness, which means that if an action is enabled forever, it will eventually take place. This is the most common form of fairness.

  • Strong fairness, which means that if an action is enabled infinitely often, then it will eventually take place. Imagine an action gets enabled then disabled then enabled, repeatedly, then strong fairness says that it must eventually happen.

This is a good moment to point out why I give the actions arguments specifying which clients, brokers and partitions are involved.

Imagine that the non-deterministic choice of clients, brokers and partitions were placed inside the actions, rather than outside. In such a case the next state formula would look like this:

Next ==
    \/ SendInitPidRequest
    \/ ReceiveInitPidResponse
    \/ ReceiveInitPidRequest
    \/ CompleteInitPidRequest

Then inside, say, SendInitPidRequest, it would begin with:

SendInitPidRequest ==
    \E c \in Clients, b \in Brokers :
        <the rest of the action>

If we made each of these weakly fair:

Fairness ==
    /\ WF_vars(SendInitPidRequest)
    /\ WF_vars(ReceiveInitPidResponse)
    /\ WF_vars(ReceiveInitPidRequest)
    /\ WF_vars(CompleteInitPidRequest)

… we’d have a big problem with larger model parameters. For example, with one client and two brokers, the model checker would say our liveness property is violated. Why? Because of the following cycle:

  • Step 1: c1 sends TC (of b2) an InitPidRequest.

  • Step 2: TC of b2 receives the InitPidRequest, but it is the wrong TC. It sends an InitPidResponse with error NotCoordinator.

  • Step 3: c1 receives the error InitPidResponse and transitions back to Ready state.

  • (back to step 1)

c1 basically never chooses the right broker to send its InitPidRequest to, doggedly sticking with the wrong one. While unlikely in practice, this constitutes a valid infinite behaviour which violates the liveness property.

There could be other examples in the future. Imagine that there are two clients, each with a different TID, with no possibility of conflict. Without weak fairness that includes the client ids, we could have a situation where c1 and c2 both have enabled actions, but only c1 makes progress. Some action of c2, while enabled, never gets executed. This would also likely violate a liveness property.

That is why we need to specify fairness in terms of the clients, brokers and partitions that are involved - to ensure they all get a fair chance of progress. To make each action weakly fair with respect to the clients, brokers and partitions, we write the following:

Fairness ==
    /\ \A c \in Clients :
        /\ WF_vars(ReceiveInitPidResponse(c))
    /\ \A b \in Brokers, c \in Clients :
        /\ WF_vars(SendInitPidRequest(c, b))
        /\ WF_vars(ReceiveInitPidRequest(b, c))
    /\ \A b \in Brokers, p \in TxnLogPartitions:
        /\ WF_vars(CompleteInitPidRequest(b, p))

The goal here is to ensure that if a client can send an InitPidRequest to a given broker, then it eventually will. In other words:

  1. It won’t be allowed to always choose the same (wrong) broker; it must eventually choose from all brokers, until it reaches the HasPid state, where it ceases to do anything further.

  2. Every client, broker and partition gets the chance to make progress.

But unfortunately, we still have a problem. The model checker still says our liveness property is violated, with exactly the same trace!

The problem is that weak fairness of SendInitPidRequest(c, b) is not enough. Every time a client sends an InitPidRequest, the SendInitPidRequest action gets disabled for that client. This means that ReceiveInitPidRequest(b, c) is enabled infinitely often, not continuously. This requires strong fairness.

We replace:

  • WF_vars(ReceiveInitPidRequest(b, c))

  • with: SF_vars(ReceiveInitPidRequest(b, c)), giving the updated fairness formula shown below.
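
For reference, after this change the fairness formula looks like this (same structure as before, with only that one conjunct upgraded):

Fairness ==
    /\ \A c \in Clients :
        /\ WF_vars(ReceiveInitPidResponse(c))
    /\ \A b \in Brokers, c \in Clients :
        /\ WF_vars(SendInitPidRequest(c, b))
        /\ SF_vars(ReceiveInitPidRequest(b, c))
    /\ \A b \in Brokers, p \in TxnLogPartitions:
        /\ WF_vars(CompleteInitPidRequest(b, p))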

Now the model checker finds no issues.

Running the spec

You can run it in VS Code using the TLA+ plugin. Before you run it you need to make any necessary changes to the cfg file. There are two sections that are relevant:

  • The model parameters

  • The sections WITH LIVENESS and WITHOUT LIVENESS. One or the other should be uncommented (see the sketch below). We’ll cover that soon.
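
Under the hood, those two cfg sections point TLC at different specification formulas. Judging by the names that appear in the cfg, they presumably look something like this (a sketch, assuming vars is the tuple of all variables):

Spec == Init /\ [][Next]_vars

\* The liveness variant adds the fairness assumptions discussed earlier.
LivenessSpec == Spec /\ Fairness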

With liveness checking enabled, and setting the following model parameters, the maximum length of a trace is 18 and there are 968 unique reachable states:

\* Model parameters
CONSTANTS
    p1 = p1
    TxnLogPartitions = {p1}
    b1 = b1
    b2 = b2
    Brokers = {b1, b2}
    c1 = c1
    c2 = c2
    Clients = {c1, c2}
    tid1 = tid1
    TransactionIds = {tid1}

You can find these parameters in the cfg file and modify them as you want. The state space rapidly grows larger with only small increases in the model parameters. For example:

  • partitions=1, TIDs=1, clients=2, brokers=2 -> Max length=18, Unique states = 968, running time=1s.

  • partitions=2, TIDs=2, clients=2, brokers=2 -> Max length=18, Unique states = 23312, running time=1s.

  • partitions=2, TIDs=2, clients=2, brokers=3 -> Max length=24, Unique states = 322596, running time=8m 33s (most of which is the final liveness check).

The above runs used 16 workers and 32 GB of memory.

Checking liveness is slow because the liveness checking part is single-threaded. TLC will go through periods of state exploration using all configured worker threads, but then go through periods of single-threaded liveness checking, plus one final liveness check at the end that can take a very long time.

When checking liveness we also don’t get to employ a couple of key state space reduction techniques:

  • No symmetry sets. We can tell TLC that the model parameters that are sets are symmetrical. Symmetry sets reduce the state space by collapsing states that are equivalent under permutations of indistinguishable elements, thus avoiding redundant exploration of identical configurations. For example, let’s say that one broker starts out as a leader. Without symmetry, TLC decides there is one initial state per broker, as b1, b2 or b3 could start out as leader. With symmetry, TLC sees that it can choose one broker as the leader in the initial state and only explore states from that one initial state, thus reducing the state space.

  • No views. A view is used to tell TLC what variables to include when computing the fingerprint of each state. We can omit certain variables that do not contribute to what we consider a unique state, and thus reduce the number of states to explore.

Reducing the state space without liveness checking

We can switch to only checking invariants and use symmetry sets for the clients, brokers, transaction ids and txn log partitions.
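
On the TLA+ side, the Symmetry definition that the cfg refers to can be built from the standard Permutations operator in the TLC module; a sketch along those lines:

\* Requires EXTENDS TLC for Permutations.
Symmetry ==
    Permutations(Clients) \union
    Permutations(Brokers) \union
    Permutations(TransactionIds) \union
    Permutations(TxnLogPartitions)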

\* WITH LIVENESS START (uncomment blocks below) ---------- 
\* PROPERTY EventuallyAllClientsGetPid
\* SPECIFICATION LivenessSpec
\* WITH LIVENESS END ----------------------------------------

\* WITHOUT LIVENESS START (uncomment blocks below) ---------- 
SPECIFICATION Spec
\* VIEW View
SYMMETRY Symmetry
\* WITHOUT LIVENESS END --------------------------------------

This provides the following state spaces and running times:

  • partitions=2, TIDs=2, clients=2, brokers=3 -> Max length=24, Unique states = 7052, running time=10s.

  • partitions=2, TIDs=2, clients=3, brokers=3 -> Max length=37, Unique states = 492040, running time=1m 6s.

I can also use a View to tell TLC to not include some variables in the state fingerprints (which identify unique states). Just uncomment the “\* VIEW View” line in the cfg file.

My view includes all variables except the set of processed messages (which acts as a kind of history variable that plays no role in the current state). This further reduces the state space and the maximum trace length. In the case of this spec, the reduction comes from allowing TLC to collapse reachable states that differ only in previous failed attempts at obtaining a PID. When a client sends an InitPidRequest to the wrong TC, it causes no mutation to that TC; likewise, the resulting client state is the same for errors of the same type from different TCs. Whether the error was received from b1, b2 or b3 does not affect the state variables of the client or any TC.

Ideally, we should see a state space (with one client and three brokers) that resembles something like:

Fig 4. A simplified view of a state space without the history of previously processed messages being used to compute fingerprints. Max length of 5 and 10 unique states.

But when TLC includes processed messages in the fingerprint, it elongates the state space to form chains of failed attempts that ultimately lead to a successful attempt. These chains form because the processed messages form part of the fingerprint, so some knowledge of what happened before creeps into the fingerprint of a state.

Fig 5. A simplified view of a state space with the history of previously processed messages included in state fingerprints. Max length of 11 and 33 unique states.

My model moves processed messages from the messages set to the messages_discard set. I do this because it makes it significantly easier to follow a trace: I can see exactly which message got processed at each step.
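
Given that, the View referenced by the “\* VIEW View” line in the cfg might look roughly like this (assuming messages_discard is the only variable left out of the fingerprint):

View == << client, tc_txn_metadata, tc_txn_transition, tc_part_metadata,
           txn_log, txn_log_hwm, messages >>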

Going back to our last runs using symmetry, if we also include the view that omits processed messages from fingerprints we get:

  • partitions=2, TIDs=2, clients=2, brokers=3 -> Max length=16, Unique states = 1839, running time=10s.

  • partitions=2, TIDs=2, clients=3, brokers=3 -> Max length=24, Unique states = 37546, running time=20s.

We can see that state space reduction via symmetry and views makes a big difference. As the model gets more complex, brute force checking will likely only be able to check for safety. Liveness checking is not compatible with symmetry, and can be problematic with views sometimes (as it can detect spurious cycles). Liveness will only be possible later on, using simulation mode. 

Diary entry over

There are a few things I’ve had to make decisions on for this initial spec:

  • How the actors should communicate.

  • How to represent the actor state in variables.

  • How to split up the behavior into discrete atomic actions.

  • How to set up fairness to support the desired liveness checks.

  • Use of symmetry and views for state space reduction.

In the next diary entry I will explain the first steps of my Fizzbee specification, and also make some initial comparisons to TLA+.
