Processing Pipelines Series - TPL Dataflow

Source code for this post can be found on GitHub here.

TPL Dataflow is a data processing library from Microsoft that came out years ago. It consists of different "blocks" that you compose together to make a pipeline. Blocks correspond to stages in your pipeline. If you haven't read the first post in the series, it is worth doing so before reading on.

Blocks and Buffers

There are multiple types of block available, and you can implement your own when the standard ones don't give you what you need. Blocks can be divided into the following categories:

  • Buffering Only

  • Execution

  • Grouping

We'll be building our pipeline with a mix of all categories. Most of our blocks will perform an action, but some need to group messages into time windows, and we'll be branching our pipeline with a special type of buffering-only block.

Execution Blocks

Execution blocks have two core components:

  • buffers for incoming and outgoing messages (called the Input and Output queues)

  • a Func delegate to perform some action on each message

Fig 1. Each block has a buffer and a Func delegate to perform work

Messages can be buffered when incoming or outgoing. They will be buffered on arrival when the Func delegate is running more slowly than the rate of incoming messages. Messages will be buffered on the way out when there is no capacity in the input buffer of the next block.

The above diagram shows how incoming and outgoing messages are buffered but we configure a single buffer capacity which is shared between the input and output buffers. So you might also like to visualize the buffers like the following diagram.

Fig 2. Incoming and outgoing buffers with a shared capacity limit.

With each block we can configure:

  • The total capacity of the buffers. Buffers are unbounded by default.

  • The parallelization of the Func delegate. By default blocks process messages sequentially, one at a time.

There are additional things we can configure but we'll not cover them now as they are more advanced topics. We link blocks together to form a chain where the producer pushes work down the pipeline. There is also a pull-based mechanism using Receive and TryReceive, but we'll be using block linking with a push mechanism in our pipeline.

The blocks we'll use in our pipeline are:

  • TransformBlock (Execution category) - Consists of an input/output buffer and a Func<TInput, TOutput> delegate. For each message it consumes, it outputs another. You can use this block to perform transformations on the incoming messages or perform actions (like writing to a database) and forwarding the message changed or unchanged to the next block.

  • TransformManyBlock (Execution category) - Consists of an input/output buffer and a Func<TInput, IEnumerable<TOutput>> delegate. It is like the TransformBlock but it outputs an IEnumerable<TOutput> for each message it consumes.

  • BroadcastBlock (Buffering category) - Consists of a buffer of one message and a Func<T, T> delegate. The buffer gets overwritten on each new incoming message. The delegate is simply for giving you control of how to clone the message. Message transformation is not possible. The block can be linked to multiple blocks to enable branching of the pipeline. Although it only buffers one message at a time, it will always forward that message on to linked blocks (if they have buffer capacity).

  • BatchBlock (Grouping category) - You tell it the size of each batch you want and it will accumulate messages until it reaches that size and then forward it on as an array of messages to the next block.

  • ActionBlock (Execution category) - Consists of a buffer and an Action<T> delegate. These are the leaves of our pipeline, or graph if you like. They do not forward on messages to any other blocks. They only exist to perform actions on incoming messages.

There are more block types such as: BufferBlock, WriteOnceBlock, JoinBlock and BatchedJoinBlock. We won't cover those blocks as I don't need them for our scenario and we have enough to cover already.
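To make the block descriptions above concrete, here is a minimal sketch of a three-stage chain, separate from the pipeline we build later. The class name MiniPipeline, the batch size of 3 and the capacity of 10 are assumptions chosen only for illustration. It links a TransformBlock to a BatchBlock to an ActionBlock and lets completion propagate down the chain.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the pipeline in this post.
public static class MiniPipeline
{
    // Doubles each input, groups the results into batches of three
    // and returns the sum of every batch in arrival order.
    public static async Task<List<int>> RunAsync(IEnumerable<int> inputs)
    {
        var batchSums = new List<int>();
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 10 };

        // Execution block: one output per input.
        var doubler = new TransformBlock<int, int>(x => x * 2, bounded);
        // Grouping block: emits an int[] once three messages have arrived.
        var batcher = new BatchBlock<int>(3);
        // Leaf block: sequential by default, so the list needs no locking.
        var summer = new ActionBlock<int[]>(batch => batchSums.Add(batch.Sum()), bounded);

        doubler.LinkTo(batcher, linkOptions);
        batcher.LinkTo(summer, linkOptions);

        foreach (var input in inputs)
            await doubler.SendAsync(input); // waits if the first block is full

        doubler.Complete();      // propagates down the chain
        await summer.Completion; // the leaf has processed everything
        return batchSums;
    }
}
```

With the inputs 1 to 7 this returns the sums 12, 30 and 14. The last value comes from the partial batch that the BatchBlock flushes when it is completed.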

Buffers, Back-Pressure and Load-Shedding

When an incoming buffer reaches its capacity, the outgoing buffer of the block that feeds it will start to fill. When the outgoing buffer is full, the block must pause processing until the buffer has room. This means a bottleneck in one stage can cause the buffers of all preceding blocks to fill up.

Let's say we have three blocks each with a capacity of 10. If they all start out processing at the same speed then the buffers should be mostly empty. But when Block 3 slows down, its input buffer fills. When it reaches capacity, Block 2 starts filling up its own output buffer with outgoing messages. The output buffer of Block 2 fills up over time and eventually Block 2 must slow down to the same speed as Block 3. Block 1 now starts filling up its output buffer with outgoing messages and must slow down too. Ultimately the producer sees the slowdown, as it cannot post to Block 1 because that block is at its total buffer capacity most of the time. The producer can either carry on at the same speed but discard some messages, or slow down to the speed of Block 1. In this way TPL Dataflow can easily accommodate both back-pressure and load-shedding.

However, not every block pauses when the block it feeds becomes full. The BroadcastBlock has a buffer of one message that gets overwritten by each incoming message. So if the BroadcastBlock cannot forward a message to downstream blocks, that message is lost when the next message arrives. This is load-shedding. If you want a load-shedding strategy, then the BroadcastBlock is helpful, if somewhat blunt. We'll use this behaviour to our advantage in our pipeline.
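The overwrite behaviour can be observed with a small experiment. This is only a sketch: the class name BroadcastLoadSheddingDemo is hypothetical, the consumer is held up artificially with a TaskCompletionSource, and the 500 ms pause is an assumption that gives the BroadcastBlock time to work through everything that was posted.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the pipeline in this post.
public static class BroadcastLoadSheddingDemo
{
    // Posts 1..messageCount through a BroadcastBlock into a stalled consumer
    // with room for one message, then returns what the consumer received.
    public static async Task<List<int>> RunAsync(int messageCount)
    {
        var received = new List<int>();
        var gate = new TaskCompletionSource<bool>();

        var broadcast = new BroadcastBlock<int>(msg => msg);
        var slowConsumer = new ActionBlock<int>(async msg =>
        {
            await gate.Task;   // stalled until the gate is opened
            received.Add(msg);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        broadcast.LinkTo(slowConsumer, new DataflowLinkOptions { PropagateCompletion = true });

        // The BroadcastBlock is unbounded, so the producer is never held up.
        for (int i = 1; i <= messageCount; i++)
            broadcast.Post(i);

        // Assumption: long enough for the broadcast block to offer every message.
        await Task.Delay(500);

        gate.SetResult(true);  // let the consumer run again
        broadcast.Complete();
        await slowConsumer.Completion;
        return received;
    }
}
```

The consumer accepts the first message and then has no room, so the messages that follow are overwritten inside the BroadcastBlock rather than queued. The producer never notices: every Post succeeds, and the returned list is far shorter than the number of messages posted.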

But, if you want back-pressure to control flow throughout the length of your pipeline then a BroadcastBlock in the middle is going to ruin your day. You'll need to use a different mechanism for branching your pipeline. There is a nice example of using a TransformManyBlock to provide branching and maintaining back-pressure on a Microsoft forum question.

The Producer

Producers post messages to the first block. You can use the Post or SendAsync method to do this.

The Post method will instantly return a true or false. True means the message was accepted by the block (its buffer had spare capacity) and false means it refused the message (the buffer is already at capacity or the block has faulted).

The SendAsync method returns a Task<bool> that completes when the block accepts or declines the message, for example because the block has faulted. More on faulted blocks later. The difference between Post and SendAsync is that the block can postpone its decision with SendAsync: a bounded block that is full can hold on to the offer and accept the message later, once it has room, at which point the task completes with true.

If you want back-pressure to slow down your producer, await SendAsync, or keep retrying Post in a loop until the message gets accepted.

If you want to discard messages when the pipeline is already full, ignore the result of the Post method and carry on.
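The two producer strategies can be sketched as follows. The class ProducerStrategies and its method names are hypothetical, and a bounded BufferBlock with no consumer stands in for the first block of a pipeline that is already full.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, not part of the pipeline in this post.
public static class ProducerStrategies
{
    // Load-shedding: offer the message and carry on whether or not it was accepted.
    public static bool TryProduce<T>(ITargetBlock<T> target, T message)
        => target.Post(message);

    // Back-pressure: a producer awaiting this task is held up until the block has room.
    public static Task<bool> ProduceAsync<T>(ITargetBlock<T> target, T message)
        => target.SendAsync(message);

    public static async Task<(bool PostWhenFull, bool SendWasPending, bool SendResult)> DemoAsync()
    {
        // Room for two messages and nothing consuming them yet.
        var block = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = 2 });
        TryProduce(block, 1);
        TryProduce(block, 2);

        // The block is full: Post is declined immediately.
        bool postWhenFull = TryProduce(block, 3);

        // SendAsync is postponed instead: the task stays incomplete while the block is full.
        Task<bool> pending = ProduceAsync(block, 3);
        bool sendWasPending = !pending.IsCompleted;

        // Removing one message frees capacity, and the postponed message is then accepted.
        block.Receive();
        bool sendResult = await pending;

        return (postWhenFull, sendWasPending, sendResult);
    }
}
```

DemoAsync returns (false, true, true): the Post on the full block is declined at once, while the SendAsync task stays pending and completes with true as soon as a slot frees up.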

Implementing Our Pipeline

In the last post I described the scenario, so please check that out if you haven't read it. Here is a diagram of the TPL Dataflow pipeline I created.

Fig 3 - Our pipeline with TPL Dataflow blocks

Notice that the Decode block is a TransformManyBlock. We receive 100 raw bus messages per second. Each bus message is simply a byte array which contains 100 other sensor readings in a binary encoded format. So we should get 10000 decoded messages per second arriving at our first BroadcastBlock. In this fabricated example, my fake machine bus only includes 3 sensor readings per message, so we end up with 300 decoded messages per second.

Let's look at the code of how we set that up.

// interfaces of classes that do the work
private IDataBusReader _dataBusReader;
private IMessageFileWriter _messageFileWriter;
private IDecoder _decoder;
private IRealTimePublisher _realTimeFeedPublisher;
private IStatsFeedPublisher _statsFeedPublisher;
private IDbPersister _dbPersister;

public ProcessingPipeline(IDataBusReader dataBusReader,
    IMessageFileWriter messageFileWriter,
    IDecoder decoder,
    IRealTimePublisher realTimePublisher,
    IStatsFeedPublisher statsFeedPublisher,
    IDbPersister dbPersister)
{
    _dataBusReader = dataBusReader;
    _messageFileWriter = messageFileWriter;
    _decoder = decoder;
    _realTimeFeedPublisher = realTimePublisher;
    _statsFeedPublisher = statsFeedPublisher;
    _dbPersister = dbPersister;
}

public async Task StartPipelineAsync(CancellationToken token)
{
    _decoder.LoadSensorConfigs();

    // Step 1 - Configure the pipeline
    // make sure our complete call gets propagated throughout the whole pipeline
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    // create our block configurations
    var largeBufferOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 60000 };
    var smallBufferOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };
    var realTimeBufferOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 600 };
    var parallelizedOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100, MaxDegreeOfParallelism = 4 };
    var batchOptions = new GroupingDataflowBlockOptions() { BoundedCapacity = 100 };

    // define each block
    var writeRawMessageBlock = new TransformBlock<RawBusMessage, RawBusMessage>(async (RawBusMessage msg) =>
    {
        await _messageFileWriter.WriteAsync(msg);
        return msg;
    }, largeBufferOptions);

    var decoderBlock = new TransformManyBlock<RawBusMessage, DecodedMessage>(
        (RawBusMessage msg) => _decoder.Decode(msg), largeBufferOptions);

    var broadcast = new BroadcastBlock<DecodedMessage>(msg => msg);

    var realTimeFeedBlock = new ActionBlock<DecodedMessage>(async 
        (DecodedMessage msg) => await _realTimeFeedPublisher.PublishAsync(msg), realTimeBufferOptions);

    var oneSecondBatchBlock = new BatchBlock<DecodedMessage>(300);
    var thirtySecondBatchBlock = new BatchBlock<DecodedMessage>(9000);
    var batchBroadcastBlock = new BroadcastBlock<DecodedMessage[]>(msg => msg);

    var oneSecondStatsFeedBlock = new ActionBlock<DecodedMessage[]>(async 
        (DecodedMessage[] messages) => await _statsFeedPublisher.PublishAsync(messages.ToList(), TimeSpan.FromSeconds(1)), smallBufferOptions);

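    // database writes are parallelized (MaxDegreeOfParallelism = 4) so that they keep up with the message rate
    var dbPersistenceBlock = new ActionBlock<DecodedMessage[]>(async 
        (DecodedMessage[] messages) => await _dbPersister.PersistAsync(messages.ToList()), parallelizedOptions);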
    var thirtySecondStatsFeedBlock = new ActionBlock<DecodedMessage[]>(async 
        (DecodedMessage[] messages) => await _statsFeedPublisher.PublishAsync(messages.ToList(), TimeSpan.FromSeconds(30)), smallBufferOptions);

    // link the blocks together
    writeRawMessageBlock.LinkTo(decoderBlock, linkOptions);
    decoderBlock.LinkTo(broadcast, linkOptions);
    broadcast.LinkTo(realTimeFeedBlock, linkOptions);
    broadcast.LinkTo(oneSecondBatchBlock, linkOptions);
    broadcast.LinkTo(thirtySecondBatchBlock, linkOptions);
    oneSecondBatchBlock.LinkTo(batchBroadcastBlock, linkOptions);
    batchBroadcastBlock.LinkTo(oneSecondStatsFeedBlock, linkOptions);
    batchBroadcastBlock.LinkTo(dbPersistenceBlock, linkOptions);
    thirtySecondBatchBlock.LinkTo(thirtySecondStatsFeedBlock, linkOptions);

    // Step 2 - Start consuming the machine bus interface (start the producer)
    var consumerTask = _dataBusReader.StartConsuming(writeRawMessageBlock, token, TimeSpan.FromMilliseconds(10));

    // Step 3 - Keep going until the CancellationToken is cancelled.
    while(!token.IsCancellationRequested)
        await Task.Delay(500);

    // Step 4 - the CancellationToken has been cancelled and our producer has stopped producing
    // call Complete on the first block, this will propagate down the pipeline
    writeRawMessageBlock.Complete(); 

    // wait for all leaf blocks to finish processing their data
    await Task.WhenAll(realTimeFeedBlock.Completion,
        oneSecondStatsFeedBlock.Completion,
        dbPersistenceBlock.Completion,
        thirtySecondStatsFeedBlock.Completion,
        consumerTask);

    // clean up any other resources like ZeroMQ for example
}

The producer code looks like this:

public Task StartConsuming(ITargetBlock<RawBusMessage> target, CancellationToken token, TimeSpan interval)
{
    // LongRunning hints that the polling loop should get its own thread instead of a thread pool thread
    return Task.Factory.StartNew(() => Consume(target, token, interval), 
                TaskCreationOptions.LongRunning);
}

private void Consume(ITargetBlock<RawBusMessage> target, CancellationToken token, TimeSpan interval)
{
    long lastTicks = 0;
    long counter = 0;
    while (!token.IsCancellationRequested)
    {
        counter++;
        var reading = _dataBus.Read();

        var message = new RawBusMessage();
        message.Data = reading.Data;
        message.ReadingTime = new DateTime(reading.Ticks);
        message.Counter = counter;

        // only post readings that are newer than the last one we saw;
        // the result of Post is deliberately ignored
        if (lastTicks < reading.Ticks)
            target.Post(message);

        lastTicks = reading.Ticks;
        Thread.Sleep(interval); // we use a dedicated thread
    }
}

Our interfaces that do the work look like this:

public interface IDataBusReader
{
    Task StartConsuming(ITargetBlock<RawBusMessage> target, CancellationToken token, TimeSpan interval);
}

public interface IMessageFileWriter
{
    Task WriteAsync(RawBusMessage message);
}

public interface IDecoder
{
    void LoadSensorConfigs();
    IEnumerable<DecodedMessage> Decode(RawBusMessage message);
}

public interface IRealTimePublisher
{
    Task PublishAsync(DecodedMessage message);
}

public interface IStatsFeedPublisher
{
    Task PublishAsync(IList<DecodedMessage> messages, TimeSpan window);
}

public interface IDbPersister
{
    Task PersistAsync(IList<DecodedMessage> messages);
}

 

Let's pick that code apart a bit and look at the decisions that were made.

Decision - Producer to use Post or SendAsync?

In our case we want to poll the machinery bus interface every 10 milliseconds to read the latest sensor readings. Although we refer to it as a bus, it is in reality a shared memory location where we can get the latest sensor reading values. Therefore we create a polling loop in a long-running Task that operates on its own thread. Sharing the thread pool would introduce too much variance in our polling interval. This also means async/await is unavailable to us, as we would resume after the await back on the thread pool. So we use the Post method and we don't check the result either. We trust the buffer to have capacity, and if it doesn't there is nothing else we can do.

Decision - Fast Producer, Slow Consumer Strategy

In our scenario we always want to write the raw bus messages to file in the right order, without losing any data. With 100 messages per second entering the pipeline and a normal payload in the 1 KB range, we'll need disk writes in the region of 6 MB a minute (100 messages × 1 KB × 60 seconds), which is no problem for the server running this pipeline. However, the later stages involve network sockets to multiple receivers and database writes. We cannot guarantee we'll always be able to manage that same rate. If blocks lower down the pipeline choke up, it would cause buffers to fill up right back to the first block, forcing either a slowdown of the producer or load-shedding by the producer, neither of which we want.

But the BroadcastBlock comes in handy here. Only the blocks up to the first BroadcastBlock can force producer slowdown or load-shedding, because the BroadcastBlock simply overwrites its buffer on each new message, so neither it nor the blocks downstream of it can apply back-pressure to the producer. As long as the File Writer and Decoder blocks keep up in the long term, the producer is fine; any short-term slowdowns can be absorbed by the buffers.

After data is written to disk our requirements state that we just need best effort so the fact that the first BroadcastBlock will drop data when downstream blocks choke up is a good thing for us. It cannot impact the buffers of our file writer block and cause data to be lost at the producer end. The BroadcastBlock acts as a safety cut-off as well as creating our first branch in the pipeline.

Decision - Correct Order Real-Time Publishing

Another requirement of our scenario is that we publish decoded messages in real-time over network sockets. We'll use ZeroMQ to simplify our socket programming and allow multiple applications to subscribe to the real-time message feed in a publish-subscribe architecture. Subscribers can subscribe to all sensor readings or just a subset. We'll set a high-water mark in ZeroMQ so that if its buffers become full then ZeroMQ will drop messages. This protects us from slow network consumers filling up the ZeroMQ buffers and causing memory issues. Because sending messages with ZeroMQ is non-blocking, we can use an ActionBlock configured to process messages sequentially, thus ensuring the correct order while still keeping up with the high volume.

However, in the event that this block cannot keep up, the preferred behaviour is that messages should be dropped rather than delivered late, where late means more than 2 seconds old. So we set a small buffer of 600 messages, which equates to 2 seconds of data (with our 300 decoded messages per second). If the buffer fills up then messages get discarded (overwritten by the BroadcastBlock) until there is room in the buffer. This way we cannot fall very far behind, at the expense of losing a certain amount of data.

Decision - Time Windows

Unfortunately the BatchBlock cannot batch up messages by time period, only by count. In our case we can assume that 300 messages is a 1 second time window. Not perfect, but it is enough in our case. Otherwise we'd have to implement a custom block type.
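For completeness, BatchBlock does expose a TriggerBatch method that flushes whatever has accumulated so far, and calling it from a timer gives a rough time window without a custom block. This is not what our pipeline does; it is a minimal sketch, and the TimedBatching helper and its parameters are hypothetical. The window is only approximate, because the timer keeps its own rhythm and is not reset when a batch fills up by count.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, not part of the pipeline in this post.
public static class TimedBatching
{
    // Returns a BatchBlock that emits a batch when maxBatchSize is reached
    // or when the timer fires, whichever happens first.
    // The caller owns the timer and must dispose it when the pipeline shuts down.
    public static (BatchBlock<T> Block, Timer Timer) Create<T>(int maxBatchSize, TimeSpan window)
    {
        var block = new BatchBlock<T>(maxBatchSize);

        // TriggerBatch flushes the messages queued so far,
        // even though fewer than maxBatchSize have arrived.
        var timer = new Timer(_ => block.TriggerBatch(), null, window, window);

        return (block, timer);
    }
}
```

A caller would link the returned block into the pipeline exactly like the count-based BatchBlock and dispose the timer on shutdown.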

Decision - Potentially Slow Database Writes

Our database writes, when performed sequentially, don't keep up with the message rate. We perform bulk copy operations on 300-element arrays of messages, but even so the latency is higher than 1 second. We have found that parallelizing the writes allows us to keep up, so we set MaxDegreeOfParallelism to 4. That way we are writing 4 batches of messages at a time, which we have found is enough. Writes can sometimes happen out of order, but we use the message timestamp as part of the primary key, so all we end up with is some index fragmentation which we resolve with an index rebuild every night.

If we do still go too slowly and the ActionBlock's buffers fill up, then we start losing data as the BroadcastBlock that feeds it overwrites messages that it could not deliver. This is not a problem as the requirements state that best effort is enough for the database persistence.
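The effect of MaxDegreeOfParallelism on the writes described above can be sketched like this. The class ParallelWritesDemo is hypothetical and a 50 ms Task.Delay stands in for the bulk copy. The sketch records how many delegates were in flight at the same time.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the pipeline in this post.
public static class ParallelWritesDemo
{
    // Pushes batchCount batches through an ActionBlock and reports how many
    // were processed and the highest number running concurrently.
    public static async Task<(int Processed, int MaxConcurrent)> RunAsync(int batchCount, int degree)
    {
        var sync = new object();
        int running = 0, maxConcurrent = 0, processed = 0;

        var options = new ExecutionDataflowBlockOptions
        {
            BoundedCapacity = 100,
            MaxDegreeOfParallelism = degree
        };

        var writer = new ActionBlock<int[]>(async batch =>
        {
            lock (sync)
            {
                running++;
                maxConcurrent = Math.Max(maxConcurrent, running);
            }

            await Task.Delay(50); // stand-in for a slow bulk insert

            lock (sync)
            {
                running--;
                processed++;
            }
        }, options);

        for (int i = 0; i < batchCount; i++)
            await writer.SendAsync(new[] { i });

        writer.Complete();
        await writer.Completion;
        return (processed, maxConcurrent);
    }
}
```

With a degree of 4 the recorded high-water mark never exceeds 4, and all batches are still processed. The order in which they finish is no longer guaranteed, which is why the timestamp in the primary key matters.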

Decision - How to Deal With Errors?

When an unhandled exception occurs in a Func delegate it will cause the block to enter a faulted state. This will cause the block to decline new messages and wipe its buffers. Because we have set the link options to propagate completion, the faulted block will propagate the faulted state down the pipeline.

This means that if a block halfway along the pipeline enters a faulted state, then the upstream blocks will continue to function until their buffers fill up. If a block has unbounded buffers then its buffers will keep growing until either the pipeline is shut down or an OutOfMemoryException is thrown. Unless, of course, the last functioning block is a BroadcastBlock, in which case upstream blocks will continue to function normally.

So we need to be pretty careful about how we handle exceptions. In general I think it's a pretty good idea to wrap all executed code in try/catch blocks to ensure all exceptions get handled.

In the loop that checks for cancellation, we could also check the status of each block's Completion task. If a block has faulted, its Completion task will also be faulted. This could tell us we need to restart the pipeline.
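The faulting behaviour and the Completion check can be sketched with two linked blocks. The class FaultDemo is hypothetical, and a failing int.Parse stands in for any unhandled exception thrown inside a delegate.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical demo class, not part of the pipeline in this post.
public static class FaultDemo
{
    public static async Task<(bool UpstreamFaulted, bool DownstreamFaulted, bool AcceptedAfterFault)> RunAsync()
    {
        // int.Parse throws on bad input, and the exception is not handled in the delegate.
        var parse = new TransformBlock<string, int>(s => int.Parse(s));
        var sink = new ActionBlock<int>(n => { });
        parse.LinkTo(sink, new DataflowLinkOptions { PropagateCompletion = true });

        parse.Post("1");
        parse.Post("not a number"); // faults the block when it gets processed

        try
        {
            // Awaiting a faulted Completion task rethrows the failure.
            await sink.Completion;
        }
        catch (Exception)
        {
            // A real pipeline would log here and decide whether to restart.
        }

        // A faulted block declines new messages.
        bool acceptedAfterFault = parse.Post("2");

        return (parse.Completion.IsFaulted, sink.Completion.IsFaulted, acceptedAfterFault);
    }
}
```

RunAsync returns (true, true, false): the fault in the first block propagates to the linked block, and the faulted block refuses further messages. A monitoring loop could poll Completion.IsFaulted on each block in the same way.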

Limitations of TPL Dataflow

In our scenario I see the following limitations:

  • BatchBlocks cannot group by time period.

  • When buffers are full, we cannot choose to eject messages from the head of the buffer. The incoming and outgoing buffers are basically FIFO queues. The behaviour is to prevent new messages entering the queue, protecting the existing messages. This favours older messages over new ones. RabbitMQ does the opposite and ejects messages from the head of the queue when a message arrives at a full queue. This favours new messages over old. It would be nice to be able to choose between both models. For our real-time publishing requirements, we definitely favour new messages over older ones.

Of course we could implement our own blocks. But I'd prefer to not have to do that.

Next up we'll build the same pipeline again with TPL Dataflow, but for an alternate scenario that requires back-pressure throughout the length of the pipeline. After that we'll take a look at Reactive Extensions (Rx.NET).
