Reliability - Default Retries - NServiceBus with RabbitMq Part 5

Errors can occur while processing messages, and NServiceBus has a few features to help you recover from failures. Retries are an obvious way of recovering from failure, and NServiceBus offers two types: immediate and delayed.

Immediate Retries

The event handler is re-executed immediately and consecutively until either the message is processed successfully or the immediate retry limit is reached. The default limit is 5.

Delayed Retries

Once immediate retries have been exhausted, the delayed retry behaviour kicks in. Depending on the version of your NServiceBus.RabbitMq library, by default it uses either:

  • the Timeout Manager and two satellite queues, EndpointName.Timeouts and EndpointName.TimeoutsDispatcher (prior to version 4.3 of the NServiceBus.RabbitMqTransport), or

  • a series of delay exchanges and queues (from version 4.3 of the NServiceBus.RabbitMqTransport, currently pre-release).

The default delayed retry limit is 3, with a delay period increment of 10 seconds. That means the first delay is 10 seconds, then 20 seconds and finally 30 seconds. Each time the delay period expires the message is placed back on the main endpoint queue. Once these retries are exhausted it will send the message to the error exchange, if you have one configured.
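
For reference, these defaults map directly onto the Recoverability configuration API that we'll use in the code examples later. Setting the default values explicitly inside AsyncMain would look like this:

var recoverability = endpointConfiguration.Recoverability();

// Equivalent to the defaults: 5 immediate retries, then 3 delayed
// retries with a 10 second increment (10s, 20s, 30s)
recoverability.Immediate(immediate => immediate.NumberOfRetries(5));
recoverability.Delayed(delayed =>
{
    delayed.NumberOfRetries(3);
    delayed.TimeIncrease(TimeSpan.FromSeconds(10));
});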

You'll have noticed from the previous parts in the series that there is a satellite queue named EndpointName.Retries. From version 6 of NServiceBus this is no longer used, but it is still declared as a way of migrating endpoints from version 5 to version 6. If you don't want that exchange and queue to be created, you'll need to implement your own custom topology via the IDeclareQueues interface.

Dangers of the Default Retry Behaviour

Retries are often necessary, but if you are not careful you can really shoot yourself in the foot with them. Personally I don't like the default retry behaviour because it does not take into account whether an error is transient or not. An SqlException that occurs because of a primary key violation will be attempted 24 times:

  • 1 attempt + 5 immediate retries (6 attempts)

  • 10 second delay -> placed back on the main queue

  • 1 attempt + 5 immediate retries (6 attempts)

  • 20 second delay -> placed back on the main queue

  • 1 attempt + 5 immediate retries (6 attempts)

  • 30 second delay -> placed back on the main queue

  • 1 attempt + 5 immediate retries (6 attempts)

  • Delayed retry limit reached -> send to the error exchange

It was a duplicate the first time and every time after that. Not only is this inefficient, it could also result in duplicated side effects if the programmer is not careful.

Let's say your handler calls a POST on web service A, then inserts a record in SQL Server, then calls a POST on a second web service B. The primary key violation occurs on the insert, we perform 23 more attempts, and we end up calling that POST action on web service A 24 times in total. A sketch of such a handler is shown below.
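
This is only an illustration; the service URLs and the Orders table are hypothetical, and the handler mirrors the shape of the PlaceOrderHandler we'll use later in this post:

using System.Data;
using System.Data.SqlClient;
using System.Net.Http;
using System.Threading.Tasks;
using NServiceBus;

public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    static readonly HttpClient httpClient = new HttpClient();

    public async Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        // 1. POST to web service A: re-executed on EVERY one of the 24 attempts
        await httpClient.PostAsync("http://service-a/orders",
            new StringContent(message.OrderId.ToString()));

        // 2. Fails with a primary key violation on every attempt, because the
        //    row was already inserted the first time round
        using (var conn = new SqlConnection("Server=(local);Database=Orders;Trusted_Connection=true;"))
        {
            conn.Open();
            using (var command = conn.CreateCommand())
            {
                command.CommandText = "INSERT INTO [dbo].[Orders] ([OrderId]) VALUES (@OrderId)";
                command.Parameters.Add("OrderId", SqlDbType.Int).Value = message.OrderId;
                command.ExecuteNonQuery();
            }
        }

        // 3. Never reached, yet web service A above keeps being called
        await httpClient.PostAsync("http://service-b/orders",
            new StringContent(message.OrderId.ToString()));
    }
}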

This is a simple example and could be avoided, but it would be better to correctly identify errors that can be retried and those that cannot. If the error in this example were a deadlock or connection timeout then a retry might make more sense, though you would still end up calling that web service A again. Retries need to be treated with care! We'll look at detecting transient errors in the next part.

Timeout Manager vs Native Delay Infrastructure

Timeout Manager + SqlPersistence + Competing Consumers = Recipe For Disaster

I have spent some time looking at the source code of the Timeout Manager and SQL persistence, and using SQL Profiler to analyse the behaviour. There is one serious gotcha that you need to be aware of: the Timeout Manager with SQL persistence is not compatible with competing consumers. The problem is that it stores timeouts in a table and periodically polls for due messages, but there is no locking between reading a row and deleting it, so competing consumers can pick up the same timeout and send it to the endpoint exchange at the same time. One consumer then deletes the row from the table, and the second finds it has already been deleted. You can see from comments in the source that the developers are aware of this, but with the RabbitMqTransport the send cannot be rolled back at that point, and so message duplication will occur.

If you don't have competing consumers then no worries. But if you do, then I would recommend MSMQ persistence specifically for the timeout behaviour. A bad alternative is InMemoryPersistence, which is competing-consumer safe but will result in message loss if the host dies. You could also implement your own SQL persistence that uses row locking to block concurrent access to the same message in the table; a sketch of that locking idea is shown below. But there is a better way, which NServiceBus supports from version 4.3 of the RabbitMqTransport (pre-release at the time of writing).
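
To make the row-locking idea concrete, here is a minimal sketch. The Timeouts table and its columns are hypothetical, not SqlPersistence's actual schema; the point is that UPDLOCK holds the claimed row until the transaction commits, while READPAST makes other consumers skip it rather than read it:

using System;
using System.Data.SqlClient;

class TimeoutPoller
{
    const string connectionString =
        "Server=(local);Database=NsbRabbitMqRecoverability;Trusted_Connection=true;";

    // Claims and deletes one due timeout atomically, so that no two competing
    // consumers can ever dispatch the same timeout.
    public static bool TryClaimDueTimeout(out Guid timeoutId)
    {
        timeoutId = Guid.Empty;
        using (var conn = new SqlConnection(connectionString))
        {
            conn.Open();
            using (var tx = conn.BeginTransaction())
            {
                using (var select = conn.CreateCommand())
                {
                    select.Transaction = tx;
                    // UPDLOCK holds the row lock until the transaction commits;
                    // READPAST makes other pollers skip locked rows instead of blocking
                    select.CommandText = @"SELECT TOP 1 Id
FROM dbo.Timeouts WITH (UPDLOCK, ROWLOCK, READPAST)
WHERE DueTime <= GETUTCDATE();";
                    var result = select.ExecuteScalar();
                    if (result == null)
                        return false;
                    timeoutId = (Guid)result;
                }

                // ... dispatch the message to the endpoint exchange here ...

                using (var delete = conn.CreateCommand())
                {
                    delete.Transaction = tx;
                    delete.CommandText = "DELETE FROM dbo.Timeouts WHERE Id = @Id;";
                    delete.Parameters.AddWithValue("Id", timeoutId);
                    delete.ExecuteNonQuery();
                }

                tx.Commit();
                return true;
            }
        }
    }
}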

Native RabbitMq Delay Infrastructure

RabbitMq is natively capable of delayed retries through the use of a secondary exchange, a message Time To Live (TTL) and a Dead Letter Exchange. You create an exchange and a queue, apply a message TTL to that queue, and set its dead letter exchange to your endpoint's main exchange.

You just need to make sure you add a retry count as a header and limit the retries in order to avoid an infinite loop. The limitation of having one wait queue is that all messages need to have the same TTL. That is because messages are only dead lettered from the head of the queue. So if you have one message with an expiry in 10 minutes and a second message behind it that expires in 1 minute, the second message will be blocked for ten minutes until the first one is dead lettered. So linear and exponential back-off are not possible with a single wait queue.
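
A minimal sketch of a single wait queue using the RabbitMQ.Client library (the Billing exchange and the Billing.Wait10s queue names are hypothetical):

using System.Collections.Generic;
using RabbitMQ.Client;

var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
    // The endpoint's main exchange; expired messages are dead lettered back here
    channel.ExchangeDeclare("Billing", ExchangeType.Fanout, durable: true);

    // The wait queue: every message sits here for the 10 second TTL, then
    // expires and is routed to the dead letter exchange declared above.
    // Messages are published to this queue directly via the default exchange.
    channel.QueueDeclare("Billing.Wait10s",
        durable: true,
        exclusive: false,
        autoDelete: false,
        arguments: new Dictionary<string, object>
        {
            { "x-message-ttl", 10000 },              // all messages share the same TTL
            { "x-dead-letter-exchange", "Billing" }  // where expired messages go
        });
}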

NServiceBus to the rescue! It has a novel solution to this problem (explained nicely in the NServiceBus documentation): a cascade of delay-level exchanges and queues. It is available from version 4.3 of the RabbitMqTransport, which at the time of writing is still in pre-release.

A visual representation is as follows:

In this visual representation we only see one endpoint, Billing, but these delay exchanges and queues are shared infrastructure and the final nsb.delay-level exchange binds to all endpoint exchanges whose endpoints have delayed retries configured. So in the retail demo there would be 12 bindings for the Sales, Billing and Shipping endpoints (each endpoint having 4 queues).

So this shared infrastructure could become a bottleneck if you get retry peaks, for example when a consumer with a high message load experiences a period of transient or semi-transient failures.

Code Examples

The code is on GitHub, but we'll take a quick look at what is required for both the Timeout Manager and the native delay infrastructure.

First of all, both examples trace event handler activity to a table in a local SQL Server instance. To prepare the database, do the following:

  1. Create a database called NsbRabbitMqRecoverability on a local SQL Server instance

  2. In the Rabbit.Sales project there is a SQL file called run_this_first_to_create_trace_table.sql; run it.

Timeout Manager with SQL Persistence Version

The Rabbit.Sales project is a copy of the previous incarnation of this project in the topologies part of this series. We just need to modify it to configure the retries, set up SQL persistence and perform the event handler tracing.

In Program.cs, in the AsyncMain method, add the following:


var recoverability = endpointConfiguration.Recoverability();

// 20 delayed retries, starting at a 1 second delay and increasing by 1 second each time
recoverability.Delayed(
    customizations: delayed =>
    {
        delayed.NumberOfRetries(20);
        delayed.TimeIncrease(TimeSpan.FromSeconds(1));
    });

// Disable immediate retries so that only the delayed retry behaviour is visible
recoverability.Immediate(
    customizations: imm =>
    {
        imm.NumberOfRetries(0);
    });

// SQL persistence gives the Timeout Manager a store for its delayed messages
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
var connection = @"Data Source=(local);Initial Catalog=NsbRabbitMqRecoverability;Integrated Security=True";
persistence.SqlVariant(SqlVariant.MsSqlServer);
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(connection);
    });

For this example we are disabling immediate retries because we only want to see the delayed retry behaviour. The delayed retries start at 1 second and increase by 1 second each time, for 20 retries.

Now go to the PlaceOrderHandler and change it as follows:


public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    static ILog logger = LogManager.GetLogger<PlaceOrderHandler>();

    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        logger.Info($"Received PlaceOrder, OrderId = {message.OrderId}");

        // Record this attempt so we can measure the real delay between retries
        LogTrace(message);

        // This is normally where some business logic would occur.
        // We always throw so that every attempt fails and triggers a retry.
        throw new Exception("An exception occurred in the handler.");

        // Unreachable while the exception above is in place (the compiler will warn)
        var orderPlaced = new OrderPlaced
        {
            OrderId = message.OrderId
        };
        return context.Publish(orderPlaced);
    }

    private void LogTrace(PlaceOrder message)
    {
        using (var sqlConn = new SqlConnection("Server=(local);Database=NsbRabbitMqRecoverability;Trusted_Connection=true;"))
        {
            sqlConn.Open();
            using (var command = sqlConn.CreateCommand())
            {
                command.CommandText = @"INSERT INTO [dbo].[RetriesTrace]
    ([OrderId]
    ,[AppEntryTime])
VALUES
    (@OrderId
    ,@TimeNow)";

                command.Parameters.Add("OrderId", SqlDbType.Int).Value = message.OrderId;
                command.Parameters.Add("TimeNow", SqlDbType.DateTime).Value = DateTime.Now;

                command.ExecuteNonQuery();
            }
        }
    }
}

Now we're ready to test it out. Make sure the Rabbit.ClientUI and Rabbit.Sales projects are both set as startup projects and run them. Press P in the ClientUI to send 20 messages that will fail. You can keep track of the event handler executions, and the real delay times, by executing the following query:


-- Number each handler attempt per order, then join each attempt to the
-- previous one to calculate the real delay in seconds between attempts
WITH CTE AS (
SELECT ROW_NUMBER() OVER (PARTITION BY OrderId ORDER BY RetriesTraceId) AS RowNo
      ,[RetriesTraceId]
      ,[OrderId]
      ,[AppEntryTime]
      ,[DbEntryTime]
  FROM [NsbRabbitMqRecoverability].[dbo].[RetriesTrace]
)

SELECT CTE1.*, DATEDIFF(SECOND, CTE2.AppEntryTime, CTE1.AppEntryTime) AS SecondsDelay
FROM CTE AS CTE1
JOIN CTE AS CTE2 ON CTE1.OrderId = CTE2.OrderId
    AND CTE1.RowNo - 1 = CTE2.RowNo
    

Once all the retries have finished you should have 400 rows in the table. As you can see from the results below for the first order, the real delay periods do not follow the 1s, 2s, 3s ... 20s pattern exactly, but they are close enough.

 

Native Delay Infrastructure Version

Create a carbon copy of the Rabbit.Sales project and call it Rabbit.Sales.DelayInfrastructure. Add the NServiceBus.RabbitMq version 4.3 NuGet package (pre-release at the time of writing).

Remove the persistence configuration from Program.cs and replace it with InMemoryPersistence. Then add:


// transport is the result of endpointConfiguration.UseTransport<RabbitMQTransport>()
var delayedDelivery = transport.DelayedDelivery();

// Use the native delay infrastructure instead of the Timeout Manager, and declare
// that every endpoint in the system has the delay infrastructure in place
delayedDelivery.DisableTimeoutManager();
delayedDelivery.AllEndpointsSupportDelayedDelivery();

Make sure you don't have the SQL persistence NuGet packages added to this project.

You're set. Truncate the trace table to remove the results of the last run. Make sure Rabbit.ClientUI and Rabbit.Sales.DelayInfrastructure are the two startup projects and run them. Press P in the ClientUI to send 20 messages that will fail.

If you go to your RabbitMq management console you'll see activity in the delay queues. You should see 400 rows in the trace table when all retries have been exhausted.

Again, the delays do not increment in precise 1 second intervals, but they are close enough. After repeated tests the native RabbitMq version does seem a little more erratic, but we're talking only a few seconds.

In the next part we'll look at customising our retry behaviour with policies that help us identify transient and non-transient errors.