Sending Messages in Bulk and Tracking Delivery Status - RabbitMq Publishing Part 2

The code for this blog post can be found on GitHub.

This is a console application that will create an exchange and queue for you, and allow you to send messages in bulk with message delivery status tracking.

First we'll look at the following design decision you will likely encounter when performing reliable bulk send operations.

Performance/Duplication Trade-Off

When sending messages in a bulk operation you want both decent performance and best-effort reliability, and these two concerns conflict. By reliability I mean that every message needs to be delivered and, ideally, that no message is duplicated.

The problem is that during a connection failure, the more unacknowledged messages you have, the more message duplicates you'll have if you retry those messages that never got acknowledged. But for performance, you want to maximise your unacknowledged messages in flight.

RabbitMq can send a single ack for thousands of messages by setting the "multiple" flag on the ack. This means that all messages up to and including that sequence number (delivery tag) have been acknowledged, starting from the message after the last ack (or from sequence number 1 if this is the first ack). So if you call BasicPublish 100,000 times and then call WaitForConfirms, RabbitMq may only send a handful of acks to cover all of the messages. This is great for performance as we can have hundreds or thousands of messages in flight pending a response.
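To make the "multiple" flag concrete, here is a minimal sketch of how an ack could be resolved into the sequence numbers it covers. PendingConfirms and its members are hypothetical names for illustration only; they are not part of the RabbitMq client or the GitHub code. It tracks the outstanding sequence numbers rather than remembering the last ack.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: keeps the sequence numbers that have been published
// but not yet confirmed.
public class PendingConfirms
{
    private readonly SortedSet<ulong> _pending = new SortedSet<ulong>();

    public int Count => _pending.Count;

    // Call with the sequence number of each message just before publishing it.
    public void Add(ulong sequenceNumber) => _pending.Add(sequenceNumber);

    // Returns the sequence numbers covered by a confirm and removes them.
    // multiple == true  -> every outstanding number up to and including deliveryTag
    // multiple == false -> only deliveryTag itself
    public List<ulong> Confirm(ulong deliveryTag, bool multiple)
    {
        var covered = multiple
            ? _pending.Where(tag => tag <= deliveryTag).ToList()
            : _pending.Where(tag => tag == deliveryTag).ToList();

        foreach (var tag in covered)
            _pending.Remove(tag);

        return covered;
    }
}
```

If sequence numbers 1 to 5 are outstanding, an ack for delivery tag 3 with the multiple flag set covers 1, 2 and 3, leaving 4 and 5 still in flight.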

But this becomes a problem when a connection failure occurs as those hundreds of messages in flight with no ack are now a headache. Did they get delivered or not? Probably some made it and some didn't. If you resend those messages then you will guarantee that all end up in a queue but will likely end up with many that now have two copies in the queue.

So you could call WaitForConfirms after each message. We only have one message in flight at a time and so if a connection fails, we have a smaller problem on our hands. We will have a maximum of one message without an ack. But performance is terrible!

Alternatively, you can call WaitForConfirms periodically to create message batches. The size of the batch depends on your situation and how you want to trade off performance against possible message duplication when republishing messages.
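The batching idea can be sketched without a broker. In this minimal example the publish and waitForConfirms delegates are stand-ins for channel.BasicPublish and channel.WaitForConfirms, and BatchedSender is a hypothetical name, not a class from the GitHub code. Note the extra wait at the end for a final, partially filled batch.

```csharp
using System;
using System.Collections.Generic;

public static class BatchedSender
{
    // Publishes every message and waits for confirms once per batch.
    // Returns how many times waitForConfirms was called.
    public static int Send<T>(IEnumerable<T> messages, int batchSize,
                              Action<T> publish, Action waitForConfirms)
    {
        if (batchSize < 1)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        int counter = 0;
        int waits = 0;

        foreach (var message in messages)
        {
            publish(message);
            counter++;

            // End of a full batch: at most batchSize messages were in flight.
            if (counter % batchSize == 0)
            {
                waitForConfirms();
                waits++;
            }
        }

        // Wait for the last, partially filled batch if there is one.
        if (counter % batchSize != 0)
        {
            waitForConfirms();
            waits++;
        }

        return waits;
    }
}
```

With 250 messages and a batch size of 100 this waits three times: after message 100, after message 200, and once at the end for the remaining 50. A batch size of 1 is the one-message-in-flight case described above.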

Message Tracking

BasicPublish is asynchronous. The acknowledgements and returns come via event handlers. So how do we correlate BasicPublish calls to event handler firings?

We do it via a combination of sequence number (Delivery Tag) and MessageId. BasicAcks and BasicNacks return a Delivery Tag (sequence number) but BasicReturn only returns the message (with a MessageId), not the Delivery Tag. So in order to track BasicReturn we ensure that each message has a MessageId property that we can use for tracking purposes.

In the code in GitHub we have an IMessageTracker<T> and a concrete MessageTracker<T>. What this class does is store all the messages to be sent, each wrapped in a MessageState<T> class, which contains information such as the send status, send count etc.

We have different send statuses covering the failure scenarios we discussed in the previous part.


public enum SendStatus
{
    PendingSend, // have not sent the message yet
    PendingResponse, // sent the message, waiting for an ack
    Success, // ack received
    Failed, // nack received
    Unroutable, // message returned
    NoExchangeFound // 404 reply code
}

We pass the MessageTracker<T> to each event handler and the event handler just needs to set the status by either using the Delivery Tag or the MessageId.


private void AckCallback<T>(BasicAckEventArgs ea, MessageTracker<T> messageTracker)
{
    messageTracker.SetStatus(ea.DeliveryTag, ea.Multiple, SendStatus.Success);
}

private void NackCallback<T>(BasicNackEventArgs ea, MessageTracker<T> messageTracker)
{
    messageTracker.SetStatus(ea.DeliveryTag, ea.Multiple, SendStatus.Failed);
}

private void ReturnedCallback<T>(BasicReturnEventArgs ea, MessageTracker<T> messageTracker)
{
    messageTracker.SetStatus(ea.BasicProperties.MessageId,
        SendStatus.Unroutable,
        string.Format("Reply Code: {0} Reply Text: {1}", ea.ReplyCode, ea.ReplyText));
}

private void ModelShutdown<T>(ShutdownEventArgs ea, MessageTracker<T> messageTracker)
{
    if (ea.ReplyCode != 200)
        messageTracker.RegisterChannelClosed("Reply Code: " + ea.ReplyCode + " Reply Text: " + ea.ReplyText);
}

Now that we have the event handlers, we open a connection and channel, register the event handlers and then iterate over the list and send the messages.

For each MessageState<T> object, we create the RabbitMq message, get the next sequence number (delivery tag) and register that with the MessageState<T> object and then send the message. In the exception handling we also set the correct states.


// create the RabbitMq message from the MessagePayload (the Order class)
var messageJson = JsonConvert.SerializeObject(messageState.MessagePayload);
var body = Encoding.UTF8.GetBytes(messageJson);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = messageState.MessageId;
properties.Headers = new Dictionary<string, object>();

if (messageState.SendCount > 0)
    properties.Headers.Add("republished", true);

// get the next sequence number (delivery tag) and register it with this MessageState object
var deliveryTag = channel.NextPublishSeqNo;
messageTracker.SetDeliveryTag(deliveryTag, messageState);
messageState.Status = SendStatus.PendingResponse;
messageState.SendCount++;

// send the message
try
{
    channel.BasicPublish(exchange: exchange,
                            routingKey: routingKey,
                            basicProperties: properties,
                            body: body,
                            mandatory: true);

    if (counter % messageBatchSize == 0)
        channel.WaitForConfirms(TimeSpan.FromMinutes(1));
}
catch (OperationInterruptedException ex)
{
    if (ex.ShutdownReason.ReplyCode == 404)
        messageTracker.SetStatus(messageState.MessageId, SendStatus.NoExchangeFound, ex.Message);
    else
        messageTracker.SetStatus(messageState.MessageId, SendStatus.Failed, ex.Message);
}
catch (Exception ex)
{
    messageTracker.SetStatus(messageState.MessageId, SendStatus.Failed, ex.Message);
}

 

Once all messages have been sent and either all acknowledgements have been received or the time period for waiting for acknowledgements has expired, we pass back the MessageTracker class as the IMessageTracker<T> to the calling code. The calling code can inspect this tracker to know the final message send status of each message.

There are some extra complications regarding tracking when performing retries. We register each message and its sequence number (delivery tag) with the tracker, but when we create a new channel to resend messages on, all delivery tags are reset. Because we always have a MessageId, we can always correct the delivery tag registrations.
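As a rough illustration of that re-registration, here is a minimal sketch of a delivery tag lookup that is cleared whenever a new channel is opened. DeliveryTagRegistry and its methods are hypothetical names; the real MessageTracker<T> in the GitHub code may organise this differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: maps the current channel's delivery tags to message ids.
public class DeliveryTagRegistry
{
    private readonly Dictionary<ulong, string> _messageIdByTag =
        new Dictionary<ulong, string>();

    // Call when a new channel is opened. Its delivery tags start over,
    // so registrations from the previous channel no longer apply.
    public void ResetForNewChannel() => _messageIdByTag.Clear();

    // Call just before publishing, with the channel's next publish sequence number.
    public void Register(ulong deliveryTag, string messageId) =>
        _messageIdByTag[deliveryTag] = messageId;

    // Resolves the delivery tag of an ack or nack back to the message it refers to.
    public bool TryGetMessageId(ulong deliveryTag, out string messageId) =>
        _messageIdByTag.TryGetValue(deliveryTag, out messageId);
}
```

The MessageId is the stable key here: a message that was registered under delivery tag 2 on the failed channel may be registered under delivery tag 1 on the new one.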

Another thing to remember is that when an unroutable message is returned, the BasicReturn is fired first and then an ack is sent, firing the BasicAck second. So if the current status is unroutable then we need to make sure that we don't overwrite that status with Success (ack). This is handled inside the MessageTracker.
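A minimal sketch of that guard could look like the following. TrackedStatus is a hypothetical, stripped-down stand-in for the status handling in MessageState<T> and MessageTracker<T>, and the enum simply repeats the SendStatus values listed earlier so that the example compiles on its own.

```csharp
using System;

// Repeats the SendStatus enum shown earlier in the post.
public enum SendStatus
{
    PendingSend,
    PendingResponse,
    Success,
    Failed,
    Unroutable,
    NoExchangeFound
}

// Hypothetical, simplified stand-in for the status part of MessageState<T>.
public class TrackedStatus
{
    public SendStatus Status { get; private set; } = SendStatus.PendingResponse;

    public void Apply(SendStatus incoming)
    {
        // A returned message is also acked afterwards. Keep Unroutable
        // rather than letting the later ack flip the status to Success.
        if (Status == SendStatus.Unroutable && incoming == SendStatus.Success)
            return;

        Status = incoming;
    }
}
```

Applying Unroutable and then Success leaves the status at Unroutable, while a message that only receives an ack ends up as Success.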

Setting a Republished Custom Header

When a retry is performed, we add a custom header "republished". Avoiding duplicate messages is not possible unless you are willing to accept message loss, and message deduplication has many issues of its own. So the best method of avoiding double processing of messages is to make all message processing idempotent. This may not always be possible as a default behaviour due to issues such as performance (and budget). So setting a republished header can be useful so that the consumer can apply idempotent logic only on messages that have the republished header or redelivered flag. Either of these indicates that the message is potentially a duplicate.
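On the consumer side, that check might look like the sketch below. DuplicateHints is a hypothetical helper, not part of the GitHub code, and it assumes the "republished" header value arrives as a boolean. In a real consumer the two arguments would come from the redelivered flag and the message headers of the delivery.

```csharp
using System;
using System.Collections.Generic;

public static class DuplicateHints
{
    // True when the message may be a duplicate: either the broker flagged it
    // as redelivered or the publisher marked it as republished.
    public static bool IsPossibleDuplicate(bool redelivered, IDictionary<string, object> headers)
    {
        if (redelivered)
            return true;

        return headers != null
            && headers.TryGetValue("republished", out var value)
            && value is bool republished
            && republished;
    }
}
```

A consumer could then run its more expensive idempotency check only when IsPossibleDuplicate returns true and take the fast path otherwise.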

Each message is wrapped in a MessageState<T> class. One of its properties is the SendCount. We increment this property before each call to BasicPublish. When we create the message, if the SendCount is greater than 0 then we add the "republished" header.


var messageJson = JsonConvert.SerializeObject(messageState.MessagePayload);
var body = Encoding.UTF8.GetBytes(messageJson);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
properties.MessageId = messageState.MessageId;
properties.Headers = new Dictionary<string, object>();

if (messageState.SendCount > 0)
    properties.Headers.Add("republished", true);

 

Message Batch Size

The SendMessagesAsync method in the BulkMessagePublisher class has a parameter for the message batch size. This determines how often WaitForConfirms is called. The value depends on how you want to balance performance with message duplication on retries.

Testing the Code

Make sure you have RabbitMq installed locally first. Just run the console application and it will create the exchange and queue required. Then you just enter:

  • Number of messages to send
  • Exchange name - enter "order" if you want to test normal message sending and a different exchange name if you want to test a 404 no exchange error.
  • Routing key - enter "new" if you want to test normal message sending, or a different value if you want to test for unroutable messages
  • Message batch size

Then if all goes well, the messages will be sent and you will see how many succeeded, how many retries were performed and how many duplicate messages were created.

Testing Connection Failures and Retries

Send 100000 messages, and then go to the RabbitMq management web console. Go to connections, select the open connection and then click on Force Close.

Find the new connection and repeat this force close again and then wait for the send operation to finish.

Notice that 3 attempts were made due to the two connection failures. We republished 164 messages that were never acknowledged, of which 96 ended up as duplicates on the queue.

However, we were able to send all 100000 messages successfully.

Let's do it again but this time we'll keep killing the connection until we exhaust the retries. In this example I tried to send 500,000 messages in batches of 100, then I force closed the connection four times.

[Screenshot: RetriesExhausted.PNG]

So we see that:

  • 4 attempts were made
  • 295 messages were republished, of which 278 became duplicates - so 17 messages were preserved
  • 105215 succeeded, 89 remain unacknowledged, 1 failed as the channel had already failed and 394695 never got sent at all

 

 

No Exchange

This time send 10 messages to the exchange "abc" and we will see the following:

Unroutable - No queues that match

This time send 10 messages to the order exchange but with the routing key "old".

Performance and Message Batch Size

Finally, let's compare the performance of sending 10000 messages in a single batch, in 100 batches and in 10000 batches (WaitForConfirms after each message is sent).

One batch, 1415 milliseconds

100 batches of 100 messages, 2210 milliseconds

10000 batches (one message per batch), 54378 milliseconds

So the performance degradation was not linear. There wasn't much difference between having one batch and one hundred batches, but calling WaitForConfirms after each message degraded performance significantly.
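To put those timings in perspective, they can be converted to messages per second with a one-line calculation. Throughput is just an illustrative helper name.

```csharp
using System;

public static class Throughput
{
    // Messages per second for a run of messageCount messages
    // that took elapsedMs milliseconds.
    public static double PerSecond(int messageCount, double elapsedMs) =>
        messageCount / (elapsedMs / 1000.0);
}
```

For the three runs above, Throughput.PerSecond(10000, 1415), Throughput.PerSecond(10000, 2210) and Throughput.PerSecond(10000, 54378) come out at roughly 7067, 4525 and 184 messages per second respectively.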

Next Steps

Play with the code and do some debugging to see for yourself all the behaviours of the C# RabbitMq client under different failure scenarios. One extra scenario you can test is putting a message limit of 10000 on the "order.new" queue and sending 100000 messages. All will send successfully and then if you go to the management web console and view some messages, the OrderId property will be 90001 and above. The first 90000 messages will have been removed and dropped silently. Then try adding a deadletter exchange and queue and this time you'll see 90000 messages appear in your deadletter queue.

In Part 3 we'll look at how to deal with unroutable messages.

Update - June 10th 2017

I have since made a code change to take into account the fact that publisher confirms are not guaranteed to be delivered back to the client in the correct order. Now instead of setting message delivery status as confirms come in, I defer that step to the end once all confirms have arrived, avoiding the confirm reception order issue.

However, this lack of guaranteed receipt of confirms in the correct order does introduce one issue when you open a short-lived channel to publish a batch of messages: if you receive a "multiple" confirm that covers the last message sent, you cannot know definitively that it applies to all messages since the last confirm sequence number. Why? Because there could be a confirm, a nack for example, that has not arrived yet and that covers part of the sequence number space in between. The only workaround that I see is to add a safety time period in order to catch this rare event. It's up to you whether this rare event is worth taking into account and whether your messages are important enough. Either way, the example code offers a safety period for this rare circumstance.