Monday, February 08, 2010

Continuing on from my first play with the Rx framework yesterday, I thought it would be good to come up with a real-world example. I thought about RSS feeds and stock tickers, but in the end decided on an app which consumes the Twitter REST API.

I will build up an application which can subscribe to a couple of Twitter news feeds, polling for new messages on a particular schedule, and merging the results into a single output stream to which we subscribe.

Accessing the Twitter API

I am going to keep this very simple: just access the user_timeline service for a given user and get the results in XML format, which makes it easy for us to project into a Tweet class which we define. I create a class called TwitterAccess with one key static method:

public static Tweet[] GetNewTweets(string username, long lastTweetId)
{
    if(lastTweetId < 1)
        throw new ArgumentOutOfRangeException("lastTweetId", "id must be greater than 0");

    string url = String.Format("http://twitter.com/statuses/user_timeline/{0}.xml?since_id={1}",
                               username.Replace("@", ""), lastTweetId);
    var doc = XDocument.Load(url);

    var items = from s in doc.Descendants("status")
                orderby long.Parse(s.Element("id").Value)
                select new Tweet { Id = long.Parse(s.Element("id").Value),
                                   Time = ParseTwitterDate(s.Element("created_at").Value),
                                   Username = s.Element("user").Element("screen_name").Value,
                                   Status = s.Element("text").Value };
     
    return items.ToArray();
}

This method gets all tweets on the given user's timeline whose ID is greater than lastTweetId. Twitter returns at most 20 tweets by default, so you will need to specify the count parameter in the REST URL if you need more, but the default suits our purpose.
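If 20 tweets per poll is not enough, the count parameter could be appended to the same URL. The following is only a sketch: TimelineUrls and BuildTimelineUrl are hypothetical names, not part of the TwitterAccess class above, and it assumes the service accepts count alongside since_id.

```csharp
using System;
using System.Globalization;

public static class TimelineUrls
{
    // Hypothetical helper: builds the user_timeline URL used above,
    // optionally adding the count parameter mentioned in the text.
    public static string BuildTimelineUrl(string username, long sinceId, int? count)
    {
        if (sinceId < 1)
            throw new ArgumentOutOfRangeException("sinceId", "id must be greater than 0");

        string url = String.Format(CultureInfo.InvariantCulture,
            "http://twitter.com/statuses/user_timeline/{0}.xml?since_id={1}",
            Uri.EscapeDataString(username.Replace("@", "")), sinceId);

        // Only append count when the caller asked for a specific page size.
        if (count.HasValue)
            url += "&count=" + count.Value.ToString(CultureInfo.InvariantCulture);

        return url;
    }
}
```

Calling BuildTimelineUrl("@bbcnews", 1, null) gives the same URL as the method above, while passing a count of 50 adds &count=50 to the end.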

The Tweet class just wraps the properties you can see above, and the ParseTwitterDate method just does a DateTime.ParseExact for the specific date format used by Twitter.
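For reference, here is a sketch of what those two pieces might look like. The property types follow the query above. The format string is an assumption, based on created_at values of the form "Sun Feb 07 08:10:00 +0000 2010", and should be checked against the real feed. TwitterDates is a hypothetical holder class.

```csharp
using System;
using System.Globalization;

public class Tweet
{
    public long Id { get; set; }
    public DateTime Time { get; set; }
    public string Username { get; set; }
    public string Status { get; set; }
}

public static class TwitterDates
{
    // Assumed layout of created_at, e.g. "Sun Feb 07 08:10:00 +0000 2010".
    private const string Format = "ddd MMM dd HH:mm:ss zzz yyyy";

    public static DateTime ParseTwitterDate(string value)
    {
        // AdjustToUniversal keeps the result in UTC rather than local time.
        return DateTime.ParseExact(value, Format, CultureInfo.InvariantCulture,
                                   DateTimeStyles.AdjustToUniversal);
    }
}
```

Using the invariant culture matters here: the day and month names in the feed are English, whatever the locale of the machine running the code.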

Wrapping Twitter in an IObservable<T>

Now that we have a means to access twitter, we should create a class which handles the polling of this service and pushes all new tweets out through an IObservable<Tweet> as follows:

public class TweetStream
{
    private long lastTweet = 1;
    private object updateLock = new object();
    private IObservable<Tweet> stream;

    public IObservable<Tweet> Stream
    {
        get { return stream; }
    }

    public TweetStream(string username, int interval)
    {
        var xs = Observable.Interval(TimeSpan.FromSeconds(interval));
        stream = xs.SelectMany(count =>
            {
                Tweet[] items;
                lock (updateLock)
                {
                    items = TwitterAccess.GetNewTweets(username, lastTweet);
                    lastTweet = items.Length > 0 ? items.Last().Id : lastTweet;
                }
                return items.ToObservable();
            });
    }
}

The SelectMany call allows us to define a function which is called each time the interval fires and which itself returns an IObservable. Note that we must use SelectMany instead of Select as SelectMany will automatically flatten all the items in the returned stream into its own stream of Tweets whereas Select would leave you with a stream of IObservables, which will then be difficult to handle.
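The same distinction exists in ordinary LINQ to Objects, which makes it easy to see without any timers. This sketch uses plain arrays as a stand-in for the per-tick batches of tweets; FlattenDemo, Poll and the generated strings are made up for illustration and have nothing to do with the Rx API itself.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class FlattenDemo
{
    // Stand-in for one poll: tick n yields n items (so tick 0 yields none).
    public static string[] Poll(int tick)
    {
        return Enumerable.Range(1, tick)
                         .Select(i => "tick" + tick + "-item" + i)
                         .ToArray();
    }

    // Select keeps one inner collection per tick.
    public static List<string[]> Nested(int ticks)
    {
        return Enumerable.Range(0, ticks).Select(t => Poll(t)).ToList();
    }

    // SelectMany flattens the inner collections into a single sequence.
    public static List<string> Flat(int ticks)
    {
        return Enumerable.Range(0, ticks).SelectMany(t => Poll(t)).ToList();
    }
}
```

With three ticks, Nested returns three arrays (one of them empty), whereas Flat returns the three items directly, which is the shape a subscriber wants.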

Getting some output from this stream

I can now update the console app we looked at yesterday in order to create an instance of TweetStream and subscribe to its output, writing this in turn out to the console.

TweetStream bbc = new TweetStream("bbcnews", 120);

var sub = bbc.Stream.Subscribe(i => Console.WriteLine("{0}: {1}: {2}",
                                i.Time.ToString("dd hh:mm"), i.Username, i.Status));

Running this up we get the following output:

07 08:10: bbcnews: A man who was injured during a Premier League football match at Stoke City dies in hospital.  http://bit.ly/bDjtDM
07 09:00: bbcnews: Worst snow storms to hit eastern US for decades  http://bit.ly/93Jpn9
07 09:00: bbcnews: Iran's leader asks the country's nuclear chief to begin enriching uranium to 20%, in a new challenge to Western.. http://bit.ly/9AUplq
...
07 02:20: bbcnews: A Scottish pensioner becomes only the seventh person in history to achieve judo's highest rank. http://bit.ly/8XRsxv
07 02:40: bbcnews: Scotland take on France at Murrayfield in the third of the opening round of Six Nations fixtures. http://bit.ly/98KHLc

This is very cool – without much code we now have a live twitter feed from a single source, which will automatically bring in new tweets (if available) every two minutes. As a final exercise I will attempt to merge two twitter news feeds into one single stream.

Merging Twitter Feeds

It is a simple matter now to update the above code to create two input feeds and then merge them:

TweetStream bbc = new TweetStream("bbcnews", 120);
TweetStream cnn = new TweetStream("cnn", 120);

var news = bbc.Stream.Merge(cnn.Stream);

var sub = news.Subscribe(i => Console.WriteLine("{0}: {1}: {2}",
                                i.Time.ToString("dd hh:mm"), i.Username, i.Status));

Just by calling the Merge method, the output from the two streams gets combined into a single stream. Note that the tweets from a given stream remain in order, but the relative ordering between the two streams depends on how they are scheduled to execute. The merge doesn’t work based on the feed dates or Ids; that is left as an exercise for the reader.
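One way to approach ordering by Id, at least for batches that have already been fetched, is a classic two-way merge. This is a pull-based sketch over in-memory lists, not an Rx operator; the Item type and MergeById are hypothetical names, and doing the same on live streams would additionally need some buffering.

```csharp
using System;
using System.Collections.Generic;

public class Item
{
    public long Id;
    public string Source;
}

public static class OrderedMerge
{
    // Merges two lists that are each already sorted by Id into one sorted list.
    public static List<Item> MergeById(IList<Item> left, IList<Item> right)
    {
        var result = new List<Item>(left.Count + right.Count);
        int i = 0, j = 0;

        while (i < left.Count && j < right.Count)
        {
            // Take from the left on ties so the merge is stable.
            if (left[i].Id <= right[j].Id) result.Add(left[i++]);
            else result.Add(right[j++]);
        }

        // One side is exhausted; copy whatever remains of the other.
        while (i < left.Count) result.Add(left[i++]);
        while (j < right.Count) result.Add(right[j++]);

        return result;
    }
}
```

Because GetNewTweets already returns each batch ordered by Id, two such batches could be fed straight into a merge like this before printing.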

07 12:00: bbcnews: England and Wales are drawn in the same qualifying group for Euro 2012 in Poland and the Ukraine. http://bit.ly/cCEza2
07 12:00: bbcnews: Home Secretary Alan Johnson rejects the idea that MPs facing expenses charges might avoid trial by citing Parli.. http://bit.ly/9tk3X9
...
07 03:20: bbcnews: A court in Libya dismisses a case against a Swiss businessman who was accused of illegal business activities. http://bit.ly/da0kFc
07 03:20: bbcnews: Police have launched an inquiry after a man's body was found on a main road in rural Stirlingshire. http://bit.ly/9xHMGp
06 01:06: CNN: Toyota: Apology but no new recall. http://on.cnn.com/cxQQrj
06 01:31: CNN: All military health facilities get Plan B pill. http://on.cnn.com/dbrZox
...
06 11:26: CNN: Tonight 9 ET: Watch the Tea Party Convention's keynote address by Sarah Palin live on CNN, CNN.com and CNN's iPhone App.
07 11:29: CNN: Residents shovel out from heavy snow as blizzard conditions sweep across the eastern U.S.: http://bit.ly/cFmAiU
07 11:30: CNN: Steps turn into slopes:  http://bit.ly/cyVbMM
Rx
Monday, February 08, 2010 10:32:02 PM (GMT Standard Time, UTC+00:00)
Saturday, February 06, 2010

I have finally got around to taking a look at this very interesting framework by DevLabs at Microsoft. What it promises to do is amazing, namely to revolutionize certain types of event-driven (i.e. asynchronous) applications by allowing the developer to use the familiar LINQ paradigm to manage multiple streams of incoming asynchronous events.

There are already a fair number of videos about Rx available on Channel 9. In my opinion, this Introduction by Erik Meijer is the best starting point, followed by Wes Dyer’s Hello World equivalent. Following these there are a number of In Depth videos, which are indexed on the Team Blog. I cannot hope to describe Rx and its API anywhere near as well as those guys, so for the rest of this post I will assume you have at least skimmed through those intros.

Getting Started

For this quick test, I downloaded the .NET Framework 3.5 SP1 variant and installed it. The install goes to Program Files\Microsoft Reactive Extensions\Redist\DesktopV2 and includes four assemblies and matching help files.

I crack open Visual Studio 2008 SP1 and create a new Windows console application (oh yeah, we are pushing the boat out with this demonstration), then add references to the Rx framework assemblies:

[Image: Assemblies]

Hello World

Let's just get straight in there and create a real hello world application using the Rx framework:

static void Main(string[] args)
{
    var xs = Observable.Return("Hello world");
    var sb = xs.Subscribe(msg => Console.WriteLine(msg));

    Console.WriteLine("Press any key to exit");
    Console.ReadKey();

    sb.Dispose();
}

In this trivial snippet, we call Observable.Return which creates an IObservable whose sole purpose is to push out a single string value and then terminate. In itself this would achieve nothing unless we create something which can subscribe to this stream, so we call Subscribe on this IObservable and pass in a lambda which takes the message content (a string) and writes it to the console. So we get:
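To make the push model concrete, here is a hand-rolled sketch of the same idea. It is not how Rx implements Observable.Return; SingleValue and Recorder are hypothetical names, and the code assumes a runtime where IObservable<T> and IObserver<T> live in the System namespace (as in .NET 4 and later).

```csharp
using System;
using System.Collections.Generic;

// Emits one value, then signals completion, as soon as someone subscribes.
public class SingleValue<T> : IObservable<T>
{
    private readonly T value;
    public SingleValue(T value) { this.value = value; }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observer.OnNext(value);
        observer.OnCompleted();
        return new NoOp();   // nothing left to cancel
    }

    private class NoOp : IDisposable { public void Dispose() { } }
}

// Records what it is told, so the order of calls is visible.
public class Recorder<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();
    public void OnNext(T value) { Log.Add("next:" + value); }
    public void OnError(Exception error) { Log.Add("error:" + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

Subscribing a Recorder<string> to new SingleValue<string>("Hello world") leaves two entries in its Log: the value, followed by the completion signal. The lambda passed to Subscribe in the snippet above plays the role of OnNext.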

[Image: HelloWorld-output]

This is a start, but is not exactly stretching the capabilities of Rx so far. Over the next few demos, hopefully we can start to see some of the real possibilities.

Counting

The above demo did one thing and then stopped. For real excitement, let's now create an IObservable which just keeps pushing out events forever:

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var sb = xs.Subscribe(i => Console.WriteLine("{0}: {1}", 
                               DateTime.Now.ToString("mm:ss"), i));

The call to Observable.Interval gives us an IObservable which will sit there pushing events continuously. The values are integers starting at 0: the first is pushed once the first interval has elapsed, and after each subsequent interval the value is incremented and pushed out again.

[Image: IncrementOne-Output]

By itself this appears to be a relatively useless feature, but we will see later that in conjunction with some of the more advanced facilities offered by Rx, this can be quite powerful as a means to quantize some other stream of events.

Advanced Counting

For this section, let's start by creating two separate streams of events with Observable.Interval.

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = Observable.Interval(TimeSpan.FromMilliseconds(200));

var sb1 = xs.Subscribe(i => Console.WriteLine("{0}: {1} (xs)",
                                DateTime.Now.ToString("mm:ss"), i));
var sb2 = ys.Subscribe(i => Console.WriteLine("{0}: {1} (ys)",
                                DateTime.Now.ToString("mm:ss"), i));

[Image: IncrementTwo-Output]

As expected, we have two streams outputting results, one ticking five times faster than the other. If we want to synchronize these, we have two options: either throttle each stream so that we output a single event only when both streams have a new event available, or, each time one of the streams pushes a new event, combine that with the most recent event of the other stream. These features are provided by the Zip and the CombineLatest methods respectively.

Zipping Up

One of the nice touches in the videos linked above is the concept of marble diagrams. These do a very good job of visualizing some potentially mind-melting concurrency scenarios, and also do a good job of showing what each of the Rx operators achieves. The simple (no failures) scenario for Zip is as follows:

[Image: Zip-Marble]

Each blob represents some event occurring, with xs and ys being the input streams in this case. Stream zs is created by zipping xs with ys and combining the two events using the function ‘f’.

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = Observable.Interval(TimeSpan.FromMilliseconds(200));
var zs = xs.Zip(ys, (x, y) => String.Format("({0},{1})", x, y));

var sb = zs.Subscribe(i => Console.WriteLine("{0}: {1} (zs)",
                               DateTime.Now.ToString("mm:ss"), i));

In this method we introduce the call to Zip, which combines events from the input streams and creates new events using the lambda. In this case we simply return a string representation of the tuple.

[Image: Zip-Output]

As we can see here, the 200ms stream is effectively throttled back to the speed of the slower stream. In many cases this may be the desired result, but consider what happens if the two streams are different types of financial market data: the values from the slower stream are always current, whilst the values being paired from the faster stream get more and more stale. After one hour of this, the faster stream would be feeding data 48 minutes old.
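The 48 minute figure follows from simple arithmetic. After one hour the 1 second stream has produced 3600 events, so Zip has released 3600 pairs; the 3600th event of the 200ms stream was produced at 3600 × 0.2s = 720s, i.e. 12 minutes in, which makes it 60 − 12 = 48 minutes old. The sketch below does the same calculation for any pair of periods. ZipStaleness and FastValueAge are hypothetical names, and the model assumes perfectly regular ticks starting from time zero.

```csharp
using System;

public static class ZipStaleness
{
    // Age of the fast stream's value in the pair that Zip emits at 'elapsed',
    // assuming both streams tick at perfectly regular periods from time zero.
    public static TimeSpan FastValueAge(TimeSpan elapsed, TimeSpan slowPeriod, TimeSpan fastPeriod)
    {
        // Pairs are released at the slow stream's pace.
        long pairs = elapsed.Ticks / slowPeriod.Ticks;

        // The n-th pair uses the n-th fast value, produced at n * fastPeriod.
        TimeSpan producedAt = TimeSpan.FromTicks(pairs * fastPeriod.Ticks);

        return elapsed - producedAt;
    }
}
```

The staleness grows linearly with elapsed time, at a rate of (1 − fastPeriod / slowPeriod), which is 0.8 for the two streams used here.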

CombineLatest

So to solve this problem, another operator, CombineLatest, steps in. The marble diagram below shows that the same type of function is performed to pair the events, but the means for achieving this is quite different:

[Image: CombineLatest-Marble]

In this case, whenever we get an event on either stream, we combine that with the most recent event (or wait until there is one) on the other stream. So once we have data on both streams, the combined event stream pushes events as fast as the fastest of the input streams.

var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = Observable.Interval(TimeSpan.FromMilliseconds(200));
var zs = xs.CombineLatest(ys, (x, y) => String.Format("({0},{1})", x, y));

var sb = zs.Subscribe(i => Console.WriteLine("{0}: {1} (zs)",
                               DateTime.Now.ToString("mm:ss"), i));

[Image: CombineLatest-Output]
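The pairing rule is easy to state but fiddly to picture, so here is a small offline simulation of it. It does not use Rx at all: each stream is just an array of firing times in milliseconds, and the value pushed is the index of the tick. CombineLatestSim is a hypothetical name, and letting xs win when both streams fire at the same instant is an arbitrary assumption; with real timers that ordering depends on scheduling.

```csharp
using System;
using System.Collections.Generic;

public static class CombineLatestSim
{
    // xs and ys hold the times (in ms) at which each stream fires; the value
    // pushed is the index of the tick, mirroring Observable.Interval.
    public static List<string> Run(long[] xs, long[] ys)
    {
        var output = new List<string>();
        long lastX = -1, lastY = -1;   // -1 means "nothing seen yet"
        int i = 0, j = 0;

        while (i < xs.Length || j < ys.Length)
        {
            // Pick whichever stream fires next; xs wins ties (an arbitrary choice).
            bool takeX = j >= ys.Length || (i < xs.Length && xs[i] <= ys[j]);
            if (takeX) lastX = i++; else lastY = j++;

            // Emit only once both streams have produced at least one value.
            if (lastX >= 0 && lastY >= 0)
                output.Add("(" + lastX + "," + lastY + ")");
        }
        return output;
    }
}
```

Feeding in xs firing at 1000 and 2000 and ys firing every 200 from 200 to 1200 produces nothing until xs first fires, and from then on one pair per event on either stream, with the xs half repeating until xs ticks again.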

Summary

What I have briefly looked at is just the tip of the iceberg in terms of functionality. There are numerous other methods in the framework for dealing with events, and many ways in which they can be composed to achieve some very powerful results. Also, I have only looked at success scenarios so far, and Rx has much to give in terms of dealing with exceptions and retry policies on particular event streams.

Rx
Saturday, February 06, 2010 1:35:55 PM (GMT Standard Time, UTC+00:00)