Custom Direct Topology - NServiceBus with RabbitMq Part 3 — Jack Vanlightly

Custom Direct Topology - NServiceBus with RabbitMq Part 3

In this post we'll look at how we can customise the Direct Routing Topology. The source code is on Github.

Create a new solution folder and called it RabbitMqTransportCustomDirectTopology and add the four projects named as follows: Rabbit.Custom.ClientUI, Rabbit.Custom.Sales, Rabbit.Custom.Billing and Rabbit.Custom.Shipping.

Custom Direct Routing Topology

The documentation wasn't 100% clear to me so I experimented with this a bit and ended up having to read the NServiceBus.RabbitMq source code to confirm things.

The documentation seems to say that you can use the UseDirectRoutingTopology overload to customise the routing key and exchange name with any type of message.


transport.UseDirectRoutingTopology(
    routingKeyConvention: (eventType) => "Rabbit.Custom.Sales",
    exchangeNameConvention: (address, eventType) => "Sales");
    

In this case I want the Rabbit.Custom.ClientUI to send the PlaceOrder command to the Sales exchange. I want to be able to use a specific exchange instead of the default exchange. But you can't, it just gets ignored. I looked at the source code and found that the Send method in the DirectRoutingTopology class hard-codes the exchange name as String.Empty.


public void Publish(IModel channel, Type type, OutgoingMessage message, IBasicProperties properties)
{
    channel.BasicPublish(ExchangeName(), GetRoutingKeyForPublish(type), false, properties, message.Body);
}

public void Send(IModel channel, string address, OutgoingMessage message, IBasicProperties properties)
{
    channel.BasicPublish(string.Empty, address, true, properties, message.Body);
}

Notice that the Publish (for events) method calls the ExchangeName() method and the GetRoutingKeyForPublish(type) method for the routing key. These methods are actually the Func<string, Type, string> and Func<Type, string> that you pass in the UseDirectRoutingTopology method call.

So we can't customise the direct routing topology for sending commands. But we can customise the routing key and exchange for events.

In the following example we have a different routing key naming convention and we don't use the amq.topic exchange but the Orders topic exchange.

In the Rabbit.Custom.Sales project change the UseDirectRoutingTopology() to:


transport.UseDirectRoutingTopology(
    routingKeyConvention: (eventType) => 
    {
        if (eventType.UnderlyingSystemType.Equals(typeof(OrderPlaced))) // for publishing
            return "Order.Placed";
        else if (eventType.UnderlyingSystemType.Equals(typeof(OrderCancelled))) // for publishing
            return "Order.Cancelled";
        else
            return "Order.Unroutable";
    },
    exchangeNameConvention: (address, eventType) =>
    {
        return "Orders";
    });
    

The routing key and exchange conventions apply to both publishing events and subscribing to events. For publishing it determines which exchange the message is sent to and what the routing key is. For subscribing to events, it determines which exchange the endpoint queue binds to and what the binding key is.

Note that we use the routing key Order.Unroutable in case the event type is different. We'd need to set up a subscriber of that routing key. Alternatively just throw an exception in the else block.

Now in the Rabbit.Custom.Billing project use the following:


transport.UseDirectRoutingTopology(
    routingKeyConvention: (eventType) =>
    {
        if (eventType.UnderlyingSystemType.Equals(typeof(OrderPlaced))) // for queue binding
            return "Order.Placed";
        else if (eventType.UnderlyingSystemType.Equals(typeof(OrderCancelled))) // for queue binding
            return "Order.Cancelled";
        else if (eventType.UnderlyingSystemType.Equals(typeof(OrderBilled))) // for publishing
            return "Order.Billed";
        else
            return "Order.Unroutable";
    },
    exchangeNameConvention: (address, eventType) =>
    {
        return "Orders"; // for publishing and queue binding
    });
    

Here we specify the routing key and exchange for all events, both those that we published and those that we subscribe to.

Finally in the Rabbit.Custom.Shipping project use the following:


transport.UseDirectRoutingTopology(
    routingKeyConvention: (eventType) =>
    {
        if (eventType.UnderlyingSystemType.Equals(typeof(OrderPlaced))) // for queue binding
            return "Order.Placed";
        else if (eventType.UnderlyingSystemType.Equals(typeof(OrderCancelled))) // for queue binding
            return "Order.Cancelled";
        else if (eventType.UnderlyingSystemType.Equals(typeof(OrderBilled))) // for queue binding
            return "Order.Billed";
        else
            return "Order.Unroutable";
    },
    exchangeNameConvention: (address, eventType) =>
    {
        return "Orders"; // for queue binding
    });
    

This will create the following routing topology:

Unfortunately, you can't customise the exchanges further, for example creating one exchange for cancellation events and another for the rest. The reason is that even though the exchangeNameConvention: (address, eventType) => func has two parameters, they are hard coded to null in the NServiceBus.RabbitMq library, so the event type cannot be deduced.

Next we'll look at defining our own fully customised routing topology by implementing a class that implements IRoutingTopology.