How to Lose Messages on a Kafka Cluster - Part 2

More failure scenarios! See part 1 for the first seven scenarios.

Scenario 8 - Unclean Partition Fail-Over with Acks=1

In this scenario we’ll simulate first one, then both followers of a three-replica partition falling out-of-sync, followed by a fail-over.

We create a topic with unclean.leader.election.enable=true. Then we’ll use Blockade to slow the network down for one or both followers, causing them to lag so far behind that they get removed from the ISR. We’ll then kill the leader and watch the fail-over. When only one follower is out-of-sync we’ll see a fail-over to the in-sync follower; when both are out-of-sync we’ll see an unclean fail-over. In the last part we saw that fail-over to in-sync followers for acks=1 messages incurred little or no message loss. When all the followers are out-of-sync we should see much greater message loss.

I have added some extra properties to the network element of the blockade.yml:

network:
  slow: 250ms 50ms 25% distribution normal

This adds roughly 250ms of delay to all outgoing packets, with a jitter of ±50ms drawn from a normal distribution, and each packet’s delay is 25% correlated with the delay of the packet before it. These delays get activated on my containers of choice using the Blockade slow command.
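
To get a feel for what that setting does to packet timing, here is a minimal Python sketch. It is a simplified model of my own, not the algorithm the kernel actually uses: each packet gets a fresh normally distributed delay that is then blended with the previous packet's delay. The function name and parameters are hypothetical.

```python
import random


def sample_delays(n, mean_ms=250.0, jitter_ms=50.0, correlation=0.25, seed=None):
    """Return n simulated per-packet delays in milliseconds.

    Simplified model (an assumption, not the real traffic-control code):
    a fresh value is drawn from a normal distribution centred on mean_ms
    with standard deviation jitter_ms, then blended with the previous
    packet's delay according to `correlation`.
    """
    rng = random.Random(seed)
    delays = []
    previous = mean_ms
    for _ in range(n):
        fresh = rng.gauss(mean_ms, jitter_ms)
        delay = correlation * previous + (1.0 - correlation) * fresh
        delay = max(delay, 0.0)  # a delay can never be negative
        delays.append(delay)
        previous = delay
    return delays


if __name__ == "__main__":
    for d in sample_delays(5, seed=42):
        print("{:.1f} ms".format(d))
```

The point is only that delays hover around 250ms and that consecutive packets tend to be delayed by similar amounts.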

I create the cluster and the topic.

(terminal 1)
$ blockade up
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION
kafka1          c00633b3c1eb    UP      172.17.0.3      NORMAL
kafka2          525dc216f128    UP      172.17.0.4      NORMAL
kafka3          ccd622ea2142    UP      172.17.0.5      NORMAL
zk1             b7abcafb4f24    UP      172.17.0.2      NORMAL
$ bash update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
$ bash create-topic-unclean-failover.sh kafka1 test1
Created topic "test1".
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
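
The contents of create-topic-unclean-failover.sh are not shown here, so the following is only a sketch of the kind of command such a script might run. It assumes the stock kafka-topics.sh tool of a ZooKeeper-based Kafka version and a ZooKeeper address of zk1:2181; both the address and the helper name are my assumptions.

```python
def build_create_topic_cmd(topic, zookeeper="zk1:2181", partitions=1,
                           replication_factor=3, unclean_election=True):
    """Build the argument list for creating a topic with kafka-topics.sh.

    The zookeeper address is an assumed value for this cluster. Newer
    Kafka versions take --bootstrap-server instead of --zookeeper.
    """
    return [
        "kafka-topics.sh", "--create",
        "--zookeeper", zookeeper,
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        # Topic-level override that permits out-of-sync replicas to lead.
        "--config",
        "unclean.leader.election.enable={}".format(str(unclean_election).lower()),
    ]


if __name__ == "__main__":
    print(" ".join(build_create_topic_cmd("test1")))
```

Passing unclean_election=False would give the safer setting used in scenario 10.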

Clean Fail-Over

In our first test we’ll just slow the network for kafka1, which hosts one of the followers. We’ll demonstrate that if there is still a follower in the ISR, the cluster will preferentially fail over to it.

(terminal 1)
$ blockade slow kafka1
$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION
kafka1          c00633b3c1eb    UP      172.17.0.3      SLOW
kafka2          525dc216f128    UP      172.17.0.4      NORMAL
kafka3          ccd622ea2142    UP      172.17.0.5      NORMAL
zk1             b7abcafb4f24    UP      172.17.0.2      NORMAL

In another shell I start a bash script that prints out the ISR of the topic every second.

(terminal 3)
$ bash monitor-topic.sh kafka3 19094 test1
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3,1
...
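
If you would rather detect the ISR change programmatically than watch the terminal, a small parser for the partition line is enough. This is a sketch that assumes the line layout shown in the output above; the function names are hypothetical.

```python
import re

# Matches the per-partition line, not the "PartitionCount" summary line.
_PARTITION_LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)


def _to_ids(csv):
    """Turn '2,3,1' into [2, 3, 1]."""
    return [int(part) for part in csv.split(",") if part]


def parse_partition_line(line):
    """Return a dict describing one partition line, or None if it does not match."""
    match = _PARTITION_LINE.search(line)
    if match is None:
        return None
    return {
        "topic": match.group("topic"),
        "partition": int(match.group("partition")),
        "leader": int(match.group("leader")),
        "replicas": _to_ids(match.group("replicas")),
        "isr": _to_ids(match.group("isr")),
    }


def out_of_sync(info):
    """Replicas that are assigned to the partition but missing from the ISR."""
    return [broker for broker in info["replicas"] if broker not in info["isr"]]
```

Feeding each monitor line through parse_partition_line and checking out_of_sync tells you the moment a follower has dropped out.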

Now we start our producer-silent.py and send 500000 messages over a short period. The producer-silent.py script only prints out the final result (part 1 showed the output during a fail-over in great detail, so we’ll keep the output of part 2 a little more concise). Having a higher number of messages than in part 1 gives the slow node time to get out-of-sync.

(terminal 2)
$ python producer-silent.py 500000 0.0001 1 test1
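
The script itself is not reproduced here, but the three numbers it prints can be tracked with a small counter driven by delivery callbacks. The sketch below is my own illustration, not the actual script: it assumes a client whose delivery callback receives (err, msg), as the confluent_kafka Producer does, and the class name is hypothetical.

```python
class DeliveryCounter:
    """Counts sent, delivered and failed messages for a final summary."""

    def __init__(self):
        self.sent = 0
        self.delivered = 0
        self.failed = 0

    def record_send(self):
        """Call once for every message handed to the producer."""
        self.sent += 1

    def on_delivery(self, err, msg):
        """Delivery callback: err is None when the broker acknowledged."""
        if err is None:
            self.delivered += 1
        else:
            self.failed += 1

    def summary(self):
        return "Sent: {}\nDelivered: {}\nFailed: {}".format(
            self.sent, self.delivered, self.failed)


# Assumed wiring with confluent_kafka (not executed here):
#   producer = Producer({"bootstrap.servers": "...", "acks": "1"})
#   producer.produce("test1", value=b"...", callback=counter.on_delivery)
#   counter.record_send()
#   producer.poll(0)
#   ...
#   producer.flush()
#   print(counter.summary())
```

"Delivered" here means the producer received an acknowledgement, which is exactly the number we later compare against the high watermark.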

The ISR monitor script will tell me when kafka1 has been removed from the ISR. At that point I kill kafka2 (the leader).

(terminal 3)
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,3,1    Isr: 2,3

Now we kill the leader.

$ blockade kill kafka2

We see that the leadership failed-over to kafka3, which was in the ISR.

(terminal 3)
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 3    Replicas: 2,3,1   Isr: 3

The python producer-silent.py script completes with the output:

(terminal 2)
Sent: 500000
Delivered: 494944
Failed: 5056

Now we print out the high watermark.

$ bash print-hw.sh kafka1 19092 test1
test1:0:494939

The fail-over was a “clean” fail-over and we see similar results to our clean fail-over in part 1: just 5 acknowledged messages lost (494944 delivered versus a high watermark of 494939).

Unclean Fail-Over

Now we’ll repeat the entire process with one difference: we slow down both followers and wait for both to be removed from the ISR before killing the leader.

In the new cluster, kafka2 is again the leader of partition 0. You can see from the monitor-topic.sh script how the ISR shrinks to the leader and then the fail-over occurs.

(terminal 3)
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2,1,3
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 2    Replicas: 2,1,3    Isr: 2
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 1    Replicas: 2,1,3    Isr: 1
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 1    Replicas: 2,1,3    Isr: 1
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 1    Replicas: 2,1,3    Isr: 1
Topic:test1    PartitionCount:1    ReplicationFactor:3    Configs:unclean.leader.election.enable=true
    Topic: test1    Partition: 0    Leader: 1    Replicas: 2,1,3    Isr: 1,3
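
The difference between the two runs comes down to whether the new leader was in the ISR just before the old leader died. A minimal sketch of that rule, using (leader, isr) snapshots taken from the monitor output; the function name and return labels are my own.

```python
def classify_failover(before, after):
    """Classify a leadership change from two (leader, isr) snapshots.

    before: snapshot taken just before the old leader was killed
    after:  snapshot taken once a new leader is reported
    """
    old_leader, old_isr = before
    new_leader, _ = after
    if new_leader == old_leader:
        return "no fail-over"
    if new_leader == -1:
        return "offline"  # no replica was eligible to take over
    in_sync_candidates = set(old_isr) - {old_leader}
    return "clean" if new_leader in in_sync_candidates else "unclean"


if __name__ == "__main__":
    # First run: kafka3 was still in the ISR when kafka2 died.
    print(classify_failover((2, [2, 3]), (3, [3])))
    # Second run: the ISR had shrunk to the leader alone.
    print(classify_failover((2, [2]), (1, [1])))
```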

The producer-silent.py outputs:

Sent: 500000
Delivered: 495103
Failed: 4897

The high watermark is:

$ bash print-hw.sh kafka3 19094 test1
test1:0:350728

We received 495103 acknowledgements but only 350728 messages were persisted, meaning we lost 144375 acknowledged messages. This was only possible because we created the topic with the unclean.leader.election.enable=true topic config. Without it, on killing the leader the partition would have been unavailable until the leader came back.
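
The loss figure is simply the number of acknowledgements minus the high watermark. A tiny sketch of that arithmetic, assuming the topic started empty so that the high watermark equals the number of persisted messages; the function name is hypothetical.

```python
def lost_acknowledged(delivered, high_watermark):
    """Acknowledged messages that are missing from the partition.

    Assumes the partition was empty before the test run, so the high
    watermark is the count of messages that survived.
    """
    return max(delivered - high_watermark, 0)


if __name__ == "__main__":
    print(lost_acknowledged(494944, 494939))  # clean fail-over
    print(lost_acknowledged(495103, 350728))  # unclean fail-over
```

For the two runs above this gives 5 and 144375 respectively.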

If we had not allowed unclean leader election, the next big question is: what happens if my lost leader cannot be recovered? With RabbitMQ we can also prevent fail-over to an unsynchronized mirror, and with RabbitMQ, if the master’s data cannot be recovered, we lose the entire queue! Is Kafka any better? Well, currently the answer is: not really.

If you bring back the node that hosted the leader partition, but without its data, it can cause the entire cluster to crash! There is an open ticket regarding the issue (KAFKA-3955). I encountered this bug myself while testing out this functionality. See scenario 10.

Scenario 9 - Unclean Partition Fail-Over with Acks=all

You might be thinking that acks=all makes for fool-proof durability, especially after looking at part 1 where using acks=all avoided all data loss scenarios. Well, now we’re going to see the limit of acks=all.

We rerun scenario 8 with two changes:

  • we’ll use acks=all instead.

  • the network delay will be increased to 10 seconds (else the followers don’t go slow enough to be removed from the ISR).

For brevity I’ll just show the output of the producer-silent.py script.

Sent: 500000
Delivered: 495776
Failed: 4224

$ bash print-hw.sh kafka1 19092 test1
test1:0:301252

We lost 194524 messages even with acks=all! But why? Because when the ISR shrank to just the leader, acknowledgements continued to be sent to the producer. Even when the ISR consists of just one node, the leader, acks=all messages will be acknowledged.

We could have avoided data loss in two ways:

  • Use the default unclean.leader.election.enable=false. This would have made the partition unavailable until the broker (and its data) could come back. But we’ve seen that the whole cluster can go down if the lost broker loses all its data (mentioned in scenario 8 and covered in scenario 10).

  • Or we could prevent the partition getting into the situation where it has no redundancy by setting min.insync.replicas to 2 or more. This refuses acks=all writes when the ISR consists of a single node. The partition will be unavailable for writes but we get better redundancy.
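
The interaction between acks=all and min.insync.replicas can be summed up in a few lines. This is a simplified model of the rule, not broker code, and the function name is my own.

```python
def leader_accepts_write(acks, isr_size, min_insync_replicas=1):
    """Model of whether the leader accepts a produce request.

    min.insync.replicas is only enforced for acks=all; with the default
    value of 1, an ISR containing only the leader is enough.
    """
    if acks == "all":
        return isr_size >= min_insync_replicas
    # acks=0 and acks=1 are not checked against min.insync.replicas.
    return True


if __name__ == "__main__":
    # Scenario 9: ISR shrank to the leader, default min.insync.replicas.
    print(leader_accepts_write("all", isr_size=1))
    # Same situation with min.insync.replicas=2: the write is refused.
    print(leader_accepts_write("all", isr_size=1, min_insync_replicas=2))
```

With the default of 1, the lone leader keeps acknowledging acks=all writes, which is exactly how the loss in this scenario happened.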

Scenario 10 - Exiting because log truncation is not allowed

The steps are the same as scenario 8 except:

  • we create a topic with the default setting of false for unclean.leader.election.enable

  • uncomment the “volumes” property in the blockade.yml so that the Kafka data is written to your Linux host.

Once the leader is the only replica in the ISR, kill that node as in scenario 8. But this time the monitor script will show the leader replica as -1.

Now delete all the contents of /volumes/kafka/03/data (this is the path if kafka3 is the node you killed). Then start up the node you killed:

$ bash start-node.sh kafka3

Now watch the other two nodes shut down. If you inspect the logs you’ll see the error detailed in the JIRA ticket linked to above.

$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION
kafka1          9247ec85b957    DOWN                    UNKNOWN
kafka2          84e908777f84    DOWN                    UNKNOWN
kafka3          263e940652c6    UP      172.17.0.5      NORMAL
zk1             86421d8612fa    UP      172.17.0.2      NORMAL

In the logs of kafka2:

[2018-09-19 21:31:36,936] ERROR [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 29413 (kafka.server.ReplicaFetcherThread)

In the logs of kafka1:

[2018-09-19 21:31:36,814] ERROR [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 51540 (kafka.server.ReplicaFetcherThread)


This is a pretty serious bug, as a problem with a single partition can take down a cluster. It is a good reason to use min.insync.replicas=2 so that you never end up in a situation like this.


Scenario 11 - Slow and Flaky Network (No message loss)

Just like with RabbitMQ I introduce 250ms of delay to all outgoing packets for all nodes in the cluster.

$ blockade slow --all

Then send 100000 messages.

$ python producer-silent.py 100000 0.0001 1 test1
Sent: 100000
Delivered: 100000
Failed: 0

No errors reported by the client.

$ bash print-hw.sh kafka1 19092 test1
test1:0:100000

No messages lost.

Next we speed up the network and instead introduce flakiness of 2% packet loss.

$ blockade flaky --all

Send 100000 messages again.

$ python producer-silent.py 100000 0.0001 1 test1
Sent: 100000
Delivered: 100000
Failed: 0

No errors reported.

$ bash print-hw.sh kafka1 19092 test1
test1:0:200000

Conclusions

I was surprised to encounter the “exiting because log truncation is not allowed” error. It just goes to show that Kafka, despite being a very mature project, is still encountering bugs that affect stability. You can follow this bug here: https://issues.apache.org/jira/browse/KAFKA-3955.

In general we have seen that publishing messages with acks=all is the remedy to most message loss scenarios, but it also has limits. When a partition has an ISR consisting of only the leader, it only takes one node failure to produce data loss. For those wanting to avoid that scenario there’s min.insync.replicas=2 (or more, depending on your replication factor). Never letting a partition run without redundancy is the best antidote to lossy fail-overs.

If you know of more message loss scenarios feel free to email me or message @vanlightly on Twitter. I suspect the next step would be to start trawling the open Jira issues, but I have other projects I want to spend time on, like Rebalanser and looking at Apache Pulsar.