RabbitMQ Work Queues: Avoiding Data Inconsistency with Rebalanser

With RabbitMQ we can scale out our consumers by simply adding more of them, but we can also scale out our queues. There are a few reasons why scaling out our queues might be preferable to simply adding more consumers to a single queue (competing consumers); one of those reasons is the work queue pattern.

Fig 1. Work queue with multiple competing consumers


Work Queues

Avoiding competing consumers on a single queue is necessary when message ordering and/or non-concurrent processing is important. For example, let's say a worker has to perform a small set of operations on a customer. Running two work tasks related to the same customer at the same time may leave the customer data in an inconsistent state.

Or perhaps messages for a given customer sometimes arrive back to back, say a create followed by an update. Perhaps the create needs to perform a check against another system that can be slow at times. With competing consumers on the same queue, the update could end up being processed first and fail.

In both these cases it would be beneficial to distribute our messages across multiple queues, ensuring that messages for a given customer always go to the same queue, which is consumed by a single consumer. That way we get no concurrent processing and no out-of-order processing for a given customer, while still scaling out.

Topologies

A naive way of modelling this pattern would be to use a Topic or Direct exchange with the Customer Id as the routing key. However, if you have a large number of customers then you'd soon have thousands of queues.
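As a rough sketch of why that explodes (the exchange name, queue naming scheme and customer ids here are illustrative, not from a real topology), the naive approach means declaring a queue and a binding for every single customer:

using RabbitMQ.Client;

// Naive per-customer topology: one queue and one binding per Customer Id.
// With a large customer base this quickly balloons into thousands of queues.
using (var connection = new ConnectionFactory() { HostName = "localhost" }.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("customers.direct", "direct", durable: true);

    foreach (var customerId in new[] { "1000", "1001", "1002" /* ...and thousands more */ })
    {
        var queueName = $"Customers.{customerId}";
        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        channel.QueueBind(queueName, "customers.direct", routingKey: customerId);
    }
}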

Buckets

Another way would be to use a consistent hash and a modulus function to create the routing key. For example, create an MD5 hash of the Customer Id, convert it to a 64-bit integer (some of the hash is lost here), and then take that modulo the number of queues.

var numberOfQueues = 10;
var hash = CreateMD5Hash(customerId); // 16-byte MD5 digest of the Customer Id
var routingKey = Math.Abs(BitConverter.ToInt64(hash, 0) % numberOfQueues);

This gives us ten buckets into which we can classify each customer. The hash distributes customers roughly evenly between the buckets (given enough Customer Ids). We use this number between 0 and 9 as the routing key. Because the underlying hash is deterministic, customer 1000 will always be placed in the same bucket. If you change from 10 buckets to 20, however, the bucket will most likely not be the same anymore.

The below diagram shows five buckets.

Fig 2. Bucketing of Customer Id to route to a fixed number of queues.
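
Putting the bucket approach together, here is a minimal sketch of the publish side, assuming ten bucket queues are already bound to a Direct exchange with routing keys "0" to "9" (the exchange name "customers.buckets" is an assumption for illustration):

using System;
using System.Security.Cryptography;
using System.Text;
using RabbitMQ.Client;

public static class BucketRouting
{
    // Hash the Customer Id, fold it into a bucket number and use that as the routing key.
    public static string GetBucketRoutingKey(string customerId, int numberOfQueues)
    {
        using (var md5 = MD5.Create())
        {
            var hash = md5.ComputeHash(Encoding.UTF8.GetBytes(customerId));
            // Only the first 8 of the 16 digest bytes are used; Math.Abs keeps the bucket non-negative.
            return Math.Abs(BitConverter.ToInt64(hash, 0) % numberOfQueues).ToString();
        }
    }

    public static void PublishForCustomer(IModel channel, string customerId, byte[] body)
    {
        // The same Customer Id always yields the same bucket, and therefore the same queue.
        var routingKey = GetBucketRoutingKey(customerId, numberOfQueues: 10);
        channel.BasicPublish("customers.buckets", routingKey, null, body);
    }
}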

Consistent Hash Exchange

A better way of doing it would be to use the Consistent Hash Exchange. Instead of calculating a hash ourselves, we let the exchange do it. We simply use the Customer Id as the routing key.

Fig 3. Using a Consistent Hash Exchange to partition a single logical queue into multiple physical queues.

This way Customer 1000 always goes to the same queue, as the hash of its routing key is always the same. However, if you add more queues, then Customer 1000 will most likely route to a different queue.
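
As a minimal sketch, assuming the rabbitmq_consistent_hash_exchange plugin is enabled on the broker and using the queue names that appear in the Resources table further down (the exchange name "customers.hash" is an assumption): each queue is bound with a numeric weight as its binding key, and the publisher simply uses the Customer Id as the routing key.

using System.Text;
using RabbitMQ.Client;

// Requires the rabbitmq_consistent_hash_exchange plugin to be enabled on the broker.
using (var connection = new ConnectionFactory() { HostName = "localhost" }.CreateConnection())
using (var channel = connection.CreateModel())
{
    channel.ExchangeDeclare("customers.hash", "x-consistent-hash", durable: true);

    for (int i = 1; i <= 5; i++)
    {
        var queueName = $"CustomersWorkQueue.{i}";
        channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        // For a consistent hash exchange the binding key is a weight (points on the hash ring), not a pattern.
        channel.QueueBind(queueName, "customers.hash", "10");
    }

    // The exchange hashes the routing key, so messages for Customer 1000 always land on the same queue.
    var body = Encoding.UTF8.GetBytes("update for customer 1000");
    channel.BasicPublish("customers.hash", "1000", null, body);
}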

Limiting One Consumer Per Queue

Assigning each consumer to a given queue and ensuring that each queue does not have competing consumers can be a pain. You could do it in configuration, but that can be tricky if your application instances normally share the same configuration.

Colleagues of mine working on the .NET stack recently encountered this very problem. So I developed a library for them that performs automatic assignment of queues to consumers, ensuring two consumers cannot consume from the same queue. It also performs automatic rebalancing when the number of queues or consumers changes.

Rebalanser

You wrap your RabbitMQ code in Rebalanser and the library handles coordination with other consumer applications and assigns the queues.

The RebalanserContext offers three events that can be handled:

  • OnAssignment

  • OnCancelAssignment

  • OnError

The recommended way of integrating with Rebalanser is to create, for each assigned queue, a Task (TPL) with its own RabbitMQ channel and optionally its own connection. Ensure that the Task is cancellable via a CancellationToken. When OnAssignment fires, you set up your worker tasks.

When rebalancing is triggered, the OnCancelAssignment event handler is called first and you cancel your active consumer tasks using the CancellationTokenSource. Next OnAssignment is fired and you set up your worker tasks again with a new set of queues.

The below example console application is a member of the "CustomerWorkGroup" resource group. It starts a RebalanserContext and in the OnAssignment event handler it starts an EventingBasicConsumer in a separate Task for each assigned queue.

The OnCancelAssignment event handler fires when rebalancing is triggered and calls Cancel() on the CancellationTokenSource of each worker task.

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Rebalanser.Core;
using Rebalanser.SqlServer;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace ExampleWorkerApp
{
    public class WorkerTask
    {
        public CancellationTokenSource Cts { get; set; }
        public Task Worker { get; set; }
    }

    class Program
    {
        private static List<WorkerTask> workerTasks;

        static void Main(string[] args)
        {
            Providers.Register(new SqlServerProvider("Server=(local);Database=MyWorkerApp;Trusted_Connection=true;"));
            RunAsync().Wait();
        }

        private static async Task RunAsync()
        {
            workerTasks = new List<WorkerTask>();

            using (var context = new RebalanserContext())
            {
                context.OnAssignment += (sender, args) =>
                {
                    var queues = context.GetAssignedResources();
                    foreach (var queue in queues)
                        StartConsumingQueue(queue);
                };

                context.OnCancelAssignment += (sender, args) =>
                {
                    LogInfo("Consumer subscription cancelled");
                    StopAllWork();
                };

                context.OnError += (sender, args) =>
                    LogInfo($"Error: {args.Message}, automatic recovery set to: {args.AutoRecoveryEnabled}, Exception: {args.Exception.Message}");

                await context.StartAsync("CustomerWorkGroup", new ContextOptions() { AutoRecoveryOnError = true, RestartDelay = TimeSpan.FromSeconds(30) });

                Console.WriteLine("Press enter to shutdown");
                while (!Console.KeyAvailable)
                    await Task.Delay(100);

                StopAllWork();
                Task.WaitAll(workerTasks.Select(x => x.Worker).ToArray());
            }
        }

        private static void StartConsumingQueue(string queueName)
        {
            LogInfo("Subscription started for queue: " + queueName);
            var cts = new CancellationTokenSource();

            var task = Task.Factory.StartNew(() =>
            {
                try
                {
                    var factory = new ConnectionFactory() { HostName = "localhost" };
                    using (var connection = factory.CreateConnection())
                    using (var channel = connection.CreateModel())
                    {
                        channel.BasicQos(0, 1, false); // one unacknowledged message at a time
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            LogInfo($"{queueName} Received {message}");
                            // start work here -------
                            // ...
                            // ...
                            // end of work -------

                            // acknowledge message (you'll need error handling also)
                            channel.BasicAck(ea.DeliveryTag, false);
                        };
                        channel.BasicConsume(queue: queueName,
                                             autoAck: false,
                                             consumer: consumer);

                        while (!cts.Token.IsCancellationRequested)
                            Thread.Sleep(100);
                    }
                }
                catch (Exception ex)
                {
                    LogError(ex.ToString());
                }

                if (cts.Token.IsCancellationRequested)
                    LogInfo("Cancellation signal received for " + queueName);
                else
                    LogInfo("Consumer stopped for " + queueName);
            }, TaskCreationOptions.LongRunning);

            workerTasks.Add(new WorkerTask() { Cts = cts, Worker = task });
        }

        private static void StopAllWork()
        {
            foreach (var workerTask in workerTasks)
                workerTask.Cts.Cancel();
        }

        private static void LogInfo(string text)
        {
            Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss,fff")}: INFO  : {text}");
        }

        private static void LogError(string text)
        {
            Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss,fff")}: ERROR  : {text}");
        }
    }
}

Rebalanser uses SQL Server as its backend and requires three tables to function. You can host these tables in a dedicated database or add them to your application's database. First, add the name of the worker group to the ResourceGroups table:

INSERT INTO [RBR].[ResourceGroups]([ResourceGroup],[LeaseExpirySeconds], [HeartbeatSeconds])
VALUES('CustomerWorkGroup',60, 25)

Also, for each queue that your Consistent Hash Exchange has a binding to, add a record in the Resources table:
INSERT INTO [RBR].[Resources]([ResourceGroup],[ResourceName])
VALUES
('CustomerWorkGroup','CustomersWorkQueue.1'),
('CustomerWorkGroup','CustomersWorkQueue.2'),
('CustomerWorkGroup','CustomersWorkQueue.3'),
('CustomerWorkGroup','CustomersWorkQueue.4'),
('CustomerWorkGroup','CustomersWorkQueue.5')

Run your consumers and they'll automatically consume all the queues between them, ensuring that each EventingBasicConsumer processes the messages of its sole queue sequentially.

Conclusion

Data consistency is critical in any system, and competing consumers on a single queue can introduce consistency issues due to overlapping work or out-of-order processing.

Partitioning messages across multiple queues using consistent hashing or buckets is a great solution. Rebalanser makes it easy with dynamic queue assignment, guaranteeing one consumer per queue.

For more information about Rebalanser and how to use it, see the update below.

UPDATE

I have since moved the project here. I am working on getting the SQL Server and Apache ZooKeeper versions to v1.0 quality. You can track progress here: https://github.com/Rebalanser/wiki/wiki