Processing Pipelines Series - TPL Dataflow - Alternate Scenario

Code for this post is on Github here.

In the last post we built a TPL Dataflow pipeline based on the scenario from our first post in the series. Today we'll build another pipeline very similar to the first but with different requirements around latency and data loss.

In the first scenario we could not slow down the producer as slowing it down would cause data loss (it read from a bus that would not wait if you weren't there to consume the data). We also cared a lot about ensuring the first stage kept up with the producer and successfully wrote every message to disk. The rest was best effort, and we performed load-shedding so as not to slow down the producer.

Alternate Scenario - Back Pressure

The new scenario is that we can slow down the producer as much as we want. It reads from a durable queue that is for our purposes unbounded. We need that the whole pipeline processes every message and if one stage goes slow then we need the whole pipeline to slow down to accomodate it. Latency is not an issue. For this scenario we need back-pressure that can flow from the leaves all the way back to the producer.

We need to replace the BroadcastBlocks with something else as they act as bulk heads, preventing pressure from passing them. We'll replace them with TransformManyBlocks and a couple of wrapper classes for our DecodedMessage class.

Fig 1. The pipeline with BroadcastBlocks

TransformMany as a BroadcastBlock

The basic idea is that for each DecodedMessage that the block consumes, it outputs a list of RoutedMessage, one for each branch at the fork.

We also have to do a broadcast of batches of messages. So we create a TransformManyBlock that receives RoutedMessage[] and outputs a RoutedBatch.

public class RoutedMessage
{
    public RoutedMessage(int routeKey, DecodedMessage message)
    {
        RouteKey = routeKey;
        Message = message;
    }

    public int RouteKey { get; set; }
    public DecodedMessage Message { get; set; }
}

public class RoutedBatch
{
    public RoutedBatch(int routeKey, IEnumerable<DecodedMessage> messages)
    {
        RouteKey = routeKey;
        Messages = messages;
    }

    public int RouteKey { get; set; }
    public IEnumerable<DecodedMessage> Messages { get; set; }
}

Then when we link the block to multiple downstream blocks, we use a lambda to filter on the RouteKey property.

msgBranchBlock.LinkTo(realTimeFeedBlock, linkOptions, routedMsg => routedMsg.RouteKey == 1); // route on the key
msgBranchBlock.LinkTo(oneSecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 2); // route on the key
msgBranchBlock.LinkTo(thirtySecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 3); // route on the key

This is how we can perform a broadcast and keep the back-pressure that we want.

Next we need the producer to slow down when the block it posts to has full buffers.

private void ConsumeWithBackPressure(ITargetBlock<RawBusMessage> target, CancellationToken token, TimeSpan interval)
{
    long lastTicks = 0;
    while (!token.IsCancellationRequested)
    {
        _counter++;
        var reading = _dataBus.Read();

        var message = new RawBusMessage();
        message.Data = reading.Data;
        message.ReadingTime = new DateTime(reading.Ticks);
        message.Counter = _counter;

        if (lastTicks < reading.Ticks)
        {
            // keep trying to post the message until it gets accepted
            while (!target.Post(message))
                Thread.Sleep((int)interval.TotalMilliseconds);
        }

        lastTicks = reading.Ticks;

        Thread.Sleep(interval);
    }
}

We basically keep trying to post it until the receiving block has space in its buffer.

The code to set up the pipeline is as follows:

public async Task StartPipelineWithBackPressureAsync(CancellationToken token)
{
    _decoder.LoadSensorConfigs();

    // Step 1 - Configure the pipeline

    // make sure our complete call gets propagated throughout the whole pipeline
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    // create our block configurations
    var largeBufferOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 6000 };
    var smallBufferOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };
    var parallelizedOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100, MaxDegreeOfParallelism = 4 };
    var batchOptions = new GroupingDataflowBlockOptions() { BoundedCapacity = 1000 };

    // create some branching functions for our TransformManyBlocks
    // DecodedMessage gets tranformed into 3 RoutedMessage for the first three way branch
    Func<DecodedMessage, IEnumerable<RoutedMessage>> messageBranchFunc = x => new List<RoutedMessage>
        {
            new RoutedMessage(1, x),
            new RoutedMessage(2, x),
            new RoutedMessage(3, x)
        };

    // DecodedMessage[] gets tranformed into a RoutedBatch for the final branch
    Func<RoutedMessage[], IEnumerable<RoutedBatch>> batchBranchFunc = x => new List<RoutedBatch>
        {
            new RoutedBatch(1, x.Select(c => c.Message).ToList()),
            new RoutedBatch(2, x.Select(c => c.Message).ToList())
        };

    // define each block
    var writeRawMessageBlock = new TransformBlock<RawBusMessage, RawBusMessage>(async (RawBusMessage msg) =>
    {
        await _messageFileWriter.WriteAsync(msg);
        return msg;
    }, largeBufferOptions);

    var decoderBlock = new TransformManyBlock<RawBusMessage, DecodedMessage>(
        (RawBusMessage msg) => _decoder.Decode(msg), largeBufferOptions);


    var msgBranchBlock = new TransformManyBlock<DecodedMessage, RoutedMessage>(messageBranchFunc, largeBufferOptions);

    var realTimeFeedBlock = new ActionBlock<RoutedMessage>(async (RoutedMessage routedMsg) => 
            await _realTimeFeedPublisher.PublishAsync(routedMsg.Message), largeBufferOptions);

    var thirtySecondBatchBlock = new BatchBlock<RoutedMessage>(9000, batchOptions);
    var thirtySecondStatsFeedBlock = new ActionBlock<RoutedMessage[]>(async (RoutedMessage[] batch) =>
            await _statsFeedPublisher.PublishAsync(batch.Select(x => x.Message).ToList(), TimeSpan.FromSeconds(30)), smallBufferOptions);

    var oneSecondBatchBlock = new BatchBlock<RoutedMessage>(300, batchOptions);
    var batchBroadcastBlock = new TransformManyBlock<RoutedMessage[], RoutedBatch>(batchBranchFunc, smallBufferOptions);

    var oneSecondStatsFeedBlock = new ActionBlock<RoutedBatch>(async (RoutedBatch batch) =>
            await _statsFeedPublisher.PublishAsync(batch.Messages.ToList(), TimeSpan.FromSeconds(1)), smallBufferOptions);

    var dbPersistenceBlock = new ActionBlock<RoutedBatch>(async (RoutedBatch batch) => 
            await _dbPersister.PersistAsync(batch.Messages.ToList()), parallelizedOptions);

    // link the blocks to together
    writeRawMessageBlock.LinkTo(decoderBlock, linkOptions);
    decoderBlock.LinkTo(msgBranchBlock, linkOptions);
    msgBranchBlock.LinkTo(realTimeFeedBlock, linkOptions, routedMsg => routedMsg.RouteKey == 1); // route on the key
    msgBranchBlock.LinkTo(oneSecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 2); // route on the key
    msgBranchBlock.LinkTo(thirtySecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 3); // route on the key
    thirtySecondBatchBlock.LinkTo(thirtySecondStatsFeedBlock, linkOptions);
    oneSecondBatchBlock.LinkTo(batchBroadcastBlock, linkOptions);
    batchBroadcastBlock.LinkTo(oneSecondStatsFeedBlock, linkOptions, routedMsg => routedMsg.RouteKey == 1); // route on the key
    batchBroadcastBlock.LinkTo(dbPersistenceBlock, linkOptions, routedMsg => routedMsg.RouteKey == 2); // route on the key

    // Step 2 - Start consuming the machine bus interface (the producer)
    var consumerTask = _dataBusReader.StartConsuming(writeRawMessageBlock, token, TimeSpan.FromMilliseconds(10));

    // Step 3 - Keep going until the CancellationToken is cancelled.
    while (!token.IsCancellationRequested
        && !oneSecondStatsFeedBlock.Completion.IsCompleted
        && !dbPersistenceBlock.Completion.IsCompleted
        && !realTimeFeedBlock.Completion.IsCompleted
        && !thirtySecondStatsFeedBlock.Completion.IsCompleted)
    {
        await Task.Delay(500);
    }

    // Step 4 - the CancellationToken has been cancelled and our producer has stopped producing
    // call Complete on the first block, this will propagate down the pipeline
    writeRawMessageBlock.Complete();

    // wait for all leaf blocks to finish processing their data
    await Task.WhenAll(oneSecondStatsFeedBlock.Completion,
        thirtySecondStatsFeedBlock.Completion,
        dbPersistenceBlock.Completion,
        realTimeFeedBlock.Completion,
        consumerTask);

    // clean up any other resources like ZeroMQ for example
}

So we can that TPL dataflow can easily support both the message rate control mechanisms of back-pressure and load-shedding. Next up Reactive Extensions (Rx.NET).

Code is on Github. Posts in Series: