Custom Topology - NServiceBus with RabbitMq Part 4

In this post we'll implement a class that inherits from IRoutingTopology and that plugs into NServiceBus to create a custom routing topology. There is also the IDeclareQueues interface that we could also implement but I doubt I would ever need to. NServiceBus creates a bunch of queues in addition to the main endpoint queue, for things like retries and timeouts. I don't want to mess with these as they are important for how NServiceBus gives us the functionality that it does. So in this post we'll just be customising the RabbitMq artefacts beyond the immediate endpoint queues.

Let's look at the methods in IRoutingTopology, what they do in the conventional routing topology and when they are called.

IRoutingTopology and the Conventional Routing Topology

void Initialize(IModel channel, string main)

This is the first method to be called and is called after queue declaration (IDeclareQueues), so for the Rabbit.Billing endpoint the local situation before Initialize is called would be as follows:

 

In each of the four calls to Initialize, a queue name is passed as a parameter. A fanout exchange is declared for that queue, with the same name as the queue and an exchange-queue binding is declared. So after Initialize has been called for each of the four queues the situation is:

Next, SetupSubscription is called.

void SetupSubscription(IModel channel, Type type, string subscriberName)

This method is called once for each event handled by the endpoint. So in the case of the Rabbit.Billing endpoint, it handles thes OrderPlaced and OrderCancelled events. So the type parameter is the event type and the subscriber name is the name of the endpoint "Rabbit.Billing".

Each time this is called, the method does the following (we'll use the OrderPlaced event as an example):

  • Declare a fanout exchange based on the name of the event type. So for OrderPlaced, it will declare the "Messages.Events:OrderPlaced" fanout exchange

  • It will recursively walk the base types of this event type, declaring fanout exchanges and binding them to the "Messages.Events:OrderPlaced" exchange

  • It will get the interface types that the OrderPlaced event implements and for each it will declare an exchange and bind it to the "Messages.Events:OrderPlaced" exchange.

  • Finally, it will bind its own "Rabbit.Billing" fanout exchange to the "Messages.Events:OrderPlaced"

Now the situation is as follows:

Send and Publish Method

The IRoutingTopology also has the Send and Publish methods. In the conventional routing topology the code looks as follows:


public void Publish(IModel channel, Type type, OutgoingMessage message, IBasicProperties properties)
{
    SetupTypeSubscriptions(channel, type);
    channel.BasicPublish(ExchangeName(type), String.Empty, false, properties, message.Body);
}

public void Send(IModel channel, string address, OutgoingMessage message, IBasicProperties properties)
{
    channel.BasicPublish(address, String.Empty, true, properties, message.Body);
}

The Publish method calls the same type subscriptions code as the SetupSubscriptions method. So in the case of the Rabbit.Billing endpoint, it publishes the OrderBilled event. This will declare the Messages.Events:OrderBilled fanout exchange and its sub classes and interfaces. Though it tracks what it has already declared, so in this case the other base type and interface exchanges were already declared in the SetupSubscriptins method, so it does not bother to do that again.

So if the Messages.Events:OrderBilled fanout exchange does not exist, then the Publish invocation will work because it ensures it exists before calling BasicPublish on the RabbitMq channel.

Notice though that for Send, no declarations take place. It assumes that there is an exchange with the name of the recipient endpoint and it will fail if that exchange does not exist (AlreadyClosedException will occur. The ShutdownReason.ReplyCode will be 404).

So after publishing an OrderBilled event the situation is as follows:

 

Last Two Methods

There is the TeardownSubscription that simply unbinds the "Rabbit.Billing" exchange from the "Messages.Events:OrderPlaced" exchange and no more.

There is the RawSendInCaseOfFailure method which I currently don't know about. I will be tackling dealing with RabbitMq failures within NServiceBus later in the series and we'll take a deep look at this.

Let's Create Our Own IRoutingTOPology Implementation

If you have been following the series, we currently have the following message passing workflow:

Now we want to add another endpoint that wants to know about all OrderPlaced and OrderCancelled events related to clients that are Gold members and part of the Insider Program where clients get access to new features first. Only 1% of the clients are both Gold and Insiders.

What we want is that when the Rabbit.CustomRouting.GoldInsiderClientServices endpoint declares its exchange, that it creates a headers exchange instead of a fanout exchange. Then when it binds its queue to that exchange it wants to add the "membership" and "insider" headers to the binding. This way it only receives the OrderPlaced and OrderCancelled events that it is interested in and we don't need to modify the general routing topology. We just customize the private exchange and binding of this endpoint.

RetailDemoGoldInsiders.png

The basic steps that we need to do are as follows:

  • Create the four endpoints we have previously been working with in a new solution folder: RabbitMqTransportCustomTopology. Also add the new Rabbit.CustomRouting.GoldInsiderClientServices console application that is identical to the Billing endpoint except for naming.

  • Create a class that implements IRoutingTopology

  • Plug it into NServiceBus

  • Change the log message in the event handlers

  • Modify the Sales endpoint to add the headers "membership" and "insider" to all OrderPlaced and OrderCancelled events

  • So we can verify that it all works, modify the ClientUI endpoint to create orders from gold, silver, insider and non insider clients.

Create the Class that Implements IRoutingTopology

We are only tweaking the conventional routing topology to change the endpoints own exchange and exchange biding. We don't need to customise the event type exchanges and bindings. But even so, we need to implement all of that. So let's copy the code from the ConventionalRoutingTopology class in the NServiceBus.RabbitMq library into our own ClientServicesRoutingTopology class. You can see the ConventionalRoutingTopology code on Github here.

Now we need to make the following customisations:

  • Create a new private method called CreateHeadersExchange. We can copy the existing CreateExchange method and change the ExchangeType.Fanout to ExchangeType.Headers.

  • In the Initialize method, call the CreateHeaderExchange method instead of the CreateExchange method when the queue name matches the endpoint name.


public void Initialize(IModel channel, string mainQueue)
{
    if (mainQueue.Equals("Rabbit.CustomRouting.GoldInsiderClientServices"))
    {
        CreateHeadersExchange(channel, mainQueue);

        var bindingArguments = new Dictionary<string, object>();
        bindingArguments.Add("x-match", "all");
        bindingArguments.Add("membership", "gold");
        bindingArguments.Add("insider", "1");

        channel.QueueBind(mainQueue, mainQueue, string.Empty, bindingArguments);
    }
    else
    {
        CreateExchange(channel, mainQueue);
        channel.QueueBind(mainQueue, mainQueue, string.Empty);
    }
}

That's it for customising our custom routing topology class.

Plug it into NServiceBus

In our Program.cs AsyncMain method, add the following code after the transport instantiation code:


transport.UseRoutingTopology(
    topologyFactory: createDurableExchangesAndQueues =>
    {
        return new ClientServicesRoutingTopology(createDurableExchangesAndQueues);
    });
    

Now we just need to make sure the right headers are added to the OrderPlaced and OrderCancelled events.

Adding the Right Headers to the Events

I have added ClientId to the PlaceOrder and CancelOrder commands, and to the OrderPlaced and OrderCancelled events. Now in the event handler for each of the two commands I use the ClientId to identify whether the client is Gold or Silver and whether they are in the Insiders Program.

In the CancelOrderHandler below, we add the right headers to the OrderCancelled event according to the ClientId.


public class CancelOrderHandler : IHandleMessages<CancelOrder>
{
    static ILog logger = LogManager.GetLogger<CancelOrderHandler>();

    public Task Handle(CancelOrder message, IMessageHandlerContext context)
    {
        logger.Info($"Received CancelOrder, OrderId = {message.OrderId}");

        // This is normally where some business logic would occur

        var orderCancelled = new OrderCancelled()
        {
            OrderId = message.OrderId,
            ClientId = message.ClientId
        };

        var publishOptions = new PublishOptions();
        publishOptions.SetHeader("insider", GetInsiderProgramValue(message));
        publishOptions.SetHeader("membership", GetMembership(message));

        return context.Publish(orderCancelled, publishOptions);
    }

    private string GetInsiderProgramValue(CancelOrder cancelOrder)
    {
        // get some data from a database or something

        if (cancelOrder.ClientId.Equals("SuperImportantClientLtd"))
            return "1";

        return "0";
    }

    private string GetMembership(CancelOrder cancelOrder)
    {
        // get some data from a database or something

        if (cancelOrder.ClientId.Equals("SuperImportantClientLtd") || cancelOrder.ClientId.Equals("AnotherSuperImportantClientLtd"))
            return "gold";

        return "silver";
    }
}

We repeat this for the PlaceOrderHandler.

Now we just need to set the ClientId in our ClientUI endpoint.

Add ClientId to Commands Sent By ClientUI


static async Task RunLoop(IEndpointInstance endpointInstance)
{
    while (true)
    {
        log.Info("Press '1' to place an order by a Gold Insider client");
        log.Info("Press '2' to place an order by a Gold Standard client");
        log.Info("Press '3' to place an order by a Silver client");
        log.Info("Press '4' to cancel an order by a Gold Insider client");
        log.Info("Press '5' to cancel an order by a Gold Standard client");
        log.Info("Press '6' to cancel an order by a Silver client");
        log.Info("Press 'Q' to quit.");
        var key = Console.ReadKey();
        Console.WriteLine();

        switch (key.Key)
        {
            case ConsoleKey.NumPad1:
                var command = new PlaceOrder { OrderId = Guid.NewGuid().ToString(), ClientId = "SuperImportantClientLtd" };
                log.Info($"Sending Gold Insider PlaceOrder command, OrderId = {command.OrderId}");
                await endpointInstance.Send(command).ConfigureAwait(false);
                break;

            case ConsoleKey.NumPad2:
                var command2 = new PlaceOrder { OrderId = Guid.NewGuid().ToString(), ClientId = "AnotherSuperImportantClientLtd" };
                log.Info($"Sending Gold Standard PlaceOrder command, OrderId = {command2.OrderId}");
                await endpointInstance.Send(command2).ConfigureAwait(false);
                break;

            case ConsoleKey.NumPad3:
                var command3 = new PlaceOrder { OrderId = Guid.NewGuid().ToString(), ClientId = "NotSoImportantClient" };
                log.Info($"Sending Silver PlaceOrder command, OrderId = {command3.OrderId}");
                await endpointInstance.Send(command3).ConfigureAwait(false);
                break;

            case ConsoleKey.NumPad4:
                var command4 = new CancelOrder { OrderId = Guid.NewGuid().ToString(), ClientId = "SuperImportantClientLtd" };
                log.Info($"Sending Gold Insider CancelOrder command, OrderId = {command4.OrderId}");
                await endpointInstance.Send(command4).ConfigureAwait(false);
                break;

            case ConsoleKey.NumPad5:
                var command5 = new CancelOrder { OrderId = Guid.NewGuid().ToString(), ClientId = "AnotherSuperImportantClientLtd" };
                log.Info($"Sending Gold Standard CancelOrder command, OrderId = {command5.OrderId}");
                await endpointInstance.Send(command5).ConfigureAwait(false);
                break;

            case ConsoleKey.NumPad6:
                var command6 = new CancelOrder { OrderId = Guid.NewGuid().ToString(), ClientId = "NotSoImportantClient" };
                log.Info($"Sending Silver PlaceOrder command, OrderId = {command6.OrderId}");
                await endpointInstance.Send(command6).ConfigureAwait(false);
                break;

            case ConsoleKey.Q:
                return;

            default:
                log.Info("Unknown input. Please try again.");
                break;
        }
    }
}

That is it. Add all five projects to the solution multiple startup projects and run them. Pressing 1 and 4 will result in a message appearing in the Rabbit.CustomRouting.GoldInsiderClientServices console window. Pressing 2, 3, 5 and 6 will create orders that get filtered out by the headers exchange.

 

With great power comes great responsibility! Be careful with this. In our example we just customised a private exchange and binding but if you go further then you need to make sure all endpoints that declare shared exchanges need to all declare them with the same properties or else you'll have problems. Also take into account that we copied and pasted the ConventionalRoutingTopology class, and the next version of NServiceBus.RabbitMq may change and cause breakages with your custom implementations. 

Find all the code on Github.