The multiple design defects of RabbitMQ Mirrored Queues have been well documented by the community and acknowledged by the RabbitMQ team. In an age where new messaging systems are appearing that compete in the reliable messaging space, it is critical for RabbitMQ to improve its replicated queue story in order to continue to compete in that space. That is why it is so exciting to see that the RabbitMQ team have been working hard to deliver a new replicated queue type based on the Raft consensus algorithm. Quorum queues are still in beta and as such are subject to change before release. Likewise, their capabilities will no doubt evolve and improve over future releases. There are currently limitations to the features of Quorum Queues, but if data safety is your most important requirement then they aim to satisfy your needs.
In this post we're going to look at the design of Quorum Queues and then in a later post we'll run a series of chaos tests to test the durability claims of this new queue type.
First I'll explain how Raft works, from a high level without including all the intricacies of the protocol. I seriously recommend that you read the Raft paper and other sources of information on Raft if you want to understand the whole protocol and how it achieves its safety guarantees.
An Introduction to Raft
The Raft consensus protocol is not trivial but at the same time it is not too complex to understand. This post will not go through every aspect of Raft as the Raft paper is already as concise a document as you could get. So please do read the paper.
That said, we'll briefly cover the main points on how Raft achieves the guarantees it claims (the protocol also has a formal TLA+ specification).
Brief Overview of Raft
All writes go through a leader who replicates those writes to followers. Data always flows in this direction: leader -> followers. A client can contact any node, but if the node is a follower it will respond to the client informing them who the leader is so they can send their reads/writes there.
A node will remain leader as long as the followers receive its periodic heartbeat within a time period known as the election timeout. If a follower fails to receive the heartbeat in that time, then the follower will convert itself into a candidate and attempt to become the leader. A candidate becomes leader by making a RequestVote RPC to each node in the cluster and receiving a vote from the majority of the nodes.
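To make the timeout behaviour concrete, here is a minimal sketch of the follower side. The Follower class, its method names and the injected clock are all hypothetical, and the randomized 150-300 ms timeout range is an assumption taken from the range the Raft paper suggests, not something specific to RabbitMQ.

```python
import random


class Follower:
    """Hypothetical follower that turns candidate when heartbeats stop."""

    def __init__(self, now: float, timeout_range=(0.150, 0.300)):
        # Randomizing the timeout makes simultaneous candidacies less likely.
        self.election_timeout = random.uniform(*timeout_range)
        self.last_heartbeat = now
        self.role = "follower"

    def on_heartbeat(self, now: float) -> None:
        # A heartbeat from the current leader resets the timer.
        self.last_heartbeat = now

    def tick(self, now: float) -> str:
        # Called periodically; converts to candidate once the timeout elapses.
        timed_out = now - self.last_heartbeat >= self.election_timeout
        if self.role == "follower" and timed_out:
            self.role = "candidate"
        return self.role
```

A candidate would then go on to send RequestVote RPCs, which this sketch leaves out.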
Two things prevent multiple candidates from being elected leader at the same time:
A majority vote is required (so two concurrent candidates cannot both receive a majority vote).
The concept of a term, also known as an epoch or fencing token, is used to prevent stale requests from being accepted. Each time a candidate starts an election it increments the term by one, so every new leader has a higher term than the last. When any node receives a RequestVote RPC whose term is smaller than its currently known term, it rejects the request. This prevents stale, or zombie, nodes which are out of date from gaining leadership.
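Those two rules can be sketched as follows. This is a simplified illustration, not a full Raft implementation: the Node class and wins_election function are hypothetical names, the one-vote-per-term rule comes from the Raft paper, and the check on the candidate's log is deliberately omitted.

```python
class Node:
    """Hypothetical voter: stale terms are rejected, one vote per term."""

    def __init__(self):
        self.current_term = 0
        self.voted_for = None

    def handle_request_vote(self, candidate_id: str, term: int) -> bool:
        if term < self.current_term:
            return False               # stale candidate: reject
        if term > self.current_term:
            self.current_term = term   # newer term: forget the old vote
            self.voted_for = None
        if self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id
            return True
        return False                   # already voted for someone else


def wins_election(votes: int, cluster_size: int) -> bool:
    # A strict majority is required, so two candidates cannot both win a term.
    return votes > cluster_size // 2
```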
When a leader fails, the followers will stop receiving this heartbeat and a leader election will occur. Likewise, if a network partition occurs, then followers on the other side of the partition from the leader will stop receiving the heartbeat and a leader election will be triggered on their side of the partition. If these followers, who now become candidates, are on the minority side of the partition then they will be unable to get elected as they require a vote from a majority of the nodes. This is how Raft avoids a split-brain.
So the heartbeat is important, but there is no dedicated Heartbeat RPC. Instead, the AppendEntries RPC that replicates data from the leader to followers also acts as the heartbeat. Each AppendEntries RPC includes the latest unreplicated data, as well as the metadata required for consistency checks (go read the paper!).
When a client makes a write to the leader, the leader appends the entry to its own log and includes that write in the next AppendEntries RPC that it makes to the followers. The leader considers the entry committed, and responds to the client, once a quorum (majority) of nodes, counting itself, have confirmed that they have written the entry to their logs.
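Here is a rough sketch of that acknowledgement rule. The Leader class is hypothetical and heavily simplified: it tracks acknowledgements per entry, whereas real Raft tracks a cumulative match index per follower, and it assumes the leader's own log counts towards the majority.

```python
from dataclasses import dataclass, field


@dataclass
class Leader:
    cluster_size: int
    log: list = field(default_factory=list)
    acks: dict = field(default_factory=dict)  # log index -> set of node ids
    commit_index: int = -1

    def client_write(self, entry) -> int:
        # Append locally first; the entry goes out in the next AppendEntries.
        self.log.append(entry)
        index = len(self.log) - 1
        self.acks[index] = {"leader"}  # the leader's own copy counts
        return index

    def on_follower_ack(self, follower_id: str, index: int) -> bool:
        # Returns True once the entry is committed and the client can be told.
        self.acks[index].add(follower_id)
        if len(self.acks[index]) > self.cluster_size // 2:
            self.commit_index = max(self.commit_index, index)
        return self.commit_index >= index
```

In a cluster of five, the write is acknowledged to the client after the second follower confirms, since that makes three copies including the leader's.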
So at the time that a client receives an acknowledgement of a successful write, a minority of nodes may still not have that entry. The leader will continue to try to replicate all entries that a given follower does not have committed to its local log. There are nuances here that the paper explains further.
If at this point the leader fails before it has replicated its full log to that remaining minority of nodes, then a newly elected leader must not be one of the nodes without the full log. We're talking about acknowledged writes here, so we know that a majority of nodes have the acknowledged entries at the time of the leader failure. If one of the followers without the full log became leader then we'd lose data. To prevent this, the index and term of the candidate's last log entry are included in RequestVote RPCs. When a node receives a RequestVote RPC from a candidate whose log is less up to date than its own, the node will not vote for it. Because a candidate requires a majority to win, and a majority have the full log, candidates without the full log cannot become the leader.
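The voting restriction can be sketched as a single comparison. The function name and parameters are my own; the rule itself follows the Raft paper, which compares the term of the last log entry first and only falls back to the index when the terms are equal.

```python
def candidate_log_is_up_to_date(
    cand_last_term: int,
    cand_last_index: int,
    my_last_term: int,
    my_last_index: int,
) -> bool:
    """Return True if the voter may grant its vote on log grounds."""
    if cand_last_term != my_last_term:
        # The log whose last entry has the later term is more up to date.
        return cand_last_term > my_last_term
    # Same last term: the longer log is more up to date.
    return cand_last_index >= my_last_index
```

A voter that holds an acknowledged entry the candidate lacks will return False here, which is what keeps that candidate from collecting a majority.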
There are many more nuanced details to Raft regarding what I have already explained and other concepts such as log compaction and cluster membership changes. So go read the paper.
To summarize, acknowledged writes are guaranteed to survive as long as a quorum of nodes survive. So that means a cluster of 3 can tolerate 1 node failure (leaving a quorum of 2) and a cluster of 5 can tolerate 2 node failures (leaving a quorum of 3). For this reason we do not go for even-numbered clusters. A cluster of 4 can still only tolerate a single node failure, and the extra node can make the cluster less available under a network partition.
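The arithmetic behind those numbers is small enough to write down. The function names are mine; the only assumption is that a quorum means a strict majority of the cluster.

```python
def quorum(cluster_size: int) -> int:
    # Smallest number of nodes that forms a strict majority.
    return cluster_size // 2 + 1


def tolerated_failures(cluster_size: int) -> int:
    # Nodes that can be lost while a quorum still survives.
    return cluster_size - quorum(cluster_size)


for n in (3, 4, 5):
    print(n, quorum(n), tolerated_failures(n))
# 3 2 1
# 4 3 1
# 5 3 2
```

Going from 3 to 4 nodes raises the quorum without raising the number of failures you can survive.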
We have not covered read safety yet. All reads and writes go to the leader. The leader can simply read from its local state of committed entries. However, there remains the risk of a short duration split-brain where a leader, L1, is receiving reads and writes, then a network partition occurs. L1 is on the minority side. A new leader on the majority side of the partition gets elected and starts receiving reads and writes.
For a very brief period of time, L1 will not realize it is on the minority side of a partition and so if it responds immediately to all read requests, it might serve stale data. To prevent this, leaders can only serve read requests once a successful heartbeat has been acknowledged. I said earlier that there is no explicit heartbeat RPC and that it uses the AppendEntries RPC for that. When a leader has no data to replicate it simply makes the AppendEntries RPC anyway, without any entries. All live followers will acknowledge that RPC. The leader is making these AppendEntries RPCs on a short interval. When a read request comes in, it waits for the next AppendEntries RPC to be sent out and a response from a majority of nodes, before returning data to the client. This guarantees safety while increasing latency of read requests.
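As a sketch of that read path: the safe_read function and its send_heartbeat callback are hypothetical, with the callback standing in for a round of empty AppendEntries RPCs and returning the number of followers that acknowledged it.

```python
def safe_read(state, send_heartbeat, cluster_size: int):
    """Serve a read only after a majority confirms this node still leads."""
    # Count the leader itself plus every follower that acknowledged.
    acks = 1 + send_heartbeat()
    if acks > cluster_size // 2:
        return state
    # On the minority side of a partition the round cannot reach a majority.
    raise RuntimeError("leadership not confirmed; refusing possibly stale read")
```

The cost is visible in the shape of the function: every read waits for a network round trip before it returns.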
RabbitMQ Quorum Queues
Quorum queues have a single leader and multiple followers, using the same Raft terminology. All reads and writes go through that leader. By default, the replication factor of a quorum queue is 5 (and less if your cluster is smaller than that). So if you have a cluster of 3 then your quorum queue will have three replicas (one leader, two followers).
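A small sketch of the replica count rule, plus the queue argument a client uses to ask for a quorum queue. The helper names and the "orders" queue are hypothetical, the default of 5 is simply the figure quoted above for the beta, and the commented pika call is only an illustration of where the arguments would go.

```python
DEFAULT_REPLICATION_FACTOR = 5  # figure quoted in the text for the beta


def replica_count(
    cluster_size: int,
    replication_factor: int = DEFAULT_REPLICATION_FACTOR,
) -> int:
    # A queue cannot have more replicas than there are nodes in the cluster.
    return min(cluster_size, replication_factor)


def quorum_queue_arguments() -> dict:
    # "x-queue-type" is the queue argument that selects the quorum queue type.
    return {"x-queue-type": "quorum"}


# With a client such as pika the arguments would be passed roughly like this:
#   channel.queue_declare(queue="orders", durable=True,
#                         arguments=quorum_queue_arguments())
```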
When a client sends a message to a quorum queue, the broker will respond once a quorum of nodes have accepted that write. When a client consumes from a queue, the queue leader will serve messages that are acknowledged by a quorum of nodes.
Synchronization was one of the pain points of mirrored queues. With quorum queues, a fail-over will only happen to a fully synchronized follower (as the Raft protocol dictates). New followers will be replicated to asynchronously in the background (as described by the Raft protocol), causing no unavailability of the queue. The only disruption that occurs is the time for a leader election to be triggered and a new leader to be chosen. Any messages sent during this time will need to be resent by the clients.
Each quorum queue (the leader and its followers) could constitute a single Raft cluster. So with that model if you had a hundred quorum queues then you would have a hundred Raft clusters across your RabbitMQ nodes. The RabbitMQ team had to adapt the Raft protocol a bit to cope with this, as the communication and IO overhead of many independent Raft clusters was too much. Instead the queues share the same underlying storage mechanism and RPCs at a RabbitMQ node level.
Questions and Open Topics
Could the idempotency of writes mentioned in the Raft paper be implemented by the RabbitMQ team?
An excerpt from the paper:
However, as described so far Raft can execute a command multiple times: for example, if the leader crashes after committing the log entry but before responding to the client, the client will retry the command with a new leader, causing it to be executed a second time. The solution is for clients to assign unique serial numbers to every command. Then, the state machine tracks the latest serial number processed for each client, along with the associated response. If it receives a command whose serial number has already been executed, it responds immediately without re-executing the request.
Such an idempotency implementation would not need to work exactly like that. Apache Kafka, for example, uses the combination of a Producer Id and a monotonically increasing sequence number. The partition leader keeps track of the highest sequence number acknowledged per producer and topic partition and ignores messages with a lower sequence number. Also, messages are stored with the Producer Id and Sequence Number so that all partition followers can maintain that same state and, in the case of a fail-over, the deduplication logic continues to work. I blogged about Apache Kafka and Apache Pulsar idempotent producers here and both worked flawlessly in the face of TCP connection failures and broker fail-overs.
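To illustrate the idea, here is a minimal sketch of sequence-number deduplication. It is not Kafka's actual implementation: the PartitionLog class is hypothetical, it models a single partition, and it ignores details such as detecting gaps in the sequence.

```python
class PartitionLog:
    """Hypothetical single-partition log with per-producer deduplication."""

    def __init__(self):
        self.entries = []   # (producer_id, sequence, payload)
        self.last_seq = {}  # producer_id -> highest sequence appended

    def append(self, producer_id: str, sequence: int, payload) -> bool:
        # A retry carries a sequence number we have already stored: ignore it.
        if sequence <= self.last_seq.get(producer_id, -1):
            return False
        self.entries.append((producer_id, sequence, payload))
        self.last_seq[producer_id] = sequence
        return True

    @classmethod
    def rebuild_from(cls, entries):
        # Ids and sequences are stored with each message, so a follower can
        # reconstruct the same deduplication state from the replicated log.
        log = cls()
        for producer_id, sequence, payload in entries:
            log.append(producer_id, sequence, payload)
        return log
```

Because the state is rebuilt from the log itself, a retried message that arrives at a new leader after a fail-over is still recognised as a duplicate.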
I am hoping that the same would work for RabbitMQ but the various client libraries would need to be modified to support it. Hopefully it is something that the RabbitMQ team could look at in a future release.
Currently quorum queues do not react to memory alarms. Most of the restrictions on the features of quorum queues come down to the fact they implement Raft and they either cannot or have not yet been able to integrate these features with their Raft implementation.
An interesting area of investigation will be the behaviour of quorum queues under load and memory stress.
Testing the Beta Release
Implementing Raft is not trivial and there is a fair amount of scope for implementation bugs. On top of that, the RabbitMQ implementation has had to deviate a little from the protocol because of the performance problems of treating each quorum queue as an independent Raft cluster.
I will start running a suite of tests against the current beta release of quorum queues soon. The RabbitMQ team have already used Jepsen to perform this kind of test so hopefully I shouldn’t find any problems.
The Quorum queues feature is great news for RabbitMQ and the community. It was a necessary step for RabbitMQ to continue to play in the field of reliable messaging systems. Apache Kafka was the first big disruptor to RabbitMQ in the open source reliable messaging space and we are now seeing Apache Pulsar gaining ground. Perhaps RabbitMQ will not be able to match Apache Pulsar in terms of stability and reliability but the strength of RabbitMQ is not that it has always been the best in a specific category but that it is such a versatile messaging system with the best client library support out there. It doesn’t have to be the most reliable messaging system out there, but the table stakes in the open source reliable messaging space are higher these days and quorum queues will allow RabbitMQ to continue to compete in this area.