Saturday, February 06, 2010

I have finally got around to taking a look at the Reactive Extensions (Rx), a very interesting framework from DevLabs at Microsoft. What it promises is ambitious: to revolutionize certain types of event-driven (i.e. asynchronous) applications by allowing the developer to use the familiar LINQ paradigm to manage multiple streams of incoming asynchronous events.

There are already a fair number of videos about Rx available on Channel 9. In my opinion, this Introduction by Erik Meijer is the best starting point, followed by Wes Dyer’s Hello World equivalent. Following these there are a number of In Depth videos, which are indexed on the Team Blog. I cannot hope to describe Rx and its API anywhere near as well as those guys, so for the rest of this post I will assume you have at least skimmed through those intros.

Getting Started

For this quick test, I downloaded the .NET Framework 3.5 SP1 variant and installed it. The install goes to Program Files\Microsoft Reactive Extensions\Redist\DesktopV2 and includes four assemblies and matching help files.

I cracked open Visual Studio 2008 SP1 and created a new Windows console application (oh yeah, we are pushing the boat out with this demonstration), then added references to the Rx framework assemblies:

Assemblies

Hello World

Let's just get straight in there and create a real hello world application using the Rx framework:

static void Main(string[] args)
{
    var xs = Observable.Return("Hello world");
    var sb = xs.Subscribe(msg => Console.WriteLine(msg));

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();

    sb.Dispose();
}

In this trivial snippet, we call Observable.Return which creates an IObservable whose sole purpose is to push out a single string value and then terminate. In itself this would achieve nothing unless we create something which can subscribe to this stream, so we call Subscribe on this IObservable and pass in a lambda which takes the message content (a string) and writes it to the console. So we get:

HelloWorld-output
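
To make the "and then terminate" part visible, here is a small sketch of the same program with a completion callback added. It is not taken from the videos; it assumes the namespaces of the current System.Reactive NuGet package (System.Reactive.Linq), which differ from those in the 2010 DevLabs assemblies used in this post, and the class name is made up for illustration.

```csharp
using System;
using System.Reactive.Linq; // assumption: current System.Reactive package, not the 2010 DevLabs build

static class HelloWorldWithCompletion // hypothetical name, for illustration only
{
    static void Main()
    {
        // Return pushes a single value and then signals completion.
        IObservable<string> xs = Observable.Return("Hello world");

        // The second lambda is invoked when the stream terminates normally.
        using (xs.Subscribe(
            msg => Console.WriteLine(msg),
            () => Console.WriteLine("Completed")))
        {
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
        } // leaving the using block disposes the subscription
    }
}
```

The using block does the same job as the explicit sb.Dispose() call in the original snippet.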

This is a start, but is not exactly stretching the capabilities of Rx so far. Over the next few demos, hopefully we can start to see some of the real possibilities.

Counting

The above demo did one thing and then stopped. For real excitement, let's now create an IObservable which just keeps pushing out events forever:

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var sb = xs.Subscribe(i => Console.WriteLine("{0}: {1}", 
                               DateTime.Now.ToString("mm:ss"), i));

The call to Observable.Interval gives us an IObservable which will sit there pushing events continuously. The first value, 0, is pushed once the first interval has elapsed, and after each subsequent interval the value is incremented and pushed out again.

IncrementOne-Output

By itself this appears to be a relatively useless feature, but we will see later that in conjunction with some of the more advanced facilities offered by Rx, this can be quite powerful as a means to quantize some other stream of events.

Advanced Counting

For this section, let's start by creating two separate streams of events with Observable.Interval.

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = Observable.Interval(TimeSpan.FromMilliseconds(200));

var sb1 = xs.Subscribe(i => Console.WriteLine("{0}: {1} (xs)",
                                DateTime.Now.ToString("mm:ss"), i));
var sb2 = ys.Subscribe(i => Console.WriteLine("{0}: {1} (ys)",
                                DateTime.Now.ToString("mm:ss"), i));

IncrementTwo-Output

As expected, we have two streams outputting results, one ticking five times faster than the other. If we want to synchronize these, we have two options: either throttle the streams so that we output a single event only when both have a new event available, or, each time one of the streams pushes a new event, combine it with the most recent event of the other stream. These features are provided by the Zip and the CombineLatest methods respectively.

Zipping Up

One of the nice touches in the videos linked above is the concept of marble diagrams. These do a very good job of visualizing some potentially mind-melting concurrency scenarios, and also of showing what each of the Rx operators achieves. The simple (no failures) scenario for Zip is as follows:

Zip-Marble

Each blob represents some event occurring, with xs and ys being the input streams in this case. Stream zs is created by zipping xs with ys and combining the two events using the function ‘f’.

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = Observable.Interval(TimeSpan.FromMilliseconds(200));
var zs = xs.Zip(ys, (x, y) => String.Format("({0},{1})", x, y));

var sb = zs.Subscribe(i => Console.WriteLine("{0}: {1} (zs)",
                               DateTime.Now.ToString("mm:ss"), i));

In this snippet we introduce the call to Zip, which combines events from the input streams and creates new events using the lambda. In this case we simply return a string representation of the tuple.

Zip-Output

As we can see here, the 200ms stream is effectively throttled back to the speed of the slower stream. In many cases this may be the desired result, but consider if the two streams were different types of financial market data: one remains up to date whilst the other gets more and more stale. After one hour of this, one of the streams would be feeding data 48 minutes old.
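
The 48 minute figure can be checked with a little arithmetic. The sketch below needs no Rx at all; it assumes that value n of each stream arrives after (n + 1) periods, which is how the Interval streams above behave, and the helper name is made up for illustration.

```csharp
using System;

static class ZipStaleness // hypothetical name, for illustration only
{
    // How old the fast stream's value is at the moment Zip emits the latest pair,
    // assuming value n of each stream arrives after (n + 1) of its periods.
    static TimeSpan AgeOfFastValue(TimeSpan elapsed, TimeSpan slowPeriod, TimeSpan fastPeriod)
    {
        // Zip can only have emitted as many pairs as the slow stream has produced values.
        long pairs = elapsed.Ticks / slowPeriod.Ticks;

        // The latest pair was emitted at pairs * slowPeriod, but its fast-stream
        // half was produced back at pairs * fastPeriod.
        return TimeSpan.FromTicks(pairs * (slowPeriod.Ticks - fastPeriod.Ticks));
    }

    static void Main()
    {
        TimeSpan age = AgeOfFastValue(
            TimeSpan.FromHours(1),
            TimeSpan.FromSeconds(1),
            TimeSpan.FromMilliseconds(200));

        Console.WriteLine(age.TotalMinutes); // prints 48
    }
}
```

After 3600 pairs the fast stream's half of the latest pair dates from 3600 × 0.2s = 12 minutes into the run, hence the 48 minute lag.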

CombineLatest

So to solve this problem, another operator, CombineLatest, steps in. The marble diagram below shows that the same type of function is performed to pair the events, but the means for achieving this is quite different:

CombineLatest-Marble

In this case, whenever we get an event on either stream, we combine that with the most recent event (or wait until there is one) on the other stream. So once we have data on both streams, the combined event stream pushes events as fast as the fastest of the input streams.

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = Observable.Interval(TimeSpan.FromMilliseconds(200));
var zs = xs.CombineLatest(ys, (x, y) => String.Format("({0},{1})", x, y));

var sb = zs.Subscribe(i => Console.WriteLine("{0}: {1} (zs)",
                               DateTime.Now.ToString("mm:ss"), i));

CombineLatest-Output
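
To see the difference between the two pairing strategies without waiting on real timers, the following sketch replays three seconds of both streams from plain lists. It does not use Rx and is not how the operators are implemented; the Ticks helper, the tie-breaking rule and the class name are all assumptions made for this illustration, and it needs a reasonably recent C# compiler for the tuple syntax.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class PairingSketch // hypothetical name, for illustration only
{
    // Hypothetical stand-in for Observable.Interval: value n is assumed to
    // arrive at (n + 1) * periodMs, up to and including horizonMs.
    static List<(long Time, long Value)> Ticks(long periodMs, long horizonMs)
    {
        var ticks = new List<(long Time, long Value)>();
        for (long n = 0; (n + 1) * periodMs <= horizonMs; n++)
            ticks.Add(((n + 1) * periodMs, n));
        return ticks;
    }

    static void Main()
    {
        var xs = Ticks(1000, 3000);
        var ys = Ticks(200, 3000);

        // Zip-style: the n-th x is paired with the n-th y, and the pair can
        // only be emitted once the later of the two has arrived.
        foreach (var (sx, sy) in xs.Zip(ys, (a, b) => (a, b)))
            Console.WriteLine("zip    {0,4}ms: ({1},{2})",
                Math.Max(sx.Time, sy.Time), sx.Value, sy.Value);

        // CombineLatest-style: replay both streams in time order, remember the
        // latest value from each, and emit on every event once both sides have
        // produced something. On ties ys go first, an arbitrary choice for this sketch.
        var timeline = ys.Select(y => (y.Time, IsX: false, y.Value))
            .Concat(xs.Select(x => (x.Time, IsX: true, x.Value)))
            .OrderBy(e => e.Time); // OrderBy is stable, so ys stay ahead on ties

        long? latestX = null, latestY = null;
        foreach (var e in timeline)
        {
            if (e.IsX) latestX = e.Value; else latestY = e.Value;
            if (latestX.HasValue && latestY.HasValue)
                Console.WriteLine("latest {0,4}ms: ({1},{2})", e.Time, latestX, latestY);
        }
    }
}
```

The Zip-style loop produces only the three pairs (0,0), (1,1) and (2,2), one per second of simulated time, whereas the CombineLatest-style loop produces a pair for every event from the 1000ms mark onwards.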

Summary

What I have briefly looked at is just the tip of the iceberg in terms of functionality. There are numerous other methods in the framework for dealing with events, and many ways in which they can be composed to achieve some very powerful results. Also, I have only looked at success scenarios so far, and Rx has much to give in terms of dealing with exceptions and retry policies on particular event streams.
