I recently spoke at the RabbitMQ Summit in London about using the Consistent Hash Exchange to maintain processing order guarantees while scaling out consumers. Afterwards I was asked why I don’t opt for the Sharding Plugin instead. One of the downsides of the Consistent Hash Exchange I spoke of in the talk was that you don’t get automatic queue assignment for your consumers. The Sharding Plugin makes an attempt to address this problem but doesn’t go all the way. In this post I’ll describe my issues with the Sharding Plugin.
Setting Some Context - Routing Based on a Hash Function
Rather than just scaling out consumers, we can also scale out our queues. The downside of competing consumers is that when we process a sequence in parallel we lose ordering. The larger the prefetch, processing time and/or processing time variance, the larger the disordering of our ordered sequence. Another reason to scale out the queues is if a single queue cannot handle the message velocity.
Instead we can scale out our queues and have a single consumer per queue. We can use a hash function on the routing key to determine which of the queues a message is routed to. The idea is similar to many distributed data systems that use a hash function to partition data. The benefit of routing based on the hash of the routing key is that you can use it to achieve causal ordering. When we partition our data over multiple queues we lose total ordering, but usually we don’t need that. Causal ordering gives us all the consistency we usually need. For example we might see the following causal ordering of events: booking.created -> booking.modified -> booking.cancelled.
For example, we can send all “bookings” related events to the same Consistent Hash exchange using the Booking ID as the routing key, and all events of that booking go to the same queue. When we have a single consumer of that queue that processes those events sequentially we get ordering guarantees at the scope of a given booking.
This might be a bit of a niche use case. Although competing consumers do mess with the ordering, it probably won’t affect you in most cases. Whether this approach sounds useful to you depends on your project and what you want to optimize for.
The Consistent Hash Exchange
This exchange will route messages to bound queues based on the hash of the routing key. Each bound queue has a weight, which allows you some control over the distribution of messages, though I have always used an equal distribution.
Publishers simply publish to the exchange but the consumers must be explicitly configured to consume from specific queues. If you have ten queues then you need to setup your consumers so that all ten are covered. This can be a pain point. You must use your build or configuration system to control that. Anything that can affect the number of running consumers, like rebooting a server, auto-scaling groups etc can affect the consumption of your queues. You need to build the smarts to make sure all bound queues are always being consumed and not exceeding one consumer for a given queue.
The Sharding Plugin
The sharding plugin can also route messages by a hash function, but automates some things that the Consistent Hash exchange does not. Unfortunately it automates the easy stuff and leaves you still with the hard stuff.
Once you have enabled the plugin, you create an exchange. If you want hash based routing, you declare an x-modulus-hash exchange. Then you apply a policy to it with the field “shards-per-node”. Let’s say we have three nodes. If we set “shards-per-node” to 5 then we’ll end up with 15 queues created across those nodes.
The plugin claims to create a single logical queue from those multiple queues. You simply tell your consumers to consume from the logical name of the queue (the exchange name) and each of your consumers automatically gets assigned to one of the queues.
The creation and binding of queues is trivial, so I don’t find the automatic queue creation particularly useful. Automatically assigning a consumer to a queue is nice but it doesn’t fully solve that problem. If I have 15 queues and I start up 10 consumers, then at best 5 queues are left unconsumed. My “one big logical queue” abstraction is broken, as it does not behave like one queue, it behaves like multiple separate queues.
If I start up more consumers than queues, then queues start getting assigned twice. This might be ok for you, or it might not. When I use this pattern I normally want a single consumer per queue. I am quite happy to have multiple queues assigned to a single application, but not the other way around - think Kafka consumer groups.
I said “at best 5 queues are left unconsumed” earlier because your consumer will only be assigned to a queue local to the broker that the consumer is connected to. This means that if you have a load balancer in front of your cluster then, ensuring that all queues get consumed from is problematic. The only way to ensure that each queue is consumed from is by ensuring that the correct number of consumers connect to the right brokers. Else you end up with some queues with more than one consumer and some without any.
So it is still up to me, the system administrator, to ensure that I have the right number of consumers running and ensuring that they each connect to the brokers in the cluster such that all queues get consumed from. This is not behaving as a single logical queue and it doesn’t seem to make queue assignment any easier.
The plugin says that is does auto-scaling by automatically creating and binding new queues when an extra RabbitMQ node is added to the cluster. Except that I must not forget to scale out my consumers myself, as those new queues will be left unconsumed. Unconsumed queues are bad. Again, the declaring of new queues when I add another broker is trivial, I don’t need that automated.
So what have I really gained by using the sharding plugin? Automation of the easy stuff and still with manual effort to get the consumption right. What I really want are Kafka style consumer groups that work with the Consistent Hash exchange. If you are not sure what Kafka consumer groups are then read my RabbitMQ vs Kafka post where I introduce Kafka concepts.
Help Needed
To that end I worked a few months ago on some libraries to provide Kafka consumer group functionality to RabbitMQ. It has since gathered dust but I am still interested in developing the concept. You can see a couple of posts about my efforts here and here. In my talk I said that Pivotal had started working on this feature but this is not the case, it was a misunderstanding in a discussion I had had the day before. So there is no consumer group feature in the works.
After seeing Matteo Cafasso’s talk at the RabbitMQ Summit on building RabbitMQ plugins I am thinking about creating a plugin that could act as the consumer group controller. If anyone is interested in working on (or simply wants to use) a Kafka style consumer group feature for RabbitMQ then let me know. If there’s not much interest then I’ll move on to other projects, but if there is interest then it is something that I would be happy to work on.
Banner image credit: ESO/B. Tafreshi (twanight.org). Link to image.