Processing Pipelines Series - Reactive Extensions (Rx.NET)

Code for this post is on Github here.

Whereas TPL Dataflow is all about passing messages between blocks, Reactive Extensions is about sequences. With sequences we can create projections, transformations and filters. We can combine multiple sequences into a single one. It is a very flexible and powerful paradigm but with such power comes extra complexity. I find TPL Dataflow easier to reason about due to its simple model. Reactive Extensions can get pretty complex and is not always intuitive, but you can create some elegant solutions with it. It will require some investment in time and tinkering to get a reasonable understanding of it.

So if you just want to learn Rx then you'd be better off checking out the Intro to Rx site which covers the basics more thoroughly than I am going to do. In this post I am simply going to build our a pipeline meeting the requirements of our scenario, just like I did with TPL Dataflow. Along the way we'll learn a few things about Rx and its capabilities.

The first post in the series explains our scenario and I recommend you read it so you have some context.

A Brief Introduction to Reactive Extensions

It's all about IObservable<T> which is basically a sequence which can be finite or continuous. A producer pushes items onto the sequence and consumers passively receive these items and act on them. It is a push model where producers push data onto the sequence and consumers who have registered interest in the sequence via the Subscribe method receive that data. In between the producer and the consumer we can create multiple stages where we apply operations such as transformations, filters, projections and the like.

Each operation is applied on an IObservable<T> and returns an IObservable<T>. The concept is similar to TPL Dataflow where blocks take in data in one form and can output it in another, except the Rx transformations are more varied and more composable. We can apply LINQ like methods like Select, Where, GroupBy, Any, All, Distinct, Skip, Take, the list goes on.

I have to mention there is also a second important interface IObserver<T> but we'll not come directly into contact with it with our pipeline, but it is a core part of Rx.

Hot and Cold Observables

The naming of this is a bit strange, but it is really important to understand the difference between hot and cold observables.

The producer of a cold observable only starts producing once a consumer starts a subscription. Moreover, the producer will produce a dedicated sequence for each subscriber. So if a producer produces a monotonically incrementing sequence that starts at 0, and ten consumers set up a subscription, then each of those ten will receive a separate instance of that monotonically incrementing sequence. But if the producer is taking messages from an external resource, like a message queue, then those ten consumers will now be competing for those messages. 

Let's say we have put 10 messages in a Kafka topic: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10. Let's say we have a producer that reads from the topic and pushes those messages into an IObservable. If we have two subscribers then the messages might get consumed as follows:

  • Subscriber A: 1, 3, 5, 7, 9
  • Subscriber B: 2, 4, 6, 8, 10

This may or may not be what you want. If you want each subscriber to receive the same messages, that is, they should not compete, then you'll need a hot observable.

Hot observables acts as a multicast. The hot observable pushes each message to each subscriber. This means we that Subscribers A and B each would get all ten messages.

The producer of the hot observable starts producing only once the Connect method has been called on the hot observable, irrespective of if there are subscribers or not. Producers of cold observables only produce when a consumer subscribes. But with a hot observable even if there are no subscribers, the producer will start pushing its data down the pipe. If there are no subscribers then these items will be sent into the ether as no-one is listening. In my example if the incrementing number sequence, once a subscriber starts up, the first number they receive will not be 0. This can be avoided by setting up the subscription first and calling Connect() afterwards.

Introduction to the Operations of Our Pipeline

In our pipeline we are going to make use of both hot and cold observables, as well as some grouping, filtering and transforming.

Observable.Generate

A factory method to create our source IObservable<RawBusMessage>. This creates our producer that reads from the machine bus interface every 10 milliseconds and places an item on the IObservable<RawBusMessage>.

Observable.Publish

Passing a cold observable to the Observable.Publish method will return a hot observable.

Buffer

We can accumulate items into an IObservable<IList<T>> based on a count or TimeSpan or both. This is clearly superior to the TPL Dataflow BatchBlock which is missing the TimeSpan capability.

var print = source.Buffer(10).Select(x => Console.WriteLine($"{x.Count()} items"));

The above example, once subscribed to, would print out "10 items" repeatedly.

Select

With select we can execute code on each item. That means we can perform a database query, publish a message over ZeroMQ, or simply transform the data from one form to another.

SelectMany

Just like select we can execute code on each individual item, SelectMany can execute on each item of nested collections, flattening it out.

Where

We can return an IObservable<T> with certain items filtered out. This can be important to avoid some exceptions where an empty observable can cause an exception. For example chaining a Select to a Buffer(TimeSpan) operation will fail if no items were received during the TimeSpan period and the Buffer returns an empty observable to Select, which needs data to operate on. So we protect it with a Where(x => x.Any() in between.

var print = source.Buffer(10)
               .Where(x => x.Min() > 0)
               .Select(x => Console.WriteLine($"Lowest value is {x.Min()}"));

In the above example source might be an IObservable<int>. We batch it up into 10 element ILists, filter out any lists that have a value equal to or lower than 0 and the print out the minimum value of each list that remains.

Concat

Some operations ingest one observable and return multiple observables. We can concatenate those observables back into one using Concat. Concat will ensure that each observable completes before appending it. In this way we ensure that the ingested observable is processed sequentially.

It is still quite abstract at this point, but I go into more specifics when we look at our concrete pipeline example.

Merge

Merge also combines multiple observables into one, but does so concurrently. Merge(4) will run four observables at a time and append them to the new observable on completion.

Subscribe

Sets up a subscription and allows you to execute an Action on each item, error and on completion. If subscribing to a cold observable then the producer will start producing now. If subscribing to a hot observable then the subscriber will start receiving the sequence that has already been started. Note that, in order to not lose data, you can set up a subscription to a hot observable first, then call the Connect method on the hot observable to start it up.

Async/Await

While not an operation I'll note it here. Rx.NET does support async await though with some restrictions. Subscribe does not support await. You can use it and it will compile and run, but nothing will be awaited causing unbounded parallelism that will probably cause you some issues, in addition to not knowing when the subscriber has finished processing the sequence.

Also, all LINQ methods with async/await will return multiple observables instead of a single one, so you'll need to use Concat (or another combination operation) on the result to have a single outgoing observable.

Async/await allows for sequentially and concurrent processing:

  • Using Select(...).Concat() will process the sequence sequentially (one at a time in the same order)
  • Select(...).Merge(3) will process the sequence concurrently, with up to 3 items being processed at a time.
  • SelectMany(...) is just Select(...).Merge() which does not constrain the concurrency level, which might not be the best option. You may find that if you want concurrency, that you want to constrain it to a specific number of concurrent items at a time.
var print = source
        .Select(x => Observable.FromAsync(async () =>
            {
                await Task.Delay(1000);
                Console.WriteLine($"Value is {x}");
                return x;
            })
        ).Merge(4)

The above example processes up to 4 async tasks at a time (once subscribed to).

Final Note About Cold Observables

The LINQ methods above and Buffer all return cold observables. This is important as it affects our pipeline branching stategy.

Concurrency

By default Rx is single-threaded and synchronous. Every time the producer pushes an item onto the sequence, the whole chain of operations is executed immediately. This is different to TPL Dataflow where each block is executing concurrently, though it processes its own items sequentially.

There are some exceptions. The Buffer operation runs on a separate thread, which means that all items passed to it are done so in a non-blocking away. This adds concurrency into a pipeline.

Schedulers allow us to run the producer and various operations concurrently. We can create dedicated non-thread pool threads, or use the thread pool, or use a timer. We'll be making use of schedulers to ensure that the producer does not get slowed down by short-lived slowdowns in the pipeline stages. Rx uses queues internally to buffer items between operations on different threads, but these queues are unbounded which should be of concern to you if you are memory or latency constrained, or have a high velocity of data.

Implementing Our Pipeline

In the first post I described the scenario. So please check that out if you didn't read that.

Fig 1. The pipeline with Reactive Extensions

We'll pick that diagram apart and look at the code of each part. First the producer.

public IObservable<RawBusMessage> StartConsuming(CancellationToken token, TimeSpan interval)
{
    // run the producer on a separate dedicated thread so we don't get a jittery interval from running on the thread pool
    var scheduler = new NewThreadScheduler(ts => new Thread(ts) { Name = "DataBusPoller" });

    var source = Observable.Generate(Read(),
        x => !token.IsCancellationRequested, // run until cancelled
        x => Read(), // consume the machine bus interface
        x => x, // return the message as is
        x => interval, // execute Read() on the supplied interval
        scheduler);

    return source;
}

private RawBusMessage Read()
{
    _counter++;
    var reading = _dataBus.Read();

    var message = new RawBusMessage();
    message.Data = reading.Data;
    message.ReadingTime = new DateTime(reading.Ticks);
    message.Counter = _counter;

    return message;
}

Just like with our TPL Dataflow pipeline we run our consumer on a separate thread as a 10 ms second interval is a little too tight for running on the thread pool. We would probably get too much variance. We use Observable.Generate to create an IObservable<RawBusMessage> which will call the Read() method every 10 milliseconds.

Let's look at the code of the pipeline. It has been divided into 8 steps.

public async Task StartPipelineAsync(CancellationToken token)
{
    _decoder.LoadSensorConfigs();

    // Step 1 - Create our producer as a cold observable
    var source = _dataBusReader.StartConsuming(token, TimeSpan.FromMilliseconds(10));

    // Step 2 - Add file writing and decoding stages to our cold observable pipeline
    var writeStream = source.ObserveOn(ThreadPoolScheduler.Instance)
                        .Select(x => Observable.FromAsync(async () => 
    {
        await _messageFileWriter.WriteAsync(x);
        return x;
    })).Concat();

    var decodedStream = writeStream.Select(x => 
    {
        return _decoder.Decode(x).ToObservable();
    }).Concat();

    // Step 3 - Create a hot observable that acts as a broadcast 
    // and allows multiple subscribers without duplicating the work of the producer
    var multiCastStream = Observable.Publish(decodedStream);

    // Step 4 - Create our subscriptions that perform work on the decoded messages
    // create a subscription to the hot obeservable that buffers in 1 second periods and performs up to 4 concurrent db writes
    var dbPersistenceComplete = false;
    var dbPersistenceSub = multiCastStream
                        .Buffer(TimeSpan.FromSeconds(1))
                        .Where(messages => messages.Any())
                        .Select(messages => Observable.FromAsync(async () => await _dbPersister.PersistAsync(messages)))
                        .Merge(4) // up to 4 concurrent executions of PersistAsync
                        .Subscribe(
                            (Unit u) => { },
                            (Exception ex) => { Console.WriteLine("DB Persistence error: " + ex); },
                            () => 
                            {
                                dbPersistenceComplete = true;
                                Console.WriteLine("DB Persistence complete!");
                            });

    // create a subscription to the hot obeservable that buffers in 1 second periods and performs sequential processing of each batch
    bool statsFeed1Complete = false;
    var oneSecondStatsFeedSub = multiCastStream
                        .Buffer(TimeSpan.FromSeconds(1))
                        .Where(messages => messages.Any())
                        .Select(messages => Observable.FromAsync(async () => await _statsFeedPublisher.PublishAsync(messages, TimeSpan.FromSeconds(1))))
                        .Concat() // one batch at a time
                        .Subscribe(
                            (Unit u) => { },
                            (Exception ex) => { Console.WriteLine("1 Second Stats Feed Error: " + ex); },
                            () => 
                            {
                                statsFeed1Complete = true;
                                Console.WriteLine("1 Second Stats Feed Complete!");
                            });

    // create a subscription to the hot obeservable that buffers in 30 second periods and performs sequential processing of each batch
    bool statsFeed30Complete = false;
    var thirtySecondStatsFeedSub = multiCastStream
                        .Buffer(TimeSpan.FromSeconds(30))
                        .Where(messages => messages.Any())
                        .Select(messages => Observable.FromAsync(async () => await _statsFeedPublisher.PublishAsync(messages, TimeSpan.FromSeconds(30))))
                        .Concat() // one batch at a time
                        .Subscribe(
                            (Unit u) => { },
                            (Exception ex) => { Console.WriteLine("30 Second Stats Feed Error: " + ex); },
                            () => 
                            {
                                statsFeed30Complete = true;
                                Console.WriteLine("30 Second Stats Feed Error Complete!");
                            });

    // create a subscription to the hot obeservable that sequentially processes one message at a time in order
    bool realTimePubComplete = false;
    var realTimePubSub = multiCastStream
                        .Select(messages => Observable.FromAsync(async () => await _realTimeFeedPublisher.PublishAsync(messages)))
                        .Concat() // one message at a time
                        .Subscribe(
                            (Unit u) => { },
                            (Exception ex) => { Console.WriteLine("Real-time Pub Error: " + ex); },
                            () => 
                            {
                                realTimePubComplete = true;
                                Console.WriteLine("Real-time Pub Complete!");
                            });

    // Step 6. Start the producer
    multiCastStream.Connect();

    // Step 7. Keep things going until the CancellationToken gets cancelled
    while (!token.IsCancellationRequested)
        await Task.Delay(500);

    // Step 8. Safe shutdown of the pipeline
    // Wait for all subscriptions to complete their work
    while (!realTimePubComplete || !dbPersistenceComplete || !statsFeed1Complete || !statsFeed30Complete)
        await Task.Delay(500);

    Console.WriteLine("All subscribers complete!");

    // dispose of all subscriptions
    dbPersistenceSub.Dispose();
    oneSecondStatsFeedSub.Dispose();
    thirtySecondStatsFeedSub.Dispose();
    realTimePubSub.Dispose();

    // clean up the producer
    _dataBusReader.CleanUp();

    // safely clean up any other resources, for example, ZeroMQ
}

Our interfaces for the classes that perform the work are:

public interface IDataBusReader
{
    IObservable<RawBusMessage> StartConsuming(CancellationToken token, TimeSpan interval);
}

public interface IMessageFileWriter
{
    Task WriteAsync(RawBusMessage message);
}

public interface IDecoder
{
    void LoadSensorConfigs();
    IEnumerable<DecodedMessage> Decode(RawBusMessage message);
}

public interface IRealTimePublisher
{
    Task PublishAsync(DecodedMessage message);
}

public interface IStatsFeedPublisher
{
    Task PublishAsync(IList<DecodedMessage> messages, TimeSpan window);
}

public interface IDbPersister
{
    Task PersistAsync(IList<DecodedMessage> messages);
}

Step 1 - Create our producer as a cold observable

The BusDataReader (producer), File writer and Decoder stages are all cold up until stage 4 where they get converted into a hot observable. Once the Connect method is called, that part of the pipeline will execute regardless of any subscribers.

Step 2 - Add file writing and decoding stages to our cold observable pipeline

The File Writer stage is executed by a Select operation which uses async/await, and is therefore appended with a Concat() call to combine the multiple observables into a single one. We could use Merge to allow us to parallelize that stage, but we MUST write the messages to disk in the correct order, so we'll go with a sequential Concat().

We schedule this first stage on the thread pool to ensure that it is processed asynchronously. If we don't do this then each time the producer adds a message onto the sequence it would run much of the pipeline in a blocking synchronous fashion that would slow down the producer every time the pipeline slowed down, making our polling interval highly variable. By scheduling our pipeline on the thread pool we ensure that the producer posts data asynchronously and Rx queues up the messages to be consumed.

The Decoder stage is a SelectMany in the diagram as each invocation of Decode returns an IEnumerable<DecodedMessage> which will need to be flattened. We need to make the IEnumerbale<DecodedMessage> into an IObservable<DecodedMessage> so we call ToObservable() on it. But I ended up changing SelectMany to a Select(...).Concat() to ensure guaranteed ordering. SelectMany loses our original order of messages which is a problem for us as we want to publish messages in real-time in the correct order in a later stage.

Step 3 - Create a hot observable that acts as a broadcast 

At this point we want to fork the pipeline into multiple branches. In our TPL Dataflow pipeline we had four leaf blocks:

  • Real-time Publising
  • One second stats publishing
  • One second batched DB persistence
  • Thirty second stats publishing

In our Rx.NET pipeline, those leaf blocks correspond to subscriptions. One subscription simply acts on each individual item and the other three need to buffer the items into time windows then act only those batches. Each subscription has its own mini-pipeline for achieving those needs. We could simply get those subscribers to feed off the Decode stage observable and not bother with a hot observable. But then we'd end up with the following topology:

Fig 2. Without the hot observable we have four separate pipelines

Now we'd have four separate pipelines and each would compete for the same bus messages. This is not what we want. So the hot observable is critical for achieving our single pipeline.

Step 4 - Create our subscriptions

Each subscription corresponds to a leaf in our topology. Taking the DB persistence subscriber as an example, we chain together the following operations, feeding off the hot observable:

  • Buffer
  • Where
  • Select
  • Merge
  • Subscribe

We ingest the data from the hot observable and batch it up into 1 second groupings. We filter out grouping that have no elements. Next we perform the database write in a Select using async/await which returns multiple observables. The Merge "executes" those observables four at a time giving us some concurrency and we finally subscribe to the IObservable that the Merge returns. This call to subscribe kicks off the whole mini pipeline that starts at the hot observable.

The other subscriptions use Concat instead of Merge as they either don't need the concurrency or processing order guarantees are needed.

Step 6. Start the producer

So far, the producer is not running. It is a cold observable and will only start once a consumer starts consuming it. The consumer is the downstream Observable.Publish which starts consuming that chain of cold observables, publishing it as a hot observable. But this only happens once Connect is called. Step 6 is to simply make the call to Connect. Because all our subscriptions are already set up, we don't lose the first messages.

Step 7. Keep things going until the CancellationToken gets cancelled

Just like with our TPL Dataflow pipeline, the whole thing can be cancelled by a single CancellationToken passed to it. We simply loop here until it gets cancelled. Once cancelled the shutdown and clean-up commences.

Step 8. Safe shutdown of the pipeline

Each subscriber sets a local boolean variable to true once it has received the OnComplete notification. The Producer sends out that notification when the CancellationToken is cancelled.

We loop until all stages have confirmed that they have received the OnComplete notification. This means that all data in the pipeline has been processed. We dispose the subscriptions and clean up the scheduler of the producer, then we're all done.

Back-Pressure and Load-Shedding

Back-pressure is a complicated topic with Rx. On one hand, when a pipeline is fully synchronous (blocking) from end-to-end then the producer can only produce at the rate that the pipeline can process the data. This means when fully synchronous, we get back-pressure whether we want it or not. But if there is any asynchrony in the pipeline, then we lose our back-pressure and there is no overly simply way of constraining our producer.

Examples of asynchrony are using a Buffer operation (which runs in a separate thread). Any data pushed to the Buffer operation is done so asynchronously (no blocking). So a Buffer in the middle of your pipeline turns a synchronous pipeline into a pipeline of two synchronous halves, with an asynchronous bridge between the two.

There are other operations such as Sample that also introduce asynchrony to your pipeline. Also, if you use schedulers to keep your producer and pipeline on different threads then items are queued and no capacity is set on those internal queues. Your producer is only constrained by the memory available to your process.

There are some options using semaphores to balance the work between the consumers and the producer. So look into that if you need to constrain a producer.

If back-pressure is not an option, you need to basically ensure your pipeline can keep up. It is your pipelines responsibility to cope with the data the producer hands it.

We'll look at three general strategies.

Load Shedding (reduce the load)

The following operations can help:

  • Sample. Take a sample of an observable on an interval.
  • Throttle. Slices the top off high velocity peaks of items. Useful when the rate is variable with bursty periods.

Concurrency (go faster)

  • Use async/await with Merge to process multiple items concurrently.
  • Use Schedulers to run different operations on different threads

Batches (be more efficient)

We can also speed things up sometimes by using the Buffer operation. Sometimes it is more efficient to work on a batch of items together than one by one. An example of this is using bulk insert with a database.

Just like with TPL Dataflow, we'll take an alternative but similar scenario and make a second version of our Rx.NET pipeline in the next post.

Code is on Github. Posts in series: