More failure scenarios! See part 1 for the first seven scenarios.
Scenario 8 - Unclean Partition Fail-Over with Acks=1
In this scenario we'll simulate one, and then both, followers of a 3-replica partition getting out-of-sync, followed by a fail-over.
We create a topic with unclean.leader.election.enable=true. Then we'll use Blockade to slow the network down for one or both followers, causing them to lag so far behind that they get removed from the ISR. We'll then kill the leader and watch the fail-over. When only one follower is out-of-sync we'll see a fail-over to the in-sync follower; when both are out-of-sync we'll see an unclean fail-over. In part 1 we saw that fail-over to in-sync followers with acks=1 messages incurred little or no message loss. When all the followers are out-of-sync we should see much greater message loss.
I have added some extra properties to the network element of the blockade.yml:
network:
  slow: 250ms 50ms 25% distribution normal
This adds 250 ms of extra delay to all outgoing packets, varying by ±50 ms, with each packet's delay correlated 25% with the delay of the previous packet. These delays get activated on the containers of my choice using Blockade's slow command.
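Under the hood, Blockade applies these settings with the Linux tc netem tool. A rough sketch of the equivalent command run inside an affected container (the interface name eth0 is an assumption) would be:

# Approximate tc netem equivalent of Blockade's slow command
# (the interface name is an assumption)
tc qdisc add dev eth0 root netem delay 250ms 50ms 25% distribution normal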
I create the cluster and the topic.
(terminal 1)
$ blockade up
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 c00633b3c1eb UP 172.17.0.3 NORMAL
kafka2 525dc216f128 UP 172.17.0.4 NORMAL
kafka3 ccd622ea2142 UP 172.17.0.5 NORMAL
zk1 b7abcafb4f24 UP 172.17.0.2 NORMAL
$ bash update-hosts.sh
Acquiring container ids and ip addresses
Updating hosts of kafka1
Updating hosts of kafka2
Updating hosts of kafka3
$ bash create-topic-unclean-failover.sh kafka1 test1
Created topic "test1".
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
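The script itself isn't reproduced in the post; a minimal sketch of what create-topic-unclean-failover.sh might do, assuming it wraps the standard kafka-topics.sh tool and the zk1 ZooKeeper node (the argument handling is also an assumption):

#!/bin/bash
# Sketch of create-topic-unclean-failover.sh. $1 = a broker/container name, $2 = topic.
TOPIC=$2

kafka-topics.sh --zookeeper zk1:2181 --create --topic "$TOPIC" \
  --replication-factor 3 --partitions 1 \
  --config unclean.leader.election.enable=true

kafka-topics.sh --zookeeper zk1:2181 --describe --topic "$TOPIC"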
Clean Fail-Over
In our first test we'll slow the network just for kafka1, which hosts one of the followers. We'll demonstrate that if there is still a follower in the ISR, the cluster will preferentially fail over to it.
(terminal 1)
$ blockade slow kafka1
$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 c00633b3c1eb UP 172.17.0.3 SLOW
kafka2 525dc216f128 UP 172.17.0.4 NORMAL
kafka3 ccd622ea2142 UP 172.17.0.5 NORMAL
zk1 b7abcafb4f24 UP 172.17.0.2 NORMAL
In another shell I start a bash script that prints out the ISR of the topic every second.
(terminal 3)
$ bash monitor-topic.sh kafka3 19094 test1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
...
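The monitoring script is also not reproduced in the post; a minimal sketch of monitor-topic.sh, assuming it simply describes the topic in a loop (the real script may use its broker and port arguments differently):

#!/bin/bash
# Sketch of monitor-topic.sh: describe the topic once per second so we can
# watch the ISR shrink and the leader change. The ZooKeeper address is an assumption.
TOPIC=$3

while true; do
  kafka-topics.sh --zookeeper zk1:2181 --describe --topic "$TOPIC"
  sleep 1
done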
Now we start producer-silent.py and send 500000 messages over a short period. producer-silent.py only prints the final result (part 1 showed the output during a fail-over in great detail, so we'll keep the output in part 2 a little more concise). Sending a higher number of messages than in part 1 gives the slow node time to get out-of-sync.
(terminal 2)
$ python producer-silent.py 500000 0.0001 1 test1
The ISR monitor script will tell me when kafka1 has been removed from the ISR. At that point I kill kafka2 (the leader).
(terminal 3)
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3
Now we kill the leader.
$ blockade kill kafka2
We see that leadership failed over to kafka3, which was in the ISR.
(terminal 3)
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 3 Replicas: 2,3,1 Isr: 3
The python producer-silent.py script completes with the output:
(terminal 2)
Sent: 500000
Delivered: 494944
Failed: 5056
Now we print out the high watermark.
$ bash print-hw.sh kafka1 19092 test1
test1:0:494939
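print-hw.sh isn't shown either; given the topic:partition:offset output format, a plausible sketch is a wrapper around the standard GetOffsetShell tool (the exact script contents are an assumption):

#!/bin/bash
# Sketch of print-hw.sh: print the latest offset of each partition of the topic,
# in topic:partition:offset format.
BROKER=$1   # e.g. kafka1
PORT=$2     # e.g. 19092
TOPIC=$3

kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list "$BROKER:$PORT" --topic "$TOPIC" --time -1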
The fail-over was a "clean" fail-over and we see results similar to our clean fail-over in part 1: 494944 acknowledged messages against a high watermark of 494939, so just 5 messages lost.
Unclean Fail-Over
Now we'll repeat the entire process, but this time slowing down both followers. The only difference from the commands above is that we slow down both followers and wait for both to be removed from the ISR before killing the leader.
In the new cluster, kafka2 is again the leader of partition 0. You can see from the monitor-topic.sh script how the ISR shrinks to the leader and then the fail-over occurs.
(terminal 3)
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,1,3 Isr: 2
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1
Topic:test1 PartitionCount:1 ReplicationFactor:3 Configs:unclean.leader.election.enable=true
Topic: test1 Partition: 0 Leader: 1 Replicas: 2,1,3 Isr: 1,3
The producer-silent.py outputs:
Sent: 500000
Delivered: 495103
Failed: 4897
The high watermark is:
$ bash print-hw.sh kafka3 19094 test1
test1:0:350728
We received 495103 acknowledgements but only 350728 messages were persisted, meaning we lost 144375 acknowledged messages. This was only possible because we created the topic with the unclean.leader.election.enable=true topic config. Without it, on killing the leader, the topic would have been unavailable until the leader came back.
If we had not allowed unclean leader election, the next big question is: what happens if my lost leader cannot be recovered? With RabbitMQ we can also prevent fail-over to an unsynchronized mirror, and with RabbitMQ, if the data of the master cannot be recovered, we lose the entire queue! Is Kafka any better? Well, currently the answer is: not really.
If you bring back the node that hosted the leader partition, but without its data, it can cause the entire cluster to crash! There is an open JIRA ticket regarding the issue (linked in the conclusions). I encountered this bug myself while testing out this functionality. See scenario 10.
Scenario 9 - Unclean Partition Fail-Over with Acks=all
You might be thinking that acks=all makes for fool-proof durability, especially after looking at part 1 where using acks=all avoided all data loss scenarios. Well, now we’re going to see the limit of acks=all.
We rerun scenario 8 with two changes:
we'll use acks=all instead (see the example invocation after this list)
the network delay will be increased to 10 seconds (otherwise the followers don't fall far enough behind to be removed from the ISR)
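Assuming the third argument of producer-silent.py is the acks setting (as the earlier acks=1 runs suggest), the producer would be started with something like:

$ python producer-silent.py 500000 0.0001 all test1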
For brevity I’ll just show the output of the producer-silent.py script.
Sent: 500000
Delivered: 495776
Failed: 4224
$ bash print-hw.sh kafka1 19092 test1
test1:0:301252
We lost 194524 acknowledged messages even with acks=all! But why? Because once the ISR had shrunk to just the leader, acknowledgements continued to be sent to the producer. Even when the ISR consists of just one node, the leader, acks=all messages are still acknowledged.
We could have avoided data loss in two ways:
Use the default unclean.leader.election.enable=false. This would have made the partition unavailable until the broker (and its data) could come back. But we've seen that the whole cluster can go down if the lost broker loses all its data (mentioned in scenario 8 and covered in scenario 10).
Or we could prevent the partition from getting into a situation where it has no redundancy in the first place, by setting min.insync.replicas to 2 or more. With acks=all, writes are then refused once the ISR consists of a single node. The partition becomes unavailable for writes, but we keep our redundancy (see the sketch after this list).
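As a sketch, the topic for the second option could be created like this (assuming the same kafka-topics.sh tooling as the other scripts); with acks=all the producer then receives "not enough replicas" errors instead of false acknowledgements once the ISR drops below 2:

kafka-topics.sh --zookeeper zk1:2181 --create --topic test1 \
  --replication-factor 3 --partitions 1 \
  --config min.insync.replicas=2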
Scenario 10 - Exiting because log truncation is not allowed
The steps are the same as scenario 8 except:
we create a topic with the default setting of false for unclean.leader.election.enable
uncomment the "volumes" property in the blockade.yml so that the Kafka data is written to your Linux host
Once the leader is the only replica in the ISR, kill that node as in scenario 8. This time the monitor script will show the leader as -1.
Now delete all the contents of /volumes/kafka/03/data (that path corresponds to kafka3; use the path of whichever node you killed). Then start up the node you killed:
$ bash start-node.sh kafka3
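start-node.sh is likewise not shown in the post; it may be as simple as a wrapper around Blockade's start command (an assumption):

#!/bin/bash
# Sketch of start-node.sh: restart a previously killed container.
blockade start $1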
Now watch the other two nodes shut down. If you inspect the logs you'll see the error detailed in the JIRA ticket linked in the conclusions.
$ blockade status
NODE CONTAINER ID STATUS IP NETWORK PARTITION
kafka1 9247ec85b957 DOWN UNKNOWN
kafka2 84e908777f84 DOWN UNKNOWN
kafka3 263e940652c6 UP 172.17.0.5 NORMAL
zk1 86421d8612fa UP 172.17.0.2 NORMAL
In the logs of kafka2:
[2018-09-19 21:31:36,936] ERROR [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 29413 (kafka.server.ReplicaFetcherThread)
In the logs of kafka1:
[2018-09-19 21:31:36,814] ERROR [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] Exiting because log truncation is not allowed for partition test1-0, current leader's latest offset 0 is less than replica's latest offset 51540 (kafka.server.ReplicaFetcherThread)
This is a pretty serious bug, as a problem with a single partition can take down a whole cluster. It is a good reason to use min.insync.replicas=2, so that you never end up in a situation like this in the first place.
Scenario 11 - Slow and Flaky Network (No message loss)
Just like with RabbitMQ I introduce 250ms of delay to all outgoing packets for all nodes in the cluster.
$ blockade slow --all
Then send 100000 messages.
$ python producer-silent.py 100000 0.0001 1 test1
Sent: 100000
Delivered: 100000
Failed: 0
No errors reported by the client.
$ bash print-hw.sh kafka1 19092 test1
test1:0:100000
No messages lost.
Next we speed up the network and instead introduce flakiness of 2% packet loss.
$ blockade flaky --all
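As with the slow command, Blockade applies this with tc netem; a rough equivalent inside each container (the interface name is again an assumption):

# Approximate tc netem equivalent of Blockade's flaky command at 2% packet loss
tc qdisc add dev eth0 root netem loss 2%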
Send 100000 messages again.
$ python producer-silent.py 100000 0.0001 1 test1
Sent: 100000
Delivered: 100000
Failed: 0
No errors reported.
$ bash print-hw.sh kafka1 19092 test1
test1:0:200000
No messages lost; the high watermark now covers both runs of 100000 messages.
Conclusions
I was surprised to encounter the “exiting because log truncation is not allowed” error. It just goes to show that Kafka, despite being a very mature project, is still encountering bugs that affect stability. You can follow this bug here: https://issues.apache.org/jira/browse/KAFKA-3955.
In general we have seen that publishing messages with acks=all is the remedy to most message loss scenarios, but it too has limits. When a partition's ISR consists of only the leader, it takes just one node failure to produce data loss. For those wanting to avoid that scenario there's min.insync.replicas=2 (or more, depending on your replication factor). Reducing the risk of losing redundancy in the first place is the best antidote to lossy fail-overs.
If you know of more message loss scenarios feel free to email me or message @vanlightly on Twitter. I suspect the next step would be to start trawling the open Jira issues, but I have other projects I want to spend time on, like Rebalanser and looking at Apache Pulsar.