Kafka KIP-966 - Fixing the Last Replica Standing issue

The Kafka replication protocol just got a new KIP that improves its durability when running without fsync. As I previously blogged in Why Kafka Doesn’t Need Fsync to be Safe, there are distributed system designs that allow for asynchronous storage engines. Being asynchronous means that the system can reap performance benefits which are not available to a synchronous storage engine.

Note: I’m not referring to the file system API being synchronous or asynchronous, but to whether records are flushed to disk before acknowledgement. An asynchronous storage engine may keep records in memory at the time of acknowledgement, whereas a synchronous storage engine will have already flushed them to disk.

Recovery as a strategy

Replicated storage systems which do not fsync before acknowledgement can hold committed data across multiple nodes but on any given node some small portion may only exist in memory. Some of these systems hold this data in the memory of the process itself while others like Apache Kafka keep it in the OS page cache. The page cache is a safer place to store unflushed data as the process, such as a Kafka broker, can perish but the page cache data is not lost and will eventually be flushed by the OS. Systems that keep unflushed data in process buffers on the other hand can lose data when the process itself crashes.

When a Kafka broker shuts down, as part of the controlled shutdown sequence it flushes all unflushed data to disk. This is known as a clean shutdown. When a broker is not able to flush its data to disk on shutdown, such as after a crash, a SIGKILL or a power outage, this is known as an unclean shutdown. Not all unclean shutdowns result in the loss of unflushed data; that requires an OS error that kills both the broker and the page cache.

The basic idea of recovery rather than fsync is to ensure that when a broker experiences an unclean shutdown the broker is able to recover any lost data from its peers. The critical parts of the replication protocol algorithm are that:

  1. A broker can detect that it experienced an unclean shutdown (a sketch of one approach follows this list).

  2. A broker which restarts after an unclean shutdown is not able to attain leadership of any given partition until it is guaranteed that it has recovered any/all lost data of that partition.
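To make point 1 concrete, here is a minimal sketch (in Python) of the marker-file technique a broker can use to detect an unclean shutdown: write a marker at the end of a controlled shutdown and check for it on start-up. The file name and layout are illustrative, not Kafka’s actual implementation.

```python
import os

MARKER = ".clean_shutdown"  # illustrative name, not the broker's real marker file

def shutdown_cleanly(log_dir: str) -> None:
    """On a controlled shutdown: flush everything, then leave a marker behind."""
    # (the flush of all partition logs would happen here)
    open(os.path.join(log_dir, MARKER), "w").close()

def startup_was_clean(log_dir: str) -> bool:
    """On start-up: a missing marker means the previous shutdown was unclean."""
    marker = os.path.join(log_dir, MARKER)
    clean = os.path.exists(marker)
    if clean:
        # Remove the marker so that a future crash is detected as unclean.
        os.remove(marker)
    return clean

if __name__ == "__main__":
    log_dir = "/tmp/broker-logs-sketch"
    os.makedirs(log_dir, exist_ok=True)
    shutdown_cleanly(log_dir)
    print(startup_was_clean(log_dir))  # True: previous shutdown was clean
    print(startup_was_clean(log_dir))  # False: no marker, so the broker treats itself as suspect
```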

While brokers experience unclean shutdowns, we’re really talking about partitions here as they are the unit of replication. The instance of a partition hosted on a broker is known as a replica. A partition with a replication factor of 3 will have three replicas hosted on three different brokers.

The following property captures the requirement that a replica without the complete committed log must not be able to attain leadership through a clean election:

ISR Completeness property: Every member of the ISR must host the complete committed log.

The ISR acts as the quorum for replication (a record must be replicated to all members before acknowledgement) and as a leader candidate pool (also known as an election quorum) for the controller to elect a new leader from. If the above invariant were ever violated, it would mean that the leader candidate pool would contain a replica which does not contain the complete committed log. When a replica is elected as leader but the replica does not contain the complete committed log, then it forces its followers to truncate their logs to match its own - this turns a local data loss issue into a global one. The ISR Completeness invariant is critical.
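To make the property concrete, here is a hypothetical invariant check over simplified replica state (the Replica class and its fields are my own, not Kafka data structures): every ISR member’s log must reach at least the high watermark.

```python
from dataclasses import dataclass

@dataclass
class Replica:
    broker_id: int
    log_end_offset: int  # the next offset this replica would write

def isr_is_complete(isr: list, high_watermark: int) -> bool:
    """ISR Completeness: every ISR member hosts all committed records,
    i.e. its log reaches at least the high watermark."""
    return all(r.log_end_offset >= high_watermark for r in isr)

if __name__ == "__main__":
    hw = 100
    isr = [Replica(1, 120), Replica(2, 100)]
    print(isr_is_complete(isr, hw))                     # True
    print(isr_is_complete(isr + [Replica(3, 90)], hw))  # False: replica 3 lacks committed records
```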

The idea of the recovery strategy is that after an unclean shutdown, all the replicas hosted on that broker lose any leadership, are removed from their ISRs and become followers which must catch-up to their respective partition leader before being re-added to the ISR and becoming candidates for leadership. Kafka is not a majority quorum system, and so as long as there is a functioning leader, follower replicas can heal themselves.

However, it turns out that Kafka has an edge case where this invariant does indeed get violated, and it is this edge case that KIP-966 closes. The problem is the Last Replica Standing issue.

The Last Replica Standing issue

Rule number one of leader elections in Kafka is that the controller can only select a replica in the ISR as leader. When brokers become unavailable (and get fenced by the controller), their replicas get removed from their ISRs, and if followers fall too far behind they also get removed.

But what happens if the only replica left standing is the leader? At this point the ISR contains only one replica - the leader. What happens if the leader fails? The ISR does not shrink to empty as then the controller would have no leader candidates to choose from. The last replica standing remains in the ISR even if it has become unreachable (due to being offline or a network issue). The controller must wait for that replica to come back online so it can elect it as leader again, and from there the leader can expand the ISR as followers catch up to it.
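A rough sketch of that shrink behaviour (my own simplified representation of the ISR, not the controller’s code) shows why the last replica standing is never removed:

```python
def shrink_isr(isr: set, failed_replica: int) -> set:
    """Remove a failed replica from the ISR, but never shrink the ISR to empty:
    the last replica standing stays in, even if it is currently unreachable."""
    if len(isr) == 1:
        return isr  # keep the last replica so the controller still has a leader candidate
    return isr - {failed_replica}

if __name__ == "__main__":
    isr = {1, 2, 3}
    isr = shrink_isr(isr, failed_replica=3)   # {1, 2}
    isr = shrink_isr(isr, failed_replica=2)   # {1}
    print(shrink_isr(isr, failed_replica=1))  # still {1}: the ISR never empties (pre-KIP-966)
```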

The problem occurs when the last replica standing experiences an OS error that causes the broker and the page cache to go kaput. If some of the unflushed data included committed data (records below the high watermark) then we have a problem. When the replica restarts, it gets re-elected as leader and the followers truncate their logs to match the leader - wiping out the remaining copies of the affected committed records.

This issue affects data that has already been acknowledged. As soon as the ISR shrinks below Min ISR, any new acks=all requests are immediately negatively acknowledged, as are any requests still pending. The at-risk data is the data that was recently acknowledged but has not yet been flushed to disk on the leader (which is the last replica standing and which experiences the lossy unclean shutdown).

This is where KIP-966 comes into play.

KIP-966 - Eligible Leader Replicas + Unclean Recovery

KIP-966 has two main components:

  1. Eligible Leader Replicas - enables clean elections despite unclean broker shutdowns.

  2. Unclean Recovery - can provide a best-effort recovery fallback.

Eligible Leader Replicas (ELR)

This proposal separates the replication and election quorums. In this proposal:

  • the replication quorum is the ISR known to the leader replica.

  • the election quorum is the ISR known to the controller plus the ELR (only known to the controller).

Because the election quorum in Kafka is a curated pool of replicas, I will use the term leader candidate pool to refer to the ISR + ELR.

The ELR part of the proposal can be summarized as follows; each rule below acts as a foundation for the next:

  1. Ensure that the high watermark can only advance when the ISR size is >= min.insync.replicas (Min ISR).

  2. Ensure the leader candidate pool does not shrink to below the Min ISR under normal conditions.

  3. Allow the leader candidate pool to eject a member that has experienced an unclean shutdown and is suspect until proven safe.

We modify the prior property to be:

Leader Candidate Completeness property: Every leader candidate must host the complete committed log.

The basic idea is that when the high watermark is blocked from progressing because the ISR is smaller than Min ISR, we can freeze the leader candidate pool at that point. If the ISR is of size Min ISR and replica R of the ISR goes offline, the ISR shrinks below Min ISR. At this point we know that the leader replica is unable to advance the high watermark because of the condition: ISR >= Min ISR. This means that replica R remains complete (hosts all committed records) despite not being a member of the ISR anymore. When the broker that hosts replica R goes through the start-up sequence and becomes unfenced by the controller, replica R will become a leader candidate (obeying the Leader Candidate Completeness property). 

This proposal basically maintains a leader candidate pool such that we know all its members are complete (host all committed records) even if some may be fenced. When a replica gets fenced, it is removed from the ISR. If this removal causes the ISR to drop below the Min ISR, then that replica is added to the ELR. As soon as the ISR reaches Min ISR again, the ELR is emptied.

The concrete rules that classify a replica as a leader candidate (sketched in code below) are now:

  • Is a member of the ISR

  • Or is an unfenced member of the ELR.

Note that with this proposal the ISR and ELR can both shrink to empty.
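Expressed as code, a sketch of the candidate check might look like this (the set arguments are my simplification of the controller’s partition and broker state):

```python
def is_leader_candidate(replica: str, isr: set, elr: set, fenced: set) -> bool:
    """A replica is a leader candidate if it is in the ISR,
    or if it is in the ELR and its broker is currently unfenced."""
    return replica in isr or (replica in elr and replica not in fenced)

if __name__ == "__main__":
    isr, elr, fenced = {"R1"}, {"R2"}, {"R2"}
    print(is_leader_candidate("R1", isr, elr, fenced))  # True: ISR member
    print(is_leader_candidate("R2", isr, elr, fenced))  # False: in the ELR but fenced
    print(is_leader_candidate("R2", isr, elr, set()))   # True once broker R2 is unfenced
```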

How it works

We have a partition with a replication factor of 3 and a Min ISR of 2. A small runnable sketch that reproduces these steps follows the walkthrough.

  1. Leader = R1, ISR = [R1, R2, R3], ELR=[].

  2. R3 becomes unreachable, is fenced and is removed from the ISR. The ISR < Min ISR condition is not met, so R3 is not added to the ELR.

    • Leader = R1, ISR = [R1, R2], ELR=[]

  3. R1 receives a produce request (req1) and replicates it to R2. R1 advances the high watermark to cover the records of the request and sends a positive acknowledgement to the producer.

  4. R2 becomes unreachable and is removed from the ISR. Given that the ISR is too small for the high watermark to advance, it is safe to add R2 to the ELR.

    • Leader = R1, ISR = [R1], ELR=[R2].

  5. R1 is the sole member of the ISR when an OS error kills R1. R1 is removed from the ISR, and because there is no leader now, the high watermark cannot advance, so it is safe to add R1 to the ELR.

    • Leader = NoLeader, ISR = [], ELR=[R1, R2].

  6. R1 did in fact lose the committed records of req1 from step 3. It restarts but is able to signal to the controller that it is suspect due to an unclean shutdown. The controller removes R1 from the ELR (and so R1 is not in the candidate pool).

    • Leader = NoLeader, ISR = [], ELR=[R2].

  7. R2 becomes reachable again, gets unfenced and the controller elects it as leader because it is a member of the ELR. It moves the replica from the ELR to the ISR.

    • Leader = R2, ISR = [R2], ELR=[].

  8. R1 becomes a follower, catches up to the new leader (obtaining the records it lost) and is re-added to the ISR.

    • Leader = R2, ISR = [R1, R2], ELR=[].
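The toy model below (my own simplification, not Kafka code) reproduces the membership changes of steps 1 to 8. It uses the rule that a replica removed from the ISR joins the ELR only when ISR + ELR would otherwise fall below Min ISR, and that the ELR is emptied once the ISR reaches Min ISR again.

```python
class Partition:
    """Toy model of one partition's controller-side state."""

    def __init__(self, replicas, min_isr):
        self.min_isr = min_isr
        self.leader = replicas[0]
        self.isr = set(replicas)
        self.elr = set()
        self.fenced = set()

    def fence(self, r):
        """A replica becomes unreachable: fence it and remove it from the ISR.
        If that leaves fewer than Min ISR guaranteed-complete replicas, the high
        watermark can no longer advance, so the removed replica joins the ELR."""
        self.fenced.add(r)
        self.isr.discard(r)
        if self.leader == r:
            self.leader = None
        if len(self.isr) + len(self.elr) < self.min_isr:
            self.elr.add(r)

    def unclean_restart(self, r):
        """The broker restarts and reports an unclean shutdown: the replica is
        suspect, so it leaves the ELR and stays out of the candidate pool."""
        self.fenced.discard(r)
        self.elr.discard(r)

    def unfence(self, r):
        self.fenced.discard(r)

    def elect(self, r):
        """Elect an unfenced candidate as leader, moving it from the ELR to the ISR."""
        assert r in self.isr or (r in self.elr and r not in self.fenced)
        self.elr.discard(r)
        self.isr.add(r)
        self.leader = r

    def catch_up(self, r):
        """A follower catches up to the leader and rejoins the ISR. Once the ISR
        reaches Min ISR, the ELR's guarantees are no longer needed."""
        self.isr.add(r)
        if len(self.isr) >= self.min_isr:
            self.elr.clear()

    def __repr__(self):
        return f"Leader={self.leader}, ISR={sorted(self.isr)}, ELR={sorted(self.elr)}"


if __name__ == "__main__":
    p = Partition(["R1", "R2", "R3"], min_isr=2)
    print("1.", p)                            # Leader=R1, ISR=[R1, R2, R3], ELR=[]
    p.fence("R3");           print("2.", p)   # ISR=[R1, R2], ELR=[] (ISR not below Min ISR)
    # 3. req1 is replicated to R2 and positively acknowledged (no membership change)
    p.fence("R2");           print("4.", p)   # ISR=[R1], ELR=[R2]
    p.fence("R1");           print("5.", p)   # Leader=None, ISR=[], ELR=[R1, R2]
    p.unclean_restart("R1"); print("6.", p)   # ELR=[R2]: R1 is suspect
    p.unfence("R2"); p.elect("R2"); print("7.", p)  # Leader=R2, ISR=[R2], ELR=[]
    p.catch_up("R1");        print("8.", p)   # Leader=R2, ISR=[R1, R2], ELR=[]
```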

Replica completeness guarantee

A replica either has a completeness guarantee or it does not. When a replica is guaranteed to be complete, it is a member of either the ISR or ELR. When a replica has no guarantees of completeness it is a member of neither.

Fig 1. The view of a replica from the perspective of the controller.

Let’s look at how a replica can transition between leadership and followership, and between being complete and not:

  • Case A. An in-sync follower (member of the ISR) is fenced and removed from the ISR. However, the ISR is now < Min ISR and the combined size of ISR + ELR is also < Min ISR. Therefore the replica is added to the ELR because we know the leader cannot advance the high watermark and this replica will remain complete.

  • Case B. An unfenced follower is a member of the ISR or ELR and is chosen by the controller to be the leader. If the follower was in the ELR, it is transferred to the ISR.

  • Case C. The leader is fenced, removed as leader and removed from the ISR. The ISR is now < Min ISR and a new leader cannot advance the high watermark. Also the combined size of the ISR + ELR is < Min ISR therefore the former leader is added to the ELR.

  • Case D. The leader is fenced, removed as leader and removed from the ISR. Another leader candidate is elected and the new ISR >= Min ISR. Because the new leader can advance the high watermark without this replica, this replica has no completeness guarantee and is not added to the ELR.

  • Case E. A follower which is not a member of either the ISR or ELR has caught up to the leader and the leader adds the out-of-sync follower to the ISR.

  • Case F. The replica is removed from the ISR but the size of the ISR remains equal to or larger than the Min ISR and therefore it is not added to the ELR and has no completeness guarantee.

  • Case G. The replica is a member of the ELR. The ISR expands to reach Min ISR so all ELR members lose their completeness guarantee and the ELR is emptied.

Clean Election Guarantees

A clean election is one where we are guaranteed no data loss as a result. With the ELR, the controller can perform clean elections for a partition in the face of unclean shutdowns  - specifically tolerating up to Min ISR - 1 unclean shutdowns.

The following table illustrates this.

| RF | Min ISR | Tolerates unclean shutdowns |
|----|---------|-----------------------------|
| 3  | 2       | 1                           |
| 5  | 3       | 2                           |
| 6  | 4       | 3                           |

Once Min ISR concurrent unclean shutdowns have occurred, the ISR and ELR will both be empty and the partition must fall back to Unclean Recovery.

Code changes

The great thing about this proposal from an engineering perspective is that it is trivial to implement. The KRaft controller is a single-threaded piece of code and managing one more set is simple and easy to test. Additionally, the ELR is usually going to be empty. The partition replicas know nothing about the ELR so the only code change required there is to add the high watermark advancement condition that the ISR must be equal to or larger than the Min ISR.
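That condition can be sketched as follows; the function and argument names are mine, not the broker’s actual code.

```python
def maybe_advance_high_watermark(log_end_offsets: dict,
                                 isr: set,
                                 min_isr: int,
                                 current_hwm: int) -> int:
    """The leader may only advance the high watermark when the ISR has at least
    Min ISR members (the new condition) and the offset has been replicated to
    every ISR member."""
    if len(isr) < min_isr:
        return current_hwm  # too few guaranteed-complete replicas: freeze the HWM
    # The HWM can rise to the smallest log end offset across the ISR.
    return max(current_hwm, min(log_end_offsets[r] for r in isr))

if __name__ == "__main__":
    offsets = {"R1": 120, "R2": 110, "R3": 90}
    # ISR at Min ISR: the HWM advances to the slowest ISR member's offset.
    print(maybe_advance_high_watermark(offsets, {"R1", "R2"}, 2, 100))  # 110
    # ISR below Min ISR: the HWM is frozen even though the leader has more data.
    print(maybe_advance_high_watermark(offsets, {"R1"}, 2, 100))        # 100
```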

Unclean Recovery

The ELR enables clean elections under scenarios where replicas can be suspect. But when the ISR and the ELR are empty, we have no leader candidates for a clean election and we must fall back to an unclean recovery mechanism.

Unclean recovery comes in three levels of durability:

  • Proactive (optimized for availability)

  • Balanced (optimized for safety first and availability second)

  • Manual (defer to a human, who has some special recourse)

I’ll cover Balanced first as I expect it is the option that best aligns with most people’s needs.

Balanced mode

Balanced mode triggers unclean recovery when the ISR + ELR is empty and a clean election is not possible.

This mechanism has the controller inspect the logs of all the replicas that it knows could be complete but are marked as suspect by unclean shutdowns - this group is called the LastKnownELR. It is another set stored in the partition metadata. It contains the replicas that were once ELR members and could still be complete, but which were removed due to unclean shutdowns.

Membership is managed as follows (see the sketch after this list):

  • When an ELR member has an unclean shutdown, it is removed from ELR and added to the LastKnownELR.

  • The LastKnownELR is cleared when the ISR reaches the Min ISR, as all completeness guarantees are lost.
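Those two rules could be expressed roughly like this (the function names and set arguments are my own illustration, not the controller’s code):

```python
def on_unclean_shutdown(replica: str, elr: set, last_known_elr: set) -> None:
    """An ELR member that reports an unclean shutdown is suspect:
    it leaves the ELR but is remembered in the LastKnownELR."""
    if replica in elr:
        elr.remove(replica)
        last_known_elr.add(replica)

def on_isr_change(isr: set, min_isr: int, last_known_elr: set) -> None:
    """Once the ISR is back at Min ISR, the completeness guarantees reset
    and the LastKnownELR is cleared."""
    if len(isr) >= min_isr:
        last_known_elr.clear()

if __name__ == "__main__":
    isr, elr, last_known = {"R3"}, {"R1"}, set()
    on_unclean_shutdown("R1", elr, last_known)
    print(elr, last_known)   # set() {'R1'}
    isr.add("R2")            # a follower catches up and the ISR reaches Min ISR
    on_isr_change(isr, 2, last_known)
    print(last_known)        # set()
```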

If any of the unclean shutdowns of this group were data loss free, we are guaranteed to select a replica without losing committed records. For balanced mode to lose committed records, all members of the LastKnownELR must have lost some log suffix that included committed records. Unclean recovery in this case would select the least-bad option.

The partition will remain in recovery until these members can be contacted. So we pay some price in liveness to obtain more safety - hence this mode being named Balanced. This is not a large deviation from today where the partition also remains unavailable until the former leader comes back up.

Proactive mode

Proactive mode triggers unclean recovery when the ELR is not empty but all of its members are fenced. Basically, we have replicas with a completeness guarantee, but they aren’t available right now.

This mechanism has the controller inspect the logs of any replicas it can and after a short time period, choose the replica with the highest epoch/offset.

This is for users who want availability above all else.
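The selection step can be pictured as a comparison of (epoch, end offset) pairs across the replicas the controller managed to reach in time; the data shape below is my own illustration and the exact tie-breaking rules are defined in the KIP.

```python
def pick_best_replica(log_states: dict) -> str:
    """Choose the replica whose log reaches furthest: highest leader epoch first,
    then highest log end offset. log_states maps replica id -> (epoch, end_offset)."""
    return max(log_states, key=lambda r: log_states[r])

if __name__ == "__main__":
    # The replicas the controller could query within the time window.
    reachable = {
        "R1": (5, 1200),  # older epoch
        "R2": (6, 1150),  # newest epoch wins even with a smaller offset
        "R3": (6, 1100),
    }
    print(pick_best_replica(reachable))  # R2
```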

Manual mode

This mode simply has the controller not attempt any unclean recovery operation when the ISR and ELR are both empty. At this point a human operator would need to intervene.

KIP-966 conclusions

KIP-966 brings a definite durability improvement to Kafka - bringing behavior more in line with expectations by closing the Last Replica Standing issue. With the ELR and balanced recovery combined, a partition would have to experience Min ISR unclean shutdowns such that all of them lose local data and some portion of that data is committed.

We have verified the design in TLA+ which you can find in my kafka-tlaplus GitHub repository though eventually it should get incorporated into the Kafka repo itself.

Once agreed on by the community, this KIP will be implemented in both Apache Kafka and the Kora engine (Confluent’s cloud-native Kafka engine). Note that it will only be applied to KRaft mode as we are in the process of phasing out Apache ZooKeeper. For those who require very high durability it is important to deploy Kafka clusters across failure domains such as Availability Zones to prevent correlated failures causing all broker VMs to simultaneously error or lose power. This is a best practice for highly available and durable distributed systems in any case and is how Confluent deploys the Kora engine to meet the 99.99% Confluent Cloud uptime SLA.

Being a distributed system protocol, there are a few extra nuances and I recommend you read the KIP itself (which is more detailed than this post) and get involved in the discussion on the mailing list. This proposal is still a proposal and subject to change. I will try to keep this post up to date with any relevant changes.

Finally, I am working on a complete Kafka replication protocol description, based on KIP-966. This will be coming very soon.