How to Lose Messages on a RabbitMQ Cluster

RabbitMQ chaos testing with Blockade, Docker, Python and some Bash

In my RabbitMQ vs Kafka series Part 5 post I covered the theory of RabbitMQ clustering and some of its gotchas. In this post we'll demonstrate the message loss scenarios described there using Docker and Blockade. I recommend you read that post first, as this one assumes an understanding of the topics it covers.

Blockade is a really easy way to test how distributed systems cope with network partitions, flaky networks and slow networks. It was inspired by the Jepsen series. In this post we'll kill nodes, partition the cluster, introduce packet loss and slow down the network. With Blockade and some Bash and Python scripts we'll work through a series of failure scenarios.

See the Blockade documentation regarding installation. I am running this code on Fedora 28 with the latest versions of Docker CE and RabbitMQ as of the date of this post, using the Pika Python library version 0.12 and Python 3.6.6.

Cluster Creation

Check out the blockade.yml in the GitHub repo. One limitation is that Blockade uses the "links" attribute to provide visibility between containers, and links cannot contain circular references, so we can't have each node see every other node. Instead I have set up nodes 2 and 3 to be linked to node 1. Then, once the cluster is up, I update the hosts file of each container via a shell script to give every node the visibility it needs.
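The hosts update can be sketched as follows. This is a hypothetical Python reconstruction of what update-hosts.sh does (the real script in the repo is Bash), with the node names taken from the blockade.yml:

```python
# Hypothetical sketch of update-hosts.sh: resolve each container's IP, then
# append name -> IP entries to every container's /etc/hosts so that all
# three nodes can reach each other by name.
import subprocess

NODES = ["rabbitmq1", "rabbitmq2", "rabbitmq3"]

def hosts_entries(ips):
    """Build the /etc/hosts lines for a {name: ip} mapping."""
    return [f"{ip} {name}" for name, ip in sorted(ips.items())]

def container_ip(name):
    out = subprocess.check_output(
        ["docker", "inspect", "-f",
         "{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}", name])
    return out.decode().strip()

def update_all_hosts():
    ips = {name: container_ip(name) for name in NODES}
    entries = "\n".join(hosts_entries(ips))
    for name in NODES:
        # Append the entries inside the container (requires a shell in the image).
        subprocess.check_call(
            ["docker", "exec", name, "sh", "-c",
             f"printf '%s\\n' '{entries}' >> /etc/hosts"])
```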

$ blockade up
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION
rabbitmq1       00f983c8f4a7    UP      172.17.0.2      NORMAL
rabbitmq2       285363a0b042    UP      172.17.0.3      NORMAL
rabbitmq3       57ed8b61bea3    UP      172.17.0.4      NORMAL

$ bash update-hosts.sh 
Acquiring container ids and ip addresses
Updating hosts of rabbitmq1
Updating hosts of rabbitmq2
Updating hosts of rabbitmq3

I have created a reset-cluster.sh script that executes those two commands; I run it between scenarios to create a fresh cluster.

In each scenario I create a queue with an ha-mode=all policy, plus any additional settings the scenario calls for. The queue master is always located on broker 3. In the example below I connect to broker 1 to create a queue "test1" whose master is on broker 3.

$ python create-ha-queue.py rabbitmq1 test1 rabbit@rabbitmq3
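The script itself isn't shown in the post; here is a hedged sketch of what it plausibly does via the RabbitMQ management HTTP API. Port 15672, guest/guest credentials, the default vhost (%2F) and the function names are all assumptions:

```python
# Hedged sketch of create-ha-queue.py: declare a durable queue pinned to the
# chosen master node, then add a mirror-everything policy. Each PUT should
# return 201 Created, matching the two 201 responses shown below.
import base64
import json
import urllib.request

def policy_body(pattern, extra=None):
    """Build the JSON body for an HA policy matching `pattern`."""
    definition = {"ha-mode": "all"}
    definition.update(extra or {})
    return {"pattern": pattern, "apply-to": "queues", "definition": definition}

def put(url, body, user="guest", password="guest"):
    req = urllib.request.Request(url, data=json.dumps(body).encode(),
                                 method="PUT")
    req.add_header("Content-Type", "application/json")
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req.add_header("Authorization", f"Basic {token}")
    return urllib.request.urlopen(req)   # expect 201 Created

def create_ha_queue(host, queue, master_node):
    api = f"http://{host}:15672/api"
    put(f"{api}/queues/%2F/{queue}",
        {"durable": True, "node": master_node})   # pin the queue master
    put(f"{api}/policies/%2F/{queue}-ha", policy_body(f"^{queue}$"))
```

Per-scenario settings such as ha-promote-on-shutdown=always would go into the `extra` dict.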

In each scenario we will send a load of messages and, for every 10000 messages sent, print how many got a success or failure response. When fire-and-forget is used, a success is any publish that did not raise an exception.
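The tallying described above can be sketched like this; the `publish` callable stands in for Pika's `basic_publish` so the counting logic is visible on its own (names are illustrative, not the repo's):

```python
# Minimal sketch of fire-and-forget counting: a publish counts as a success
# if it raises no exception, and a running tally is printed every
# `report_every` messages.
def send_fire_and_forget(publish, total, report_every=10000):
    success = failed = 0
    for i in range(total):
        try:
            publish(f"message {i}")
            success += 1          # no exception == "success" in this mode
        except Exception:
            failed += 1
        if (i + 1) % report_every == 0:
            print(f"Success: {success} Failed: {failed}")
    return success, failed
```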

No Retries

None of the scenarios below use retries. That means the final count of messages in the queue will include no duplicates, and in some cases the number of positively acknowledged messages will be lower than the send count, as we never resend a message.

Scenario 1 - Fire-and-forget with a queue master fail-over due to a failed node

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always

In this scenario we send 100000 messages without publisher confirms to a mirrored queue with ha-mode=all. Messages are sent to broker 1 and during transmission I kill broker 3, the node hosting the queue master, triggering a fail-over.

First I create a queue and the HA policy (corresponding to the two 201 responses). Then I initiate the sending of messages.

$ python create-ha-queue.py rabbitmq1 test1 rabbit@rabbitmq3
<Response [201]>
<Response [201]>

$ python fire-and-forget.py rabbitmq1 100000 test1
Connected to 172.17.0.2
Success: 10000 Failed: 0
Success: 20000 Failed: 0
...

Meanwhile, I kill -9 the rabbitmq-server process inside the container.

$ bash kill-node.sh rabbitmq3
Killing 657

The fire-and-forget.py script continues to send messages to broker 1.

$ python fire-and-forget.py rabbitmq1 100000 test1
Connected to 172.17.0.2
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent 100000 messages
100000 success responses
99838 in the queue
168 messages lost

168 messages were lost in the fail-over. We saw no connection error because we were connected to broker 1, which was routing the messages to the master on broker 3. When broker 3 went down, the queue failed over to the mirror on broker 1, and the messages in transit at that moment were lost.

Scenario 2 - Fire-and-forget with a connection failure

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always

This time we'll connect to broker 2 while the master remains on broker 3. We'll kill broker 2 during message transmission; this should result in a connection failure only, with no fail-over.

$ python fire-and-forget.py rabbitmq2 100000 test1
Connected to 172.17.0.3
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Connection closed.
Connected to 172.17.0.4
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Sent 99999 messages
84074 messages in the queue
15925 messages lost

Massive data loss. The publisher kept writing messages to the socket before the TCP connection failure could be detected, and every message buffered but not yet processed by the broker was lost.

Scenario 3 - Publisher confirms with queue fail-over due to a failed node (no message loss)

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always

In this scenario we send 100000 messages while connected to broker 2. The queue master is on broker 3. During message transmission we kill broker 3 causing a fail-over but no connection failure.

This time I’m using the Pika asynchronous publisher with publisher confirms selected. The publisher limits itself to 10000 messages in flight (unacknowledged messages).
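The in-flight limiting depends on correct confirm bookkeeping: RabbitMQ can acknowledge many messages at once via the `multiple` flag on a confirm, which covers every delivery tag up to and including the one given. A minimal sketch of that bookkeeping (the class and names are illustrative, not Pika's API):

```python
# Sketch of the confirm accounting an async publisher needs: track
# unacknowledged delivery tags, honour the `multiple` flag, and stop
# publishing once the in-flight window is full.
class ConfirmWindow:
    def __init__(self, max_in_flight=10000):
        self.max_in_flight = max_in_flight
        self.pending = set()        # delivery tags awaiting an ack/nack
        self.acked = 0
        self.nacked = 0

    def can_publish(self):
        return len(self.pending) < self.max_in_flight

    def published(self, delivery_tag):
        self.pending.add(delivery_tag)

    def on_confirm(self, delivery_tag, multiple, ok=True):
        # multiple=True confirms every pending tag <= delivery_tag
        tags = ({t for t in self.pending if t <= delivery_tag}
                if multiple else {delivery_tag} & self.pending)
        self.pending -= tags
        if ok:
            self.acked += len(tags)
        else:
            self.nacked += len(tags)
```

Tags still pending when the connection dies are the "No Response" messages reported in the results below.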

$ python send-with-confirm.py 1 100000 test1
Attempting to connect to 172.17.0.3
Connection open
Success: 10000 Failed: 0
Success: 20002 Failed: 0
Success: 30003 Failed: 0
Success: 40001 Failed: 0
Success: 50000 Failed: 0
Success: 60001 Failed: 0
Success: 70000 Failed: 0
Success: 80008 Failed: 0
Success: 90002 Failed: 0
Waiting an extra 5 seconds for last acks. Pending: 100000 Current: 90009
Success: 100000 Failed: 0
Results -------------
Success: 100000
Failed: 0
No Response: 0
100000 messages in the queue
0 messages lost

We fail in our mission to lose messages. When using publisher confirms, RabbitMQ only acknowledges a message once the master and all mirrors have accepted it (and, for persistent messages, persisted it to disk). This means any acknowledged write has been replicated to the mirrors, so a fail-over loses no acknowledged messages.

Scenario 4 - Publisher confirms with connection failure only (no message loss)

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always

In this scenario we connect to broker 2 and start sending messages while the master is on broker 3. When the connection fails, the publisher attempts to reconnect and then continues sending messages.

Midway through sending I kill the node the publisher is connected to. The queue master is on another node, so no fail-over occurs.

$ python send-with-confirm.py 1 100000 test1
Attempting to connect to 172.17.0.3
Connection open
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Connection closed. Reason: error(104, 'Connection reset by peer')
Attempting to connect to 172.17.0.3
Attempting to connect to 172.17.0.4
Connection open
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Waiting an extra 5 seconds for last acks.
Results -----------
Success: 90000
Failed: 0
No Response: 10000
90000 messages in the queue
0 messages lost

We got 90000 acknowledgements and there are 90000 messages in the queue; 10000 messages received no ack at all. This is because I limited the publisher to 10000 unacknowledged messages at a time, and none of the messages in flight at the moment of the connection failure could be acknowledged.

Again we failed to produce message loss. Publisher confirms are doing their job.

Scenario 5 - Rolling Upgrade with Manual Synchronization and ha-promote-on-shutdown=always

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always, ha-promote-on-shutdown=always

In this scenario we create a mirrored queue with manual synchronization and ha-promote-on-shutdown=always. As usual, the master is on broker 3. We publish 10000 messages, then restart broker 1 (to apply some patches to the server). We publish another 10000 messages and restart broker 2 to apply the same patches, then publish a final 10000 messages. At this point the master has 30000 messages, broker 1's mirror has 20000 (everything published since its restart) and broker 2's mirror has 10000. Finally we restart broker 3, which hosts the queue master.

$ python send.py rabbitmq2 10000 test1
10000 messages sent
10000 messages in the queue
$ bash restart-node.sh rabbitmq1
rabbitmq1 restarted
$ python send.py rabbitmq2 10000 test1
10000 messages sent
20000 messages in the queue
$ bash restart-node.sh rabbitmq2
rabbitmq2 restarted
$ python send.py rabbitmq2 10000 test1
10000 messages sent
30000 messages in the queue
$ bash restart-node.sh rabbitmq3
rabbitmq3 restarted
$ python get-message-count.py test1
20000

At the end I call a Python script that retrieves the message count of the test1 queue. We have lost 10000 messages. When we restarted the docker container hosting the queue master, and because ha-promote-on-shutdown=always allows fail-over to an unsynchronized mirror, the queue failed over to the oldest mirror, which was on broker 1. Broker 1 was never synchronized after its restart and so was missing the first 10000 messages we sent at the start.
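The loss arithmetic can be replayed with a toy model, assuming a restarted mirror rejoins empty (manual sync, never run) and that ha-promote-on-shutdown=always promotes the longest-running mirror regardless of sync state:

```python
# Toy replay of the rolling upgrade above; not RabbitMQ internals, just the
# bookkeeping implied by the scenario.
def rolling_upgrade_loss():
    master = 0          # message count on the rabbitmq3 master
    mirrors = {}        # node -> messages held since its last (re)join
    joined = {}         # node -> join order (lower = running longer)
    clock = 0

    def join(node):
        nonlocal clock
        mirrors[node] = 0            # unsynchronized: comes back empty
        joined[node] = clock
        clock += 1

    def publish(n):
        nonlocal master
        master += n
        for node in mirrors:
            mirrors[node] += n       # mirrors see new messages only

    join("rabbitmq1"); join("rabbitmq2")
    publish(10000)
    join("rabbitmq1")                # restart broker 1
    publish(10000)
    join("rabbitmq2")                # restart broker 2
    publish(10000)
    # Restarting the master promotes the oldest mirror, synced or not.
    promoted = min(mirrors, key=joined.get)
    return master, promoted, mirrors[promoted]
```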

Rolling upgrades without performing synchronization and with ha-promote-on-shutdown=always will lose messages.

So if you need to perform a rolling upgrade of a cluster, have very large queues, and cannot afford the long unavailability period while synchronization occurs, then choose the default ha-promote-on-shutdown=when-synced. This makes the queue unavailable while the queue master is offline, but that window should be shorter than a full synchronization. Of course, after a rolling upgrade without synchronization you will have lost all redundancy for your queue. RabbitMQ clustering does not play nicely with large queues.

Another strategy is to do the rolling upgrade over a period of days, with no synchronization. If the queues are being actively consumed, they will naturally synchronize as the original tail of the queue becomes the head. Once a restarted server has naturally synchronized, you can work on the next server. This way you maintain availability and redundancy throughout the upgrade; the downside is the long upgrade period.
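Under a FIFO assumption this natural synchronization can be modelled in a few lines: the restarted mirror lacks the old backlog but holds everything published since it rejoined, so once consumers drain the old backlog the mirror is effectively synchronized (a toy model, not RabbitMQ's internals):

```python
# old_backlog: messages queued before the mirror's restart (mirror lacks these)
# published_after_rejoin: messages the mirror has seen
# consumed: messages consumed from the head of the queue (FIFO)
def natural_sync_state(old_backlog, published_after_rejoin, consumed):
    master = old_backlog + published_after_rejoin - consumed
    mirror = published_after_rejoin - max(0, consumed - old_backlog)
    return master, mirror, master == mirror
```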


Scenario 6 - Network Partition with Ignore Mode

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always
Broker config: cluster_partition_handling=ignore

In this scenario we'll publish 10000 messages to the queue, then partition off broker 3. We have multiple publishers, each connected to a different broker. During the partition, all brokers in the cluster remain available for reads and writes. The cluster is now in a split-brain: the mirror on broker 1 gets promoted, leaving us with two masters.

10000 more messages are sent via the connections to brokers 1 and 2, while 5000 are published to broker 3. Now the master on broker 1 has 20000 messages and the master on broker 3 has 15000.

We resolve the partition and as the administrator we choose broker 3 as the victim and restart it.

Note that I leave a few seconds between commands. After starting the network partition, I gave the cluster about a minute to detect the partition and promote the mirror.

$ python send.py rabbitmq2 10000 test1
Connected to 172.17.0.3
Sent 10000 messages
10000 messages in the queue
$ blockade partition rabbitmq3
$ python send.py rabbitmq2 10000 test1
Connected to 172.17.0.3
Sent 10000 messages
20000 messages in the queue
$ python send.py rabbitmq3 10000 test1
Connected to 172.17.0.4
Sent 5000 messages
15000 messages in the queue
$ blockade join
$ bash restart-node.sh rabbitmq3
$ python get-message-count.py test1
15000

At the end we are left with a healed cluster that lost the 5000 messages written to rabbitmq3 during the partition. But we remained available throughout.

Also note that if we had had active consumers connected to broker 3, the message loss would have been significantly lower. In fact, we could have orchestrated things so that no messages were lost at all:

  1. Detect partition

  2. Prevent applications from publishing to minority side of the partition

  3. Ensure consumers consume all messages from the minority side

  4. Restart minority side node(s)

Obviously, this can introduce some duplication on the consumer side, as any messages that existed in the queue before the partition will be delivered and consumed twice. But we should avoid message loss.

Scenario 7 - Network Partition with Autoheal Mode

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always
Broker config: cluster_partition_handling=autoheal

This scenario is exactly like scenario 6, except that as the administrator we have no power to choose which node(s) to restart, and we cannot perform any nifty orchestration of publishers and consumers to reduce message loss. Instead, the cluster decides for itself which side of the partition must throw away its data. This mode is nice for availability and low-overhead administration, but potentially worse for data loss.

$ python send.py rabbitmq2 10000 test1
Connected to 172.17.0.3
Sent 10000 messages
10000 messages in the queue
$ blockade partition rabbitmq3
$ python send.py rabbitmq2 5000 test1
Connected to 172.17.0.3
Sent 5000 messages
15000 messages in the queue
$ python send.py rabbitmq3 10000 test1
Connected to 172.17.0.4
Sent 10000 messages
20000 messages in the queue
$ blockade join
$ python get-message-count.py test1
15000

In this scenario the minority side received more messages, but autoheal chose the larger (two-node) side of the partition as the winner, so we ended up with 15000 messages rather than 20000. We lost the 10000 messages written to the master on broker 3 during the partition.
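For reference, the documented autoheal winner selection is: the partition with the most client connections wins; ties go to the partition with more nodes; any remaining tie is broken in an unspecified way (here, first listed). A sketch:

```python
# Sketch of autoheal's documented winner selection, as a pure function.
def autoheal_winner(partitions):
    """partitions: list of (client_connections, node_count) tuples.
    Returns the index of the winning partition."""
    return max(range(len(partitions)),
               key=lambda i: (partitions[i][0], partitions[i][1], -i))
```

In this scenario the two-node side won (more nodes, and at least as many client connections), so the cluster kept the majority's data even though the minority master held more messages.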

Scenario 8 - Network partition with Pause Minority Mode and Publisher Confirms (no data loss)

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always
Broker config: cluster_partition_handling=pause_minority

In this scenario the cluster is configured with pause minority mode. That means the node(s) on the minority side of the partition pause themselves, disconnecting clients and refusing new connections. Once the partition is resolved, the paused nodes rejoin the cluster.
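The rule itself is simple; this is my reading of the documented behaviour, not RabbitMQ source:

```python
# Roughly the pause_minority rule: a node pauses itself unless it can see a
# strict majority of cluster nodes (counting itself).
def should_pause(visible_nodes, cluster_size):
    return visible_nodes * 2 <= cluster_size
```

In our 3-node cluster, the isolated broker 3 sees only itself (1 of 3) and pauses, while brokers 1 and 2 (2 of 3) keep serving. Note that in a 2-node cluster each side of a partition is a minority, so both nodes would pause; this is one reason three nodes is the practical minimum for pause_minority.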

$ python send.py rabbitmq2 10000 test1
Connected to 172.17.0.3
Sent 10000 messages
10000 messages in the queue
$ blockade partition rabbitmq3
$ python send.py rabbitmq3 10000 test1
Could not connect. Trying again in 5 seconds
Connected to 172.17.0.2
Sent 10000 messages
20000 messages in the queue
$ blockade join
$ python get-message-count.py test1
20000

After the partition is created, the mirror on broker 1 is promoted to master. send.py tries to connect to broker 3 but fails (the node is paused), so it tries another node, connects to broker 1 and sends the 10000 messages. At the end we have 20000 messages: no data loss, and just a short period of unavailability while the cluster sorted itself out after the partition started.

But perhaps we are missing something here. What happens if the partition occurs while we are sending messages? Let's try again, this time sending 100000 messages with confirms.

$ python send-with-confirm.py rabbitmq3 100000 test1
Attempting to connect to 172.17.0.4
Connection open
Success: 10000 Failed: 0
Success: 20131 Failed: 0
Success: 30009 Failed: 0
Connection closed. Reason: error(104, 'Connection reset by peer')
Attempting to connect to 172.17.0.4
Failed to connect. Will retry in 5 seconds
Attempting to connect to 172.17.0.2
Connection open
Success: 71382 Failed: 0
Success: 81274 Failed: 0
Waiting an extra 5 seconds for last acks.
Results -------------
Success: 89986
Failed: 0
No Response: 10014
90204 messages in the queue
0 messages lost

At around the 30000-message mark I isolated broker 3 using Blockade. The connection hung for about 60 seconds before it was reported lost. The publisher then tried to reconnect to broker 3 but failed (the node was paused), so it connected to broker 1 and continued sending messages.

It received no response for 10014 messages, which closely corresponds to the 10000-message in-flight limit I configured the publisher with.

Interestingly, no messages were lost, and some messages for which we never received an ack did get persisted successfully. So the combination of publisher confirms and pause minority gave us both data safety and availability.



Scenario 9 - Network partition with Pause Minority Mode and Fire-and-forget

Queue: test1
Master on: rabbitmq3
Policy: ha-mode=all, ha-promote-on-failure=always
Broker config: cluster_partition_handling=pause_minority

In this scenario we’ll send 100000 messages without confirms while connected to broker 3. Midway through publishing we’ll isolate broker 3.

$ python fire-and-forget.py rabbitmq3 100000 test1
Connected to 172.17.0.4
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Connection closed.
Could not connect. Trying again in 5 seconds
Connected to 172.17.0.2
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Sent 99999 messages
60810 messages in the queue
39189 messages lost


We see massive data loss. I isolated broker 3 at about the 30000-message mark, and the publisher sent another 30000 messages before the connection was closed. It then connected to broker 1 and carried on publishing. All told we lost about 40% of the messages. Sending messages without confirms to the paused side of a partition looks to be even worse than a failed node.

Scenario 10 - Queue Synchronization (Potential Data Loss and Unavailability)

In this scenario I have a queue with 1 million messages totalling 1 GB of disk space; queues can definitely get larger. I restarted a node to test how long synchronization would take: 3 minutes. All of this is running in Docker containers on my Fedora laptop, a VM with 8 GB of memory and 2 cores. Over a real network it might take a lot longer, though with beefy servers and a fast network it may well be shorter.

By default, synchronization transfers 4096 messages at a time; larger batches can speed it up. I updated the policy with ha-sync-batch-size=50000 and ran the same test again. This time the queue synchronized in 1 minute 25 seconds.

Next I increased the batch size to 500000 and it ran in 36 seconds. I was curious whether I could trigger the documented network partition behaviour with an even larger batch, so I set the batch size to 1000000. This time it completed in 56 seconds, not quite enough to trigger the partition behaviour.

So I sent an additional 1000000 messages to the queue and set the batch size to 2000000. This time the queue simply got stuck in synchronization and became unavailable: broker 3 had reached its memory limit and stopped sending messages to the mirror, stalling the synchronization. Cancelling the synchronization did not work and the queue remained stuck in syncing mode; there is an open GitHub issue for this. The only way I could resolve it was to lower the batch size back to 50000 and restart broker 3.

Obviously, a batch size that big is pretty extreme, but be aware that you can get into trouble by setting it too high. Even the 4096 default could be enough to trigger this if, for example, your messages are around 1 MB each.
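The back-of-the-envelope check is simple: a synchronization batch's footprint is roughly batch size times average message size, and the whole batch has to move between nodes without starving the broker of memory. The function below is illustrative arithmetic, not a RabbitMQ setting:

```python
# Rough memory/network footprint of one synchronization batch.
def sync_batch_bytes(batch_size, avg_msg_bytes):
    return batch_size * avg_msg_bytes
```

Even the default batch of 4096 with 1 MB messages is a 4 GiB burst per batch: `sync_batch_bytes(4096, 1024 * 1024)` is 4 * 1024 ** 3 bytes, larger than many brokers' memory high watermark.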

You can really do some damage by adding a new HA policy with automatic synchronization when you have many large queues that are not currently mirrored. Imagine a cluster with a bunch of multi-GB unmirrored queues: add an ha-mode=all, ha-sync-mode=automatic policy that matches them all, then watch as your cluster maxes out its CPUs and memory performing synchronization to multiple mirrors for multiple queues at the same time. This was the only way I managed to trigger the network partition behaviour.

There are additional gotchas around synchronization that I haven’t covered and don’t understand yet myself. It definitely warrants further investigation and testing. I also need to see how lazy queues affect synchronization. I think RabbitMQ still has some maturing to do in this area.

Scenario 11 - Flaky Network Dropping 2% of Packets without Publisher Confirms (no message loss)

This is a high packet loss scenario. Blockade uses netem, which can add delays, packet loss and other problems to outgoing packets. This means the packet loss affects both publisher confirms and the message replication between master and mirrors. In theory, TCP's retransmissions should handle it for RabbitMQ.
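A little arithmetic shows why loss at this level costs latency rather than messages: TCP retransmits dropped segments, so AMQP frames arrive intact, just later. With loss rate p, the chance that a burst of n segments needs no retransmission at all is (1 - p)^n:

```python
# Probability that a burst of n TCP segments sees zero drops at the given
# per-segment loss rate (independence assumed; illustrative only).
def clean_burst_probability(n_segments, loss_rate):
    return (1 - loss_rate) ** n_segments
```

At 2% loss, a 100-segment burst gets through untouched only about 13% of the time, so nearly every sizeable transfer pays some retransmission delay while still delivering every byte.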

You can set the packet loss percentage in the blockade.yml, then start the cluster. Once it is up, run:

$ blockade flaky --all
$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
rabbitmq1       79ba7d9f0e4b    UP      172.17.0.2      FLAKY                 
rabbitmq2       3790e40cadf2    UP      172.17.0.3      FLAKY                 
rabbitmq3       382348a8bb74    UP      172.17.0.4      FLAKY

We’ll send 100000 messages without publisher confirms and see if we end up with 100000 messages.

$ python fire-and-forget.py rabbitmq1 100000 test1
Connected to 172.17.0.2
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent 100000 messages
100000 messages in the queue
0 messages lost

We ended up with no message loss. So I ran it again with 10% packet loss. This time the cluster became sporadically unavailable and it took about 3 minutes to replicate all the messages to the mirrors, but again no messages were lost. RabbitMQ can cope with even very high packet loss, though latency increases a lot.


Scenario 12 - Slow Network (No message loss)

In this scenario we add outgoing packet delays of 75ms ±10ms, with each delay depending up to 25% on the previous one (netem's correlation setting).

$ blockade slow --all
$ blockade status
NODE            CONTAINER ID    STATUS  IP              NETWORK    PARTITION  
rabbitmq1       79ba7d9f0e4b    UP      172.17.0.2      SLOW                 
rabbitmq2       3790e40cadf2    UP      172.17.0.3      SLOW                 
rabbitmq3       382348a8bb74    UP      172.17.0.4      SLOW


Again we sent 100000 messages without confirms and saw all 100000 in the queue. Again, it took about 3 minutes for the replication to complete.

$ python fire-and-forget.py rabbitmq1 100000 test1
Connected to 172.17.0.2
Success: 10000 Failed: 0
Success: 20000 Failed: 0
Success: 30000 Failed: 0
Success: 40000 Failed: 0
Success: 50000 Failed: 0
Success: 60000 Failed: 0
Success: 70000 Failed: 0
Success: 80000 Failed: 0
Success: 90000 Failed: 0
Success: 100000 Failed: 0
Sent 100000 messages
100000 messages in the queue
0 messages lost

I performed the same test again with 250ms (+- 50ms) delays. This time it took 10 minutes to replicate all the messages to the mirrors, but by the end there were 100000 messages in the queue.

Conclusions

  • Publisher confirms are a must if you value data safety.

  • Pause minority with publisher confirms will protect you from data loss, whether or not the queue is being actively published to and consumed.

  • Rolling upgrades when you have large queues can be tricky. I would be interested in hearing other people’s strategies around this.

  • Beware of queue synchronization of large queues. Do testing in a replica of your production cluster to understand the failure modes and periods of unavailability.

  • If your messages are large, and you plan on performing synchronization if the need occurs, then check your ha-sync-batch-size and make sure it cannot cause a problem such as a stuck synchronization.

  • Flaky and slow networks alone shouldn't be enough to cause message loss, though I am sure that a network issue combined with another event could produce a message loss scenario. Foremost in my mind is the effect of packet loss or a slow network during a large synchronization.

  • Flaky and slow networks can cut throughput considerably, as replication to the mirrors slows down. Latency on publisher confirms can also increase significantly.

Are there more scenarios we should consider? Please let me know, thanks!