In the field

I for one rarely encounter codebases that use Rx.NET in the wild, and when introducing Rx to a team some resistance is to be expected. While unfortunate, there are good reasons for this, the main reason being that Rx has a pretty steep learning curve; It can seem daunting to those without Rx experience, which does make it an unlikely candidate for adoption. A shame, because there is a lot of documentation on it, and if you just start small, you’ll get the hang of it in no time. To me personally, getting the hang of Rx was not without challenges, but an absolute game changer nonetheless.

The concept

In a nutshell, Reactive Extensions (Rx) offers a way to work with data reactively (you don’t say!). One of it’s main concepts is the Observable, which could be regarded the ‘push’ version of Enumerable. Instead of querying it, you subscribe to updates that the observable produces. An observable and its subscribers are in essence quite similar to an event and its eventhandlers, only way more extensible. An observable should be regarded as a sequence of values, that can be filtered, transformed, and combined.

Why I use it

Using Rx has really helped me up my game when it comes to exposing domain data to the UI. A pattern I apply frequently is to have my readmodel services expose observables, so that interested services can subscribe to updates instead of having to poll.

Pushing information to a UI before it is actually needed can be a major help in making the user experience smoother, because the information does not have to be queried at the moment that the user needs it, eliminating potential delays. Sure, this can be done using a pull strategy, but it would be way more efficient to only transfer information that has actually changed. This is a very convenient thing to have in backend applications, and when combined with SignalR, any changes in domain data can even be projected directly down to your web application. On its way down, the data can get transformed, filtered and combined with other observables to fit the requirements of the use case.

Another thing I frequently apply Rx to is time-related processing, like calculating average order rates per minute on production lines, or just watching a textbox to see if the user has stopped typing for 2 seconds (debounce). Rx has all kinds of operators that will help you perform these tasks without having to use a single instance of Timer or Stopwatch.

Let’s have a quick look at the textbox example I mentioned. For a windows application, as an excercise, imagine the code you’d have to write to monitor the input of the textbox, to detect that the user has stopped typing for at least 2 seconds, do the input sanitation and finally execute some action.

This is how it could be done using Rx:

1
2
3
4
5
6
    this.WhenAnyValue(x => x.SearchText)
        .Throttle(TimeSpan.FromSeconds(2))
        .Select(query => query?.Trim())
        .DistinctUntilChanged()
        .Where(query => !string.IsNullOrWhiteSpace(query))
        .InvokeCommand(ExecuteSearch);

This example, from a WPF application, uses the ReactiveUI MVVM framework (which is excellent by the way) to monitor the SearchText property on a viewmodel using the WhenAnyValue method. The rest of the code is quite self-explanatory, no timers, no stopwatches, just a declaration of a pipeline with filters, transformations and actions.

It is very tempting for me to jump into more specifics of these examples, but I’m not going to in this post. I intent to get there in a later post, but for now, allow me to walk you through the basics.

Hello world

1
2
    IObservable<int> range = Observable.Range(1, 5);
    var subscription = range.Subscribe(number => Console.Write($"{number} ");

This example creates an observable that will produce a range of numeric values. Its subscriber then writes each value to the console as it is produced, the output in this case being:

1 2 3 4 5

The observable/subscriber behavior can als be expressed using a marble diagram, which for this example would look like this:

logo SVG

Every colored circle indicates a produced value, the vertical line at the end indicates completion of the sequence. There’s more to it than this, but this will do for now. For me, visualizing observable sequences has been the key factor in understanding them so I highly recommend it when your observables get more complex.

Mind the temperature

Not all observables react to a subscription in the same way. There are 2 types of observable behavior: Cold and Hot. Think of subscribing to a hot observable like turning on the radio; if you’re late, you’re going to miss the start of your favorite show. In this analogy, subscribing to a cold observable is more like listening to a podcast on spotify. You can press play at any time, and it will start at the beginning every time you do. The example above (Observable.Range) is creating a cold observable.

Cold observables don’t do anything until an observer subscribes to it. Whenever a subscription is created, the observable’s initialisation logic will be executed. This can be very convenient, as it makes subscription results very predictable. It can, however, be costly in terms of resource consumption. If, for example, a timer-based cold observable is subscribed to 10 times, it will result in 10 timers running in the background. This is hot observables shine. The hot obserable’s start logic is not dependent on subscriptions being made, meaning that it can start producing values before any subscriber is listening to it. This does mean that any logic that is executed to produce values only has to be executed once, no matter how many subscribers it has.

Subscribing

The subscribe method will start to observe the observable. The example above shows how to watch for new values, but that’s not all to observe. Besides values, observables can produce errors and completion signals. Subscribing to all three looks like this:

1
2
3
4
5
    var subscription = range.Subscribe( 
        number => Console.WriteLine(number),  
        (e) => Console.WriteLine("Error: {0}", e.Message), 
        () => Console.WriteLine("Completed!")
    );

We’ve already seen the first parameter in action: the onNext delegate, which is called whenever a new value is produced by the source observable. The second parameter will be invoked when an uncaught exception occurs in the source observable, or in one of the operators following it. When this occurs, the observable can be regarded as completed, no values will ever be produced again until it is subscribed to again.

Creating observables

There are numerous ways to create observables, I’ll single out a few of my favorites.

Observable.Create

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
    var timedRange = Observable.Create<int>((IObserver<int> observer) =>
    {
        observer.OnNext(1);
        Thread.Sleep(500);
        observer.OnNext(2);
        Thread.Sleep(500);
        observer.OnNext(3);
        observer.OnCompleted();    
        return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));   
    });

This creates an observable that, whenever it is subscribed to, will run the provided delegate with the subscriber as its parameter. Within the delegate, you can basically do anything, making this a very versatile option. For instance, you can make requests to an API every few seconds, and then publish the result in the onNext(). The delegate returns an IDisposable (or a Func<IDisposable>) that the subscribing side can use to properly dispose of the subscription whenever it is no longer needed.

Observable.FromEventPattern

This one basically allows us to handle good-ol’ events in an observable manner:

1
2
3
    var keyPresses = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            h => KeyPress += h, h => KeyPress -= h)
        .Select(k => k.EventArgs.KeyChar)      

Other Convenient Factory methods

Besides Observable.Create, there are couple of other factory methods deserve some attention as I’ve found them especially convenient for providing defaults, or as mocks in unit tests;

  • Empty: Creates an empty observable completes immediately.
  • Never: Creates an empty observable that never completes.
  • Return: Creates an observable that produces a single value.

Manipulating the sequence

The most commonly used way to manipulate an observable stream are filtering an transforming, which can be done with the extension methods found in System.Reactive.Linq. Let’s expand on our previous example:

1
2
3
4
5
    var keyPresses = Observable.FromEventPattern<KeyPressEventHandler, KeyPressEventArgs>(
            h => KeyPress += h, h => KeyPress -= h)
        .Select(k => k.EventArgs.KeyChar);
    var onlyDigits = keyPresses.Where(character => char.IsDigit(character));
    var threeTimes = onlyDigits.Select(c => $"{int.Parse(c.ToString()) * 3}");

Here, we take the keyPresses observable, and then filter it so it only produces digits. The resulting observable is then transformed, taking the digit, and multiplying it by 3. The marble representation of these observables would look like this:

logo SVG

Another commonly used operation is to combine an observable with another observable. Again, there are many ways to do this, my personal go-to operator is CombineLatest. let’s combine 2 numeric observables (observableA and observableB), and multiply their latest values with each other:

1
    var combination = Observable.CombineLatest(observableA, observableB, (a, b) => a * b);

logo SVG

The most important thing to know about CombineLatest is that it will not produce anything until all of the source streams have produced at least one value. In this case, it means no updates until the 4th frame. Let’s fix that for this example, by applying StartWith to the second source observable:

1
    var combination = Observable.CombineLatest(observableA, observableB.StartWith(0), (a, b) => a * b);

logo SVG

This, we get a result from the start.

Wrap up

This was a (very) brief introduction into absolute basics of Reactive Extensions for .NET. I intend to do more posts on Rx, in which I will elaborate a bit more on more complex scenario’s, scheduling and unit testing, other suggestions can be left in the comments section. As stated before, there are a lot of other operators available in the reactive paradigm, most of them available in all mainstream programming languages. I highly recommend to have a look at reactivex.io, where you can find API documentation on all Rx operators in 13 programming languages.

Resources:

comments powered by Disqus