The code for this post is on GitHub.
In the last post we built a TPL Dataflow pipeline based on the scenario from our first post in the series. Today we'll build another pipeline very similar to the first but with different requirements around latency and data loss.
In the first scenario we could not slow down the producer as slowing it down would cause data loss (it read from a bus that would not wait if you weren't there to consume the data). We also cared a lot about ensuring the first stage kept up with the producer and successfully wrote every message to disk. The rest was best effort, and we performed load-shedding so as not to slow down the producer.
Alternate Scenario - Back Pressure
In the new scenario we can slow down the producer as much as we want: it reads from a durable queue that is, for our purposes, unbounded. The whole pipeline must process every message, so if one stage slows down, the whole pipeline has to slow down to accommodate it. Latency is not an issue. For this scenario we need back-pressure that can flow from the leaves all the way back to the producer.
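We need to replace the BroadcastBlocks with something else, as they act as bulkheads that prevent back-pressure from passing through them: a BroadcastBlock holds only the latest message and overwrites it when the next one arrives, so a slow target misses messages instead of slowing down the source. We'll replace them with TransformManyBlocks and a couple of wrapper classes for our DecodedMessage class.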
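To make the bulkhead behaviour concrete, here is a minimal sketch that is not part of the pipeline itself. It assumes a hypothetical `BulkheadSketch` helper that pushes integers through a BroadcastBlock into one slow consumer with a bounded buffer. The producer is never slowed down, and the consumer only sees a handful of the values.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BulkheadSketch
{
    // Hypothetical demo: posts `count` integers into a BroadcastBlock that feeds
    // one slow, bounded consumer, and returns the values the consumer saw.
    public static async Task<List<int>> RunAsync(int count)
    {
        var received = new List<int>();
        var broadcast = new BroadcastBlock<int>(x => x);

        // Slow consumer with room for a single message at a time.
        var slowConsumer = new ActionBlock<int>(async x =>
        {
            await Task.Delay(20);
            received.Add(x);
        }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        broadcast.LinkTo(slowConsumer, new DataflowLinkOptions { PropagateCompletion = true });

        // The unbounded BroadcastBlock accepts every Post straight away,
        // so the producer never feels the slow consumer.
        for (var i = 0; i < count; i++)
            broadcast.Post(i);

        broadcast.Complete();
        await slowConsumer.Completion;
        return received;
    }
}
```

Calling `await BulkheadSketch.RunAsync(100)` should return far fewer than 100 values, which is fine for load-shedding but is exactly what we cannot accept when every message must be processed.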
TransformMany as a BroadcastBlock
The basic idea is that for each DecodedMessage that the block consumes, it outputs a list of RoutedMessage, one for each branch at the fork.
We also have to broadcast batches of messages, so we create a TransformManyBlock that receives a RoutedMessage[] and outputs one RoutedBatch for each branch.
public class RoutedMessage
{
    public RoutedMessage(int routeKey, DecodedMessage message)
    {
        RouteKey = routeKey;
        Message = message;
    }

    public int RouteKey { get; set; }
    public DecodedMessage Message { get; set; }
}

public class RoutedBatch
{
    public RoutedBatch(int routeKey, IEnumerable<DecodedMessage> messages)
    {
        RouteKey = routeKey;
        Messages = messages;
    }

    public int RouteKey { get; set; }
    public IEnumerable<DecodedMessage> Messages { get; set; }
}
Then when we link the block to multiple downstream blocks, we use a lambda to filter on the RouteKey property.
msgBranchBlock.LinkTo(realTimeFeedBlock, linkOptions, routedMsg => routedMsg.RouteKey == 1); // route on the key
msgBranchBlock.LinkTo(oneSecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 2); // route on the key
msgBranchBlock.LinkTo(thirtySecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 3); // route on the key
This is how we can perform a broadcast and keep the back-pressure that we want.
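The reason this keeps the back-pressure is that a TransformManyBlock offers its output in order: if the branch that should receive the next item has a full buffer, the item stays at the head of the output queue and everything behind it waits. The following self-contained sketch shows the idea with plain integers. The `Routed` and `FanOutSketch` names are hypothetical stand-ins for the classes above, and the buffer sizes and delay are arbitrary.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stand-in for RoutedMessage, carrying an int instead of a DecodedMessage.
public sealed class Routed
{
    public Routed(int routeKey, int value) { RouteKey = routeKey; Value = value; }
    public int RouteKey { get; }
    public int Value { get; }
}

public static class FanOutSketch
{
    // Sends `count` integers through a two-way fork and returns what each branch received.
    public static async Task<(List<int> Fast, List<int> Slow)> RunAsync(int count)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var bounded = new ExecutionDataflowBlockOptions { BoundedCapacity = 2 };

        // One input becomes one routed copy per branch.
        var branch = new TransformManyBlock<int, Routed>(
            x => new[] { new Routed(1, x), new Routed(2, x) }, bounded);

        var fast = new List<int>();
        var slow = new List<int>();
        var fastBlock = new ActionBlock<Routed>(r => fast.Add(r.Value), bounded);
        var slowBlock = new ActionBlock<Routed>(async r =>
        {
            await Task.Delay(5); // simulate a slow stage
            slow.Add(r.Value);
        }, bounded);

        // Each link only accepts the copies carrying its own route key.
        branch.LinkTo(fastBlock, linkOptions, r => r.RouteKey == 1);
        branch.LinkTo(slowBlock, linkOptions, r => r.RouteKey == 2);

        // SendAsync waits whenever the branch block is full, so the slow branch
        // ends up pacing the loop that produces the data.
        for (var i = 0; i < count; i++)
            await branch.SendAsync(i);

        branch.Complete();
        await Task.WhenAll(fastBlock.Completion, slowBlock.Completion);
        return (fast, slow);
    }
}
```

Both branches receive every value in order, even though one of them is much slower. One thing to watch with filtered links is that every message must match at least one filter, otherwise it sits at the head of the output queue and blocks everything behind it.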
Next we need the producer to slow down when the block it posts to has full buffers.
private void ConsumeWithBackPressure(ITargetBlock<RawBusMessage> target, CancellationToken token, TimeSpan interval)
{
    long lastTicks = 0;
    while (!token.IsCancellationRequested)
    {
        _counter++;
        var reading = _dataBus.Read();
        var message = new RawBusMessage();
        message.Data = reading.Data;
        message.ReadingTime = new DateTime(reading.Ticks);
        message.Counter = _counter;
        if (lastTicks < reading.Ticks)
        {
            // keep trying to post the message until it gets accepted;
            // give up on cancellation so that shutdown is not blocked by a target that never accepts
            while (!target.Post(message) && !token.IsCancellationRequested)
                Thread.Sleep(interval);
        }
        lastTicks = reading.Ticks;
        Thread.Sleep(interval);
    }
}
Post returns false straight away when a bounded block is full, so we basically keep retrying until the receiving block has space in its buffer.
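An alternative to the retry loop, not used in this post's code, is the `SendAsync` extension method, which returns a task that completes once the target has accepted or permanently declined the message. The sketch below assumes a hypothetical `ProducerSketch` helper that reads from any `IEnumerable<T>` rather than from the data bus.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ProducerSketch
{
    // Hypothetical producer: pushes each item to the target and waits asynchronously
    // whenever the target's bounded buffer is full.
    // Returns the number of items the target accepted.
    public static async Task<int> ProduceAsync<T>(
        IEnumerable<T> source, ITargetBlock<T> target, CancellationToken token)
    {
        var accepted = 0;
        foreach (var item in source)
        {
            // true: the target took the item.
            // false: the target declined it for good (for example, it has completed).
            // If the token is cancelled while waiting, the await throws OperationCanceledException.
            if (!await target.SendAsync(item, token))
                break;
            accepted++;
        }
        return accepted;
    }
}
```

The trade-off is that the producer becomes asynchronous, which may or may not suit a reader that polls a bus on a dedicated thread.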
The code to set up the pipeline is as follows:
public async Task StartPipelineWithBackPressureAsync(CancellationToken token)
{
    _decoder.LoadSensorConfigs();

    // Step 1 - Configure the pipeline
    // make sure our complete call gets propagated throughout the whole pipeline
    var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

    // create our block configurations
    var largeBufferOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 6000 };
    var smallBufferOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };
    var parallelizedOptions = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100, MaxDegreeOfParallelism = 4 };
    var batchOptions = new GroupingDataflowBlockOptions() { BoundedCapacity = 1000 };

    // create some branching functions for our TransformManyBlocks
    // a DecodedMessage gets transformed into 3 RoutedMessages, one for each branch of the first fork
    Func<DecodedMessage, IEnumerable<RoutedMessage>> messageBranchFunc = x => new List<RoutedMessage>
    {
        new RoutedMessage(1, x),
        new RoutedMessage(2, x),
        new RoutedMessage(3, x)
    };

    // a RoutedMessage[] batch gets transformed into 2 RoutedBatches, one for each branch of the final fork
    Func<RoutedMessage[], IEnumerable<RoutedBatch>> batchBranchFunc = x => new List<RoutedBatch>
    {
        new RoutedBatch(1, x.Select(c => c.Message).ToList()),
        new RoutedBatch(2, x.Select(c => c.Message).ToList())
    };

    // define each block
    var writeRawMessageBlock = new TransformBlock<RawBusMessage, RawBusMessage>(async (RawBusMessage msg) =>
    {
        await _messageFileWriter.WriteAsync(msg);
        return msg;
    }, largeBufferOptions);
    var decoderBlock = new TransformManyBlock<RawBusMessage, DecodedMessage>(
        (RawBusMessage msg) => _decoder.Decode(msg), largeBufferOptions);
    var msgBranchBlock = new TransformManyBlock<DecodedMessage, RoutedMessage>(messageBranchFunc, largeBufferOptions);
    var realTimeFeedBlock = new ActionBlock<RoutedMessage>(async (RoutedMessage routedMsg) =>
        await _realTimeFeedPublisher.PublishAsync(routedMsg.Message), largeBufferOptions);
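    // BoundedCapacity must be at least the batch size, otherwise the BatchBlock constructor throws
    var thirtySecondBatchBlock = new BatchBlock<RoutedMessage>(9000,
        new GroupingDataflowBlockOptions() { BoundedCapacity = 9000 });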
    var thirtySecondStatsFeedBlock = new ActionBlock<RoutedMessage[]>(async (RoutedMessage[] batch) =>
        await _statsFeedPublisher.PublishAsync(batch.Select(x => x.Message).ToList(), TimeSpan.FromSeconds(30)),
        smallBufferOptions);
    var oneSecondBatchBlock = new BatchBlock<RoutedMessage>(300, batchOptions);
    var batchBroadcastBlock = new TransformManyBlock<RoutedMessage[], RoutedBatch>(batchBranchFunc, smallBufferOptions);
    var oneSecondStatsFeedBlock = new ActionBlock<RoutedBatch>(async (RoutedBatch batch) =>
        await _statsFeedPublisher.PublishAsync(batch.Messages.ToList(), TimeSpan.FromSeconds(1)),
        smallBufferOptions);
    var dbPersistenceBlock = new ActionBlock<RoutedBatch>(async (RoutedBatch batch) =>
        await _dbPersister.PersistAsync(batch.Messages.ToList()), parallelizedOptions);

    // link the blocks together
    writeRawMessageBlock.LinkTo(decoderBlock, linkOptions);
    decoderBlock.LinkTo(msgBranchBlock, linkOptions);
    msgBranchBlock.LinkTo(realTimeFeedBlock, linkOptions, routedMsg => routedMsg.RouteKey == 1); // route on the key
    msgBranchBlock.LinkTo(oneSecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 2); // route on the key
    msgBranchBlock.LinkTo(thirtySecondBatchBlock, linkOptions, routedMsg => routedMsg.RouteKey == 3); // route on the key
    thirtySecondBatchBlock.LinkTo(thirtySecondStatsFeedBlock, linkOptions);
    oneSecondBatchBlock.LinkTo(batchBroadcastBlock, linkOptions);
    batchBroadcastBlock.LinkTo(oneSecondStatsFeedBlock, linkOptions, routedBatch => routedBatch.RouteKey == 1); // route on the key
    batchBroadcastBlock.LinkTo(dbPersistenceBlock, linkOptions, routedBatch => routedBatch.RouteKey == 2); // route on the key

    // Step 2 - Start consuming the machine bus interface (the producer)
    var consumerTask = _dataBusReader.StartConsuming(writeRawMessageBlock, token, TimeSpan.FromMilliseconds(10));

    // Step 3 - Keep going until the CancellationToken is cancelled
    // or one of the leaf blocks completes (for example, because of a fault)
    while (!token.IsCancellationRequested
        && !oneSecondStatsFeedBlock.Completion.IsCompleted
        && !dbPersistenceBlock.Completion.IsCompleted
        && !realTimeFeedBlock.Completion.IsCompleted
        && !thirtySecondStatsFeedBlock.Completion.IsCompleted)
    {
        await Task.Delay(500);
    }

    // Step 4 - the CancellationToken has been cancelled and our producer has stopped producing
    // call Complete on the first block, this will propagate down the pipeline
    writeRawMessageBlock.Complete();

    // wait for all leaf blocks to finish processing their data
    await Task.WhenAll(oneSecondStatsFeedBlock.Completion,
        thirtySecondStatsFeedBlock.Completion,
        dbPersistenceBlock.Completion,
        realTimeFeedBlock.Completion,
        consumerTask);

    // clean up any other resources like ZeroMQ for example
}
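Step 3 above polls every 500 ms. If that delay matters, one possible alternative is to wait on the leaf completions and the cancellation token directly. This is only a sketch under that assumption: `ShutdownSketch` and `WaitForStopAsync` are hypothetical names and are not part of the code for this post.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ShutdownSketch
{
    // Returns as soon as the token is cancelled or any of the given tasks finishes.
    public static async Task WaitForStopAsync(CancellationToken token, params Task[] leafCompletions)
    {
        var cancelled = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Turn cancellation into a task we can wait on alongside the block completions.
        using (token.Register(() => cancelled.TrySetResult(true)))
        {
            var all = new List<Task>(leafCompletions) { cancelled.Task };
            await Task.WhenAny(all);
        }
    }
}
```

In the pipeline this would take the place of the while loop, for example `await ShutdownSketch.WaitForStopAsync(token, realTimeFeedBlock.Completion, dbPersistenceBlock.Completion)`, followed by the same Step 4.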
So we can see that TPL Dataflow can easily support both message rate control mechanisms: back-pressure and load-shedding. Next up: Reactive Extensions (Rx.NET).