Fault tolerance and High Availability are big subjects and so we'll tackle RabbitMQ and Kafka in separate posts. In this post we'll look at RabbitMQ and in Part 6 we'll look at Kafka while making comparisons to RabbitMQ. This is a long post, even though we only look at RabbitMQ, so get comfortable.
In this post we'll look at the strategies for fault tolerance, consistency and high availability (HA) and the trade-offs each strategy makes. RabbitMQ can operate as a cluster of nodes and as such can be classed as a distributed system. When it comes to distributed data systems we often speak about consistency and availability.
We talk about consistency and availability with distributed systems because they describe how the system behaves under failure. A network link fails, a server fails, a hard disk fails, a server is temporarily unavailable due to GC or a network link is lossy or slow. All these things can cause outages, data loss or data conflicts. It turns out that it is generally not possible to provide a system that is ultimately consistent (no data loss, no data divergence) and available (will accept reads and writes) under all failure modes.
We'll see that consistency and availability are at two ends of a spectrum and you'll need to choose which of those you'll optimize for. The good news is that with RabbitMQ this is a choice that you can make. It gives you the nerd knobs required to tune it for greater consistency or greater availability.
In this post we'll be paying close attention to what configurations produce data loss of acknowledged writes. There is a chain of responsibility between producers, brokers and consumers. Once a message has been handed off to a broker, it is the broker's job not to lose that message. When the broker acknowledges receipt of a message to the publisher, we don't expect that message to be lost. But we'll see that this indeed can happen depending on your broker and publisher configuration.
Single Node Durability Primitives
RabbitMQ has two types of queue: durable and non-durable. All queues are persisted to the Mnesia database. Durable queues are redeclared on node start-up and so survive a restart, system crash or server failure (as long as the data survives). This means as long as you declare your exchanges and queues to be durable, your exchange/queue infrastructure will come back online.
Non-durable queues and exchanges are deleted on start-up.
Just because a queue is durable doesn't mean its messages survive a node restart. Only messages set as persistent by their publisher will be recovered. Persistent messages do put more load on the broker, but if message loss is unacceptable then persistent messages are a no-brainer.
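To make the two rules above concrete, here is a minimal toy model of what survives a restart. It is only a sketch: ToyNode, Queue and Message are made-up names for illustration, not RabbitMQ or client library APIs, and the model ignores everything except the durable and persistent flags.

```python
from dataclasses import dataclass, field


@dataclass
class Message:
    body: str
    persistent: bool = False  # AMQP 0-9-1 publishers set this via delivery_mode=2


@dataclass
class Queue:
    name: str
    durable: bool = False
    messages: list = field(default_factory=list)


class ToyNode:
    """Toy single-node broker that models only what survives a restart."""

    def __init__(self):
        self.queues = {}

    def declare(self, name, durable=False):
        # Returns the existing queue if it was already declared.
        return self.queues.setdefault(name, Queue(name, durable))

    def publish(self, queue_name, body, persistent=False):
        self.queues[queue_name].messages.append(Message(body, persistent))

    def restart(self):
        survivors = {}
        for name, queue in self.queues.items():
            if not queue.durable:
                continue  # non-durable queues are deleted on start-up
            # Only persistent messages are recovered from a durable queue.
            kept = [m for m in queue.messages if m.persistent]
            survivors[name] = Queue(name, True, kept)
        self.queues = survivors


node = ToyNode()
node.declare("orders", durable=True)
node.declare("scratch", durable=False)
node.publish("orders", "order-1", persistent=True)
node.publish("orders", "order-2", persistent=False)
node.publish("scratch", "tmp-1", persistent=True)
node.restart()
remaining = {name: [m.body for m in q.messages] for name, q in node.queues.items()}
print(remaining)  # {'orders': ['order-1']}
```

Note that the persistent message in the non-durable queue is lost too: a message needs both a durable queue and the persistent flag to survive.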
Clustering with Queue Mirroring
In order to survive the loss of a broker we need redundancy. We can join multiple RabbitMQ nodes into a cluster and then add additional redundancy by replicating queues across multiple nodes. That way if a single node dies we don't lose data and we stay available.
A mirrored queue has:
one master that receives all reads and writes
one or more mirrors that receive all messages and metadata from the master. These mirrors do not exist for scaling out the reading of queues but solely for redundancy.
You can make a queue mirrored by setting a policy. In that policy you can choose the replication factor and even the nodes that the queue should be hosted on. Examples are:
ha-mode: exactly, ha-params: 2 (one master and one mirror)
ha-mode: nodes, ha-params: rabbit@node1, rabbit@node2
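As a rough sketch, the two example policies could be turned into rabbitmqctl set_policy commands like this. The policy names, queue name patterns and the ha_policy_command helper are hypothetical, chosen only for illustration; the definitions themselves mirror the two examples above.

```python
import json
import shlex


def ha_policy_command(name, pattern, definition, vhost="/"):
    """Render a `rabbitmqctl set_policy` command line for a queue policy."""
    args = [
        "rabbitmqctl", "set_policy",
        "-p", vhost,
        "--apply-to", "queues",
        name, pattern, json.dumps(definition),
    ]
    # Quote each argument so the JSON definition survives the shell intact.
    return " ".join(shlex.quote(a) for a in args)


# One master and one mirror, placed wherever the cluster chooses.
exactly_two = {"ha-mode": "exactly", "ha-params": 2}
# Master and mirrors restricted to the named nodes.
pinned = {"ha-mode": "nodes", "ha-params": ["rabbit@node1", "rabbit@node2"]}

# The policy names and patterns below are assumptions for this example.
print(ha_policy_command("ha-two", "^orders\\.", exactly_two))
print(ha_policy_command("ha-pinned", "^audit\\.", pinned))
```

The pattern is a regular expression matched against queue names, so a single policy can cover a whole family of queues.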
Publisher confirms are necessary for achieving consistent writes. Without publisher confirms it is possible to lose messages. A confirm is sent to a publisher once a message has been written to disk. RabbitMQ does not write messages to disk on receipt but on a periodic basis, in the region of a few hundred milliseconds. When a queue is mirrored, a confirm is only sent once all mirrors have also written their copy of the message to disk. This means that using confirms adds more latency, but if data safety is important to you then they are necessary.
When a broker is shut down or dies, all the queue masters on that node go with it. The cluster then selects the oldest mirror of each master and promotes it to be the new master.
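On the publisher side, confirms are only useful if we remember what has not been confirmed yet. Below is a minimal sketch of that bookkeeping, assuming the usual AMQP 0-9-1 confirm semantics (delivery tags count up from 1 per channel, and a confirm with the multiple flag settles every tag up to and including it). ConfirmTracker is a made-up helper, not part of any client library; a real publisher would call it from its client's confirm callbacks.

```python
class ConfirmTracker:
    """Tracks published messages until the broker confirms them."""

    def __init__(self):
        self.next_tag = 1      # confirm delivery tags start at 1 per channel
        self.outstanding = {}  # delivery tag -> message body

    def published(self, body):
        tag = self.next_tag
        self.outstanding[tag] = body
        self.next_tag += 1
        return tag

    def _take(self, tag, multiple):
        # With multiple=True the broker settles every tag up to and including `tag`.
        tags = [t for t in self.outstanding if t <= tag] if multiple else [tag]
        return [self.outstanding.pop(t) for t in sorted(tags) if t in self.outstanding]

    def on_ack(self, tag, multiple=False):
        self._take(tag, multiple)  # the broker has taken responsibility

    def on_nack(self, tag, multiple=False):
        return self._take(tag, multiple)  # the caller should republish these

    def unconfirmed(self):
        # After a dropped connection, everything still here needs republishing.
        return [self.outstanding[t] for t in sorted(self.outstanding)]


tracker = ConfirmTracker()
for body in ["m1", "m2", "m3", "m4"]:
    tracker.published(body)
tracker.on_ack(2, multiple=True)  # m1 and m2 are safe
to_retry = tracker.on_nack(3)     # the broker could not take m3
print(to_retry, tracker.unconfirmed())  # ['m3'] ['m4']
```

Anything left in the tracker when a connection drops was never acknowledged, so the publisher cannot assume the broker has it.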
Broker 3 dies. Notice that Queue C has its mirror on Broker 2 promoted to master. Also note that a new mirror has been created for Queue C on Broker 1. RabbitMQ will always try to maintain the replication factor set out in your policies.
Next Broker 1 dies! We only have a single broker left. The Queue B mirror gets promoted to master.
We bring back up Broker 1. Whether or not the data survived the loss and recovery of the broker, all mirrored queue messages are discarded on start up. This is important to note as it has implications. We'll look at those implications soon. So Broker 1 is now a member of the cluster again and the cluster tries to honor the policies and so creates mirrors on Broker 1.
In this case the loss of Broker 1 was total, the data too, so Queue D that was not mirrored was lost completely.
Broker 3 is now brought online and Queue A and B get mirrors created on it in order to satisfy their HA policies. But now all the master queues are on a single node! This is not ideal, we would prefer to have our masters distributed evenly between the nodes. There are no great options for rebalancing masters unfortunately. We'll come back to this problem later on as we need to cover queue synchronization first.
So you should now have an idea of how mirrors provide redundancy and perform fail-overs. This gives us availability in the face of node failures and also protection from data loss. But we are not finished yet because in fact it is more complicated than this.
When a new mirror is created all new messages will always be replicated to that mirror and any others. Regarding the existing data on the master, we can choose to replicate them to the new mirror so that it is a full copy of the master. We can also choose not to replicate the existing messages and let the master and the new mirror converge over time as new messages arrive at the tail and existing messages are drained from the head of the master.
This synchronization is either automatic or manual and is controlled via a queue policy. Let's look at an example.
We have two mirrored queues. Queue A has automatic synchronization and Queue B has manual synchronization. Both queues have 10 messages.
Now we lose Broker 3.
Broker 3 comes back online. The cluster creates a mirror for each queue on the new node and automatically synchronizes the new Queue A mirror with the master. The new Queue B mirror however remains empty. So we have full redundancy of Queue A and only a redundancy of one mirror for the existing messages of Queue B.
Both queues get another 10 messages delivered. Then Broker 2 fails and Queue A fails over to the oldest mirror, which is on Broker 1. No data loss occurs in the fail-over. Queue B has 20 messages in the master and only 10 in the mirror, as the original 10 messages were never replicated to it.
Now Broker 1 fails. Queue A has no problem failing over to the mirror on Broker 3. It does so without message loss. Queue B however has a problem. At this point we can either optimize for availability or consistency.
If we want to optimize for availability then we need to set the ha-promote-on-failure policy to always. This is the default value so we can just not specify the policy at all. This basically means that we allow fail-overs to unsynchronized mirrors. Doing so will result in message loss but keeps the queue available for reads and writes.
We could also set ha-promote-on-failure to when-synced. This will prevent a fail-over and wait for Broker 1 to come back online with its data. Once back online, the queue will be available again with the master on Broker 1 with no data loss. Availability is sacrificed for data safety. However, this mode can be dangerous and can even cause total data loss. We'll look at that soon.
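The trade-off between the two values can be sketched as a small decision function. This is a toy model, not RabbitMQ's promotion logic: Replica and fail_over are hypothetical names, message counts stand in for queue contents, and only the oldest mirror is considered as a promotion candidate.

```python
from dataclasses import dataclass


@dataclass
class Replica:
    node: str
    messages: int
    synced: bool = True


def fail_over(master, mirrors, promote_on_failure="always"):
    """Return (new_master, lost_messages) after the master's node fails.

    `mirrors` is ordered oldest first. new_master is None when the queue
    stays unavailable until the old master comes back with its data.
    """
    if not mirrors:
        return None, 0  # nothing to promote, wait for the master
    candidate = mirrors[0]  # simplification: only the oldest mirror is considered
    if candidate.synced:
        return candidate, 0
    if promote_on_failure == "always":
        # Stay available, but drop whatever the mirror never received.
        return candidate, master.messages - candidate.messages
    if promote_on_failure == "when-synced":
        # Stay consistent, but the queue is down until the master returns.
        return None, 0
    raise ValueError(f"unknown ha-promote-on-failure value: {promote_on_failure}")


# Queue B from the example: 20 messages on the master, 10 on the new mirror.
master_b = Replica("broker1", 20)
mirror_b = Replica("broker3", 10, synced=False)
print(fail_over(master_b, [mirror_b], "always"))
print(fail_over(master_b, [mirror_b], "when-synced"))
```

With always the queue keeps running on Broker 3 at the cost of 10 messages. With when-synced nothing is lost yet, but only as long as Broker 1 eventually returns with its data.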
Right now you may be thinking "Why would I ever not use automatic synchronization?". The answer is that synchronization is a blocking operation. The master cannot perform any reads or writes during synchronization!
Let's look at an example. Now we have very large queues. Why might they be so large? There could be multiple reasons:
The queues are not being actively consumed
They are high velocity queues and consumers are running slow right now
They are high velocity queues and an outage occurred and consumers are catching up
Fig 12. Two large queues with different synchronization modes.
Now Broker 3 dies.
Broker 3 comes back online and new mirrors are created. The Queue A master starts replicating its existing messages to the new mirror and during this time the queue is unavailable. It takes two hours to replicate the data causing two hours of downtime for that queue!
However Queue B remains available throughout this period. It has sacrificed some redundancy to remain available.
Two hours later, Queue A becomes available and can start accepting reads and writes again.
This blocking behaviour during synchronization makes rolling upgrades of clusters with very large queues problematic. At some point the node hosting the master queue will need to be restarted, which means either failing over to a mirror or making the queue unavailable during the server upgrade period. If we choose a fail-over then we’ll lose messages if the mirrors are not synchronized. The default is not to fail over to an unsynchronized mirror during the shutdown of a broker. This means that once the broker comes back up we have not lost any messages; the only impact was downtime for the queue. You can control the shutdown behaviour with the ha-promote-on-shutdown policy. You can set it to one of two values:
always = fail-over to unsynchronized mirrors enabled
when-synced = only fail-over when a synchronized mirror is available, else make the queue unavailable for reads and writes. When the broker comes online again the queue will also come back online
So either way you look at it, with large queues you have to choose between data loss and unavailability.
When Availability Means Better Data Safety
There's one more complication to take into account before you make a decision. While automatic synchronization is better for data redundancy, is it still better for data safety? Sure RabbitMQ itself will be less likely to lose messages as it has better redundancy of existing messages, but what happens to new messages being sent by your publishers?
You need to think about:
Can my publisher simply return an error upstream and the upstream service or user can retry later?
Can my publisher persist the message locally or to a database so it can retry later?
If the answer is that your publisher can only drop the message then in fact availability is also better for data safety.
So it is a balancing act and the decision depends on your situation.
Problems With ha-promote-on-failure=when-synced
The idea of ha-promote-on-failure = when-synced is that we prevent failing over to an unsynchronized mirror and thereby avoid data loss. The queue remains unavailable for reads or writes. Instead we try to bring back the lost broker with its data intact so that it can resume being the master, with no data loss.
But, and this is a big but, if the broker lost its data, then we have a big problem. We only have the following options:
Bring back the broker without its data (total data loss)
We tell the cluster to forget the lost node (using the rabbitmqctl forget_cluster_node command) and start a new broker with the same hostname. While the cluster still remembers our lost node, it remembers that our queue existed and has unsynchronized mirrors. When the cluster is told to forget the lost node, that queue is forgotten as well. We must now redeclare it. We lost all data even though we had mirrors that had a partial set of the data. It would have been better to have failed over to an unsynchronized mirror!
Shut down mirrors and promote (unavailability)
Shut down the brokers that have the mirrors of the lost master. Tell the cluster to forget the broker of the lost master. The cluster now promotes the mirror on the most recently stopped broker. Now we start up the stopped brokers again and we have a master, albeit one that has lost messages. But we do not lose everything. We have sacrificed both availability and data safety!
Also note that if we have multiple mirrored queues of various sizes, with some that have unsynchronized mirrors then this strategy can become nightmarish. There might be all kinds of collateral damage to other mirrored queues.
So using manual synchronization (and not performing synchronization) coupled with ha-promote-on-failure=when-synced is in my opinion pretty dangerous. The docs say it exists for data safety but it is a double edged sword.
As promised, we come back to our problem of having masters concentrated on a single node or a couple of nodes. This can even happen as the result of a rolling upgrade of your cluster. In a three node cluster you'll end up with all your masters on one or two nodes.
Rebalancing masters can be difficult for two reasons:
There are no great tools for doing rebalancing
There is a 3rd party plugin for rebalancing masters that is not supported. Pivotal says "The plugin has some additional configuration and reporting tools, but is not supported or verified by the RabbitMQ team. Use at your own risk."
There is also a trick of using HA policies to move a master. Pivotal provides a script that uses this trick. It works by:
Removing all mirrors via a temporary policy that has higher priority than the existing HA policy
Changing the temporary HA policy to use "nodes" mode specifying the node where you want the master to be migrated to.
Synchronizing the queue to force the migration
Once migration is complete, removing the temporary policy. The original HA policy now takes precedence and the desired number of mirrors are created.
The downside of this approach is that it might not be feasible if you have large queues or strict redundancy requirements.
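The four steps of the policy trick could be expressed as a sequence of rabbitmqctl calls roughly as follows. This is my reading of the steps, not the Pivotal script. The helper name, the temporary policy name and the priority of 1000 are assumptions, and step 1 assumes that a "nodes" policy listing only the current master's node is how the mirrors get removed.

```python
import json
import re


def move_master_plan(queue, current_node, target_node, vhost="/",
                     policy="tmp-move-master", priority=1000):
    """Return the rabbitmqctl invocations (as argv lists) for the four steps."""
    pattern = "^" + re.escape(queue) + "$"  # match exactly this one queue

    def set_policy(nodes):
        definition = {"ha-mode": "nodes", "ha-params": nodes}
        return ["rabbitmqctl", "set_policy", "-p", vhost,
                "--priority", str(priority), "--apply-to", "queues",
                policy, pattern, json.dumps(definition)]

    return [
        # 1. Temporary higher-priority policy that leaves only the master (assumed form).
        set_policy([current_node]),
        # 2. Point the temporary policy at the node that should host the master.
        set_policy([target_node]),
        # 3. Synchronize the queue to force the migration.
        ["rabbitmqctl", "sync_queue", "-p", vhost, queue],
        # 4. Remove the temporary policy so the original HA policy applies again.
        ["rabbitmqctl", "clear_policy", "-p", vhost, policy],
    ]


plan = move_master_plan("orders", "rabbit@node1", "rabbit@node3")
for argv in plan:
    print(" ".join(argv))
```

Between steps 1 and 4 the queue has no redundancy, which is exactly the downside mentioned above.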
Now let's look at how RabbitMQ clusters deal with network partitions.
Distributed system nodes are separated by network links and network links can and will go down. How often depends on your on-premise infrastructure or the reliability of your chosen cloud. Either way, distributed systems need to be able to cope with them. Again we are left to choose between availability and consistency, and again the good news is that RabbitMQ gives us both options (just not at the same time).
With RabbitMQ we have two primary options:
Allow split-brain. This provides availability but can provoke data loss when the split-brain is resolved.
Disallow split-brain. This can cause some short-lived disruption to availability depending on how your clients connect to the cluster. It can also cause complete unavailability in a two node cluster.
But what is a split-brain? It is where a cluster is divided in two because the network links cut off part of the cluster. On each side of the partition, mirrors get promoted to master. This means we end up with more than one master per queue.
So if publishers end up writing to both masters we'll have two diverging copies of the queue.
RabbitMQ provides different partition modes which favor either availability or consistency.
Ignore Mode (Default)
This mode opts for availability. When a partition happens, split-brain occurs. When the partition is resolved, the administrator has to decide which side of the partition wins. The losing side needs to be restarted and any data that only existed on that side of the partition is lost.
Now a network partition separates Broker 3 from the other brokers. Broker 3, seeing that the other brokers have gone, promotes its mirror to master. We now have split-brain.
The network partition gets resolved but the split-brain continues. The administrator must manually resolve the split-brain by choosing a losing side of the partition. In the case below the administrator shuts down Broker 3 and brings it back. Any messages not consumed from Broker 3 are lost when the broker rejoins the cluster.
Throughout the network partition and afterward the cluster and that queue were available for reads and writes.
Autoheal Mode
This is exactly the same as Ignore mode except that the cluster itself will automatically choose a losing side of the partition. The losing side rejoins the cluster empty, losing any messages that were not consumed and only sent to that side of the partition.
Pause Minority Mode
If we don't want to allow a split-brain scenario, then our only option is to refuse reads and writes on the minority side of a partition. When a broker sees that it is on the minority side of a partition it pauses itself. This means that it closes down any existing connections and refuses any new connections. It checks once per second to see if the partition has resolved itself. Once the partition has resolved itself it will unpause itself and rejoin the cluster.
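The minority check itself fits in a few lines. This is a toy function, not RabbitMQ's code, and should_pause is a made-up name. It assumes the documented rule that a side holding half or fewer of the cluster's nodes counts as a minority, which is also why a two node cluster becomes completely unavailable during a partition.

```python
def should_pause(visible_nodes, cluster_size):
    """True if a node that can see `visible_nodes` members (itself included)
    is on a minority side of a partition.

    Half or fewer counts as a minority, so neither side of an evenly
    split cluster keeps running.
    """
    if not 1 <= visible_nodes <= cluster_size:
        raise ValueError("visible_nodes must be between 1 and cluster_size")
    return visible_nodes * 2 <= cluster_size


# Three node cluster, Broker 3 cut off from Brokers 1 and 2.
print(should_pause(1, 3))  # Broker 3 sees only itself: True
print(should_pause(2, 3))  # Brokers 1 and 2 see each other: False
# Two node cluster: both sides see exactly half, so both pause.
print(should_pause(1, 2))  # True
```

The mode is enabled with cluster_partition_handling = pause_minority in the broker configuration.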
Then Broker 1 and 2 are separated from Broker 3 by a network partition. Rather than promote its mirror to a master, Broker 3 pauses itself becoming unavailable.
Once the partition is resolved it rejoins the cluster.
Let's look at another example where the master is on Broker 3.
Then the same network partition occurs. Broker 3 pauses itself as it is on the minority side. On the majority side the nodes see that Broker 3 is gone and the oldest mirror between Broker 1 and 2 is promoted to master.
When the partition is resolved, Broker 3 rejoins the cluster.
What is important to realize here is that we get consistency, but we can also get availability if we can successfully route clients to the majority side of the partition. I personally would choose Pause Minority for most situations but it really depends on your particular use case.
Making sure clients can successfully connect to a node is important for availability. Let's look at our options.
Ensuring Client Connectivity
We have a few options for directing clients to the majority side of a partition or to the nodes that are live (after one node has failed). First let's remember that a given queue is hosted on a specific node, but the exchanges and policies are replicated across all nodes. Clients can connect to any node and internal routing will make sure the clients get connected to the right node. But when a node is paused it refuses connections, so clients must connect to a different node. When a node is down it cannot do much either.
Our options are:
Access the cluster via a load balancer that does simple round robin and clients perform connection retries until successful. If a node is down or paused then the connection attempts to that node will fail, but subsequent attempts will be made to other servers (in a round robin way). This would work for short lived network partitions, or a downed server that will be brought back quickly.
Access the cluster via a load balancer and remove the paused/downed nodes from the list as soon as detected. If you can make that change quickly and if clients can perform connection retry attempts then we should get continued availability.
Give each client the list of all nodes and the client randomly chooses one node when they connect. If they get a connection refused error then they switch to another node in the list until they can connect.
Use DNS to switch access away from a downed/paused node. This would require a short TTL.
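The third option, where each client holds the full node list, could look something like the sketch below. It only opens a plain TCP connection; a real client would hand the chosen node to its AMQP library instead. The connect_to_any helper and the hostnames are hypothetical, and the connect function is injectable so the behaviour can be tried without a running cluster.

```python
import random
import socket


def connect_to_any(nodes, connect=socket.create_connection, timeout=2.0, rng=random):
    """Try the nodes in random order and return (node, connection) for the
    first one that accepts the connection."""
    candidates = list(nodes)
    rng.shuffle(candidates)  # spread clients across the cluster
    last_error = None
    for host, port in candidates:
        try:
            return (host, port), connect((host, port), timeout)
        except OSError as exc:  # covers refused connections and timeouts
            last_error = exc    # a paused or downed node, try the next one
    raise ConnectionError(f"no node reachable out of {len(candidates)}") from last_error


# Stand-in for a cluster where rabbit3 is paused and refuses connections.
def fake_connect(address, timeout):
    if address[0] == "rabbit3":
        raise ConnectionRefusedError("node is paused")
    return f"connection to {address[0]}"


nodes = [("rabbit1", 5672), ("rabbit2", 5672), ("rabbit3", 5672)]
node, conn = connect_to_any(nodes, connect=fake_connect)
print(node, conn)
```

Shuffling on every attempt means a client that loses its connection will not keep hammering the same paused node first.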
RabbitMQ clustering is a mixed bag. The most serious deficiencies are that:
nodes that rejoin a cluster throw away their data
synchronization is blocking and causes queue unavailability.
All the hard decisions stem from these two design decisions. If RabbitMQ could keep data around when rejoining a cluster then synchronization would be faster. If it could make synchronization non-blocking then it would better support large queues. Fixing those two problems would make RabbitMQ a much better choice for a fault tolerant and highly available messaging technology. I would hesitate to recommend RabbitMQ with clustering when:
The network is not rock solid
Storage is not rock solid
You have very large queues
Regarding settings, for high availability think about:
cluster_partition_handling=ignore or autoheal
Ensure your clients can still connect to a live node when a node goes down
For consistency (data safety) think about:
use Publisher Confirms and Manual Acknowledgements on the consumer side
ha-promote-on-failure=when-synced if publishers are able to retry later on AND if you have VERY reliable storage! Else go for always.
ha-sync-mode=automatic (but for large inactive queues you may need to consider manual mode, also think about whether unavailability might cause message loss)
Pause Minority mode
There is still more to fault tolerance and high availability, like how to safely perform administrative operations like rolling upgrades. There is also the shovel and federation which I still need to write about.
If I missed anything else out then please let me know.
See my post where I use Docker and Blockade to wreak havoc on a RabbitMQ cluster to demonstrate some message loss scenarios, based on the theory we covered in this post: How to lose messages on a RabbitMQ cluster.