Reliability - Custom Retries - NServiceBus with RabbitMq Part 6

NServiceBus provides really nice customisation features for your recoverability functionality. We already saw how you can customise the immediate and delayed retries. In this post we'll look at implementing our own custom policy which allows us to make decisions based on the exceptions that are thrown in the message handlers.

The NServiceBus documentation on customising your recoverability policy is well explained and so I recommend you read their documentation first. 

Now having read their docs and you understand how to customise a policy, let's look at taking it further.

Transient By Default or Persistent By Default?

All the documentation and code samples demonstrate having to prove an exception is persistent and assuming that otherwise it must be transient. This means the code samples lean towards performing retries by default and under specific circumstances sending a message to the error exchange. This will be what you want in many scenarios but in other scenarios you might want to be more conservative about your retries.

You can easily reverse that bias and move from the code:


RecoverabilityAction MyCustomRetryPolicy(RecoverabilityConfig config, ErrorContext context)
{
    if (context.Exception is MyBusinessException)
        return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
    
    return DefaultRecoverabilityPolicy.Invoke(config, context);
}

To a MoveToError by default


RecoverabilityAction MyCustomRetryPolicy(RecoverabilityConfig config, ErrorContext context)
{
    if (context.Exception is MyBusinessException)
        return DefaultRecoverabilityPolicy.Invoke(config, context);
    
    return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
}

Per Message Policy

The documentation shows examples of a single policy that applies to all messages but perhaps you want different policies to be executed depending on the message type.

You can access that information via a header in the message called "NServiceBus.EnclosedMessageTypes" which contains the full message type name and assembly information. So for example the PlaceOrder command has a value of "Messages.Commands.PlaceOrder, Messages, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null".

This can let you customise how you deal with exceptions. Perhaps some messages need to be treated differently.

Transient Error Detection

An SqlException might be a deadlock (transient) or an incorrect syntax error (persistent), or a myriad of other error types. All of which are transient, semi-transient or persistent. Using the exception type is not enough to identify if it is transient or persistent.

So we really need some extra functionality that we can plug into NServiceBus to give us some more fine control over transient error detection and response.

A few weeks ago I developed a proof of concept transient error and retry policy library, like a simpler version of Polly, but specifically for RabbitMq. It allows you to define what is transient/persistent, what to do on each type of error (retry, send to error) and also a circuit breaker. So I was really happy to see that NServiceBus had something similar. So I have taken part of that code I wrote and modified it to play nicely with NServiceBus.

Basically you can create multiple Fail Policies and for each policy you:

  • define if the policy treats exceptions as transient by default or persistent by default
  • define rules for whether exceptions are transient, semi-transient or persistent by either using the exception type or defining one or more custom error classifiers.

I have built a couple of error classifiers, one for SqlExceptions and another for FTP errors.


public class SqlErrorClassifier : IErrorClassifier
{
    public ErrorCategory GetCategory(Exception ex)
    {
        if (ex.GetType().Equals(typeof(SqlException)))
        {
            SqlException sqlEx = (SqlException)ex;

            if (sqlEx.Number == 1205 // 1205 = Deadlock
                || sqlEx.Number == -2 // -2 = TimeOut
                || sqlEx.Number == -1 // -1 = Connection
                || sqlEx.Number == 2 // 2 = Connection
                || sqlEx.Number == 53 // 53 = Connection
                )
            {
                return ErrorCategory.Transient;
            }
            else
            {
                return ErrorCategory.Persistent;
            }
        }

        // if it isn't an SqlException then this detector cannot give an opinion
        return ErrorCategory.Unknown;
    }
}

public class FtpErrorClassifier : IErrorClassifier
{
    public ErrorCategory GetCategory(Exception ex)
    {
        if (ex.GetType().Equals(typeof(WebException)))
        {
            var webException = (WebException)ex;

            var ftpResponse = ((FtpWebResponse)webException.Response);
            if (ftpResponse.StatusCode == FtpStatusCode.ConnectionClosed)
            {
                return ErrorCategory.Transient;
            }
            else if (ftpResponse.StatusCode == FtpStatusCode.ServiceNotAvailable
                || ftpResponse.StatusCode == FtpStatusCode.ServiceTemporarilyNotAvailable)
            {
                return ErrorCategory.SemiTransient;
            }
            else
            {
                return ErrorCategory.Persistent;
            }
        }

        // if it isn't a WebException then this detector cannot give an opinion
        return ErrorCategory.Unknown;
    }
}

You set up your policies. In this case I have one policy for the PlaceOrder command and another for the CancelOrder command.


static void SetUpPolicies()
{
    FailPolicy.CreatePolicyWithName("Messages.Commands.PlaceOrder")
        .PersistentByDefault()
        .ClassifyAsSemiTransient<FraudDetectionUnavailableException>()
        .AddTransientClassifier(new SqlErrorClassifier());

    FailPolicy.CreatePolicyWithName("Messages.Commands.CancelOrder")
        .TransientByDefault()
        .ClassifyAsPersistent<OrderNotFoundException>();
}

Then we create our custom recoverability policy.


static RecoverabilityAction CustomPolicy(RecoverabilityConfig config, ErrorContext context)
{
    var errorCategory = ErrorCategory.Unknown;

    if (context.Message.Headers.ContainsKey("NServiceBus.EnclosedMessageTypes"))
    {
        if(context.Message.Headers["NServiceBus.EnclosedMessageTypes"].StartsWith("Messages.Commands.PlaceOrder"))
            errorCategory = FailPolicy.GetPolicy("Messages.Commands.PlaceOrder").GetCategory(context.Exception);
        else if (context.Message.Headers["NServiceBus.EnclosedMessageTypes"].StartsWith("Messages.Commands.CancelOrder"))
            errorCategory = FailPolicy.GetPolicy("Messages.Commands.CancelOrder").GetCategory(context.Exception);
    }

    if (errorCategory == ErrorCategory.Persistent)
    {
        return RecoverabilityAction.MoveToError(config.Failed.ErrorQueue);
    }
    else if (errorCategory == ErrorCategory.SemiTransient)
    {
        return RecoverabilityAction.DelayedRetry(TimeSpan.FromSeconds(60));
    }

    // invocation of default recoverability policy
    return DefaultRecoverabilityPolicy.Invoke(config, context);
}

Using the message headers, we decide which Fail Policy to use and ask it if the error is transient, semi-transient or persistent.

When an error does not match any registered exception type and no classifiers are able to classify it, then the Fail Policy returns the default error category.

For the PlaceOrderHandler, we have set it up as persistent by default, with an SQL classifier and the exception type FraudDetectionUnavailableException used to identify exceptions as semi-transient.

The PlaceOrderHandler looks like this:


public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    static ILog logger = LogManager.GetLogger<PlaceOrderHandler>();

    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        logger.Info($"Received PlaceOrder, OrderId = {message.OrderId}");

        LogTrace(message);

        // This is normally where some business logic would occur

        // 25% of messages always fail with a missing parameter error - persistent
        // all messages fail 10% of the time with a deadlock - transient
        // all messages fail 10% of the time with a FraudDetectionUnavailableException - transient
        var d = message.GetHashCode();
        if (d > (int.MaxValue / 4) * 3)
        {
            throw SqlExceptionCreator.NewSqlException(137); // Must declare the scalar variable - PERSISTENT
        }
        else if (new Random(Guid.NewGuid().GetHashCode()).NextDouble() > 0.90)
        {
            throw SqlExceptionCreator.NewSqlException(1205); // deadlock - TRANSIENT
        }
        else if (new Random(Guid.NewGuid().GetHashCode()).NextDouble() > 0.80)
        {
            throw new FraudDetectionUnavailableException("Fraud detection system is down for a short time"); // SEMI TRANSIENT
        }

        var orderPlaced = new OrderPlaced
        {
            OrderId = message.OrderId
        };
        return context.Publish(orderPlaced);
    }

    private void LogTrace(PlaceOrder message)
    {
        // cut out for brevity
    }
}

The CancelOrderHandler always throws a SqlException.

If you run the code you'll see how for PlaceOrder commands, 25% of messages get sent to the error exchange straight away. Then for the rest there will be some retries due to the intermittent nature of the transient exceptions.

The full code is in Github. You want to add the Rabbit.ClientUI and Rabbit.Sales.CustomRecovery projects as start up projects.

I am taking a break from writing about NServiceBus for a little while but I hope to continue this series in the near future.