## Sipping from the twitter stream

I’ve been playing around with connecting to Twitter’s streaming API, and displaying a live stream of the tweets returned.
To do this, I was originally using DotNetOpenAuth along with HttpClient, which worked fine for the sample stream, but returned authentication errors for the filtered stream. Looking at the HTTP messages in Fiddler2, the OAuth parameters weren’t ordered, which Twitter requires. Instead, I’m now using TwitterDoodle, which also uses HttpClient.
The C# 5/.NET 4.5 async/await code is quite elegant – it’s not CPU intensive, so it’s fine running on the UI thread without blocking. Prior to C# 5 my first instinct would have been to use RX for this, but now for something simple I’d stick to async/await, only reaching for RX when doing something more complex like buffering or batching.

    private async void ProcessTweets()
    {
      using (var t = new TwitterConnector())
      {
        var response = await t.GetSampleFirehoseConnection();
        var res = await response.Content.ReadAsStreamAsync();
        using (var streamReader = new StreamReader(res, Encoding.UTF8))
        {
          // streamReader.EndOfStream can block, so instead check for null
          while (!cts.IsCancellationRequested)
          {
            var r = await streamReader.ReadLineAsync();
            if (r == null) { return; }
            ProcessTweetText(r);
          }
        }
      }
    }

    private void ProcessTweetText(string r)
    {
      if (!string.IsNullOrEmpty(r))
      {
        var tweetJToken = JsonConvert.DeserializeObject<dynamic>(r);
        var tweetObj = tweetJToken["text"];
        if (tweetObj != null)
        {
          var tweetText = tweetObj.ToString();
          viewModel.Items.Add(tweetText);
        }
      }
    }

The equivalent F# async code obviously looks quite similar, with the added goodness of Type Providers. Time permitting, I’m planning to do some more analysis of tweets, which would be a good fit for F#.

    type tweetProvider = JsonProvider<"SampleTweet.json", SampleList=true>

    type MainWindowViewModel() =
      inherit ViewModelBase()
      let items = new ObservableCollection<string>()
      member x.Items
        with get () = items
      member x.ProcessTweet tweet =
        let tweetParsed = tweetProvider.Parse(tweet)
        match tweetParsed.Text with
        | Some(v) -> x.Items.Add v
        | None -> ()
      member x.ProcessTweets =
        let a = async {
          use t = new TwitterConnector()
          let! response = t.GetSampleFirehoseConnection() |> Async.AwaitTask
          let! result = response.Content.ReadAsStreamAsync() |> Async.AwaitTask
          use streamReader = new StreamReader(result, Encoding.UTF8)
          let rec processTweetsAsync (s:StreamReader) =
            async {
              let! r = s.ReadLineAsync() |> Async.AwaitTask
              // streamReader.EndOfStream can block, so instead check for null
              if r <> null then
                x.ProcessTweet r
                return! processTweetsAsync s
            }
          do! processTweetsAsync streamReader
        }
        a |> Async.StartImmediate
      member x.GoCommand =
        new RelayCommand ((fun canExecute -> true),
          (fun action -> x.ProcessTweets))

## TPL Dataflow

This blog post is coming a couple of weeks late due to my copy of Windows becoming corrupted – and strangely, the only thing I didn’t have backed up was this Oscilloscope app!

Anyway, last time I took a look at charting performance, but this post will investigate TPL Dataflow.

In my original F# post, I said that I felt the code would have been better implemented in RX, which I then went and did in this post. However, RX isn’t the only recent technology coming out of Microsoft that deals with streams of data. TPL Dataflow overlaps confusingly with RX’s use cases, and this is what its whitepaper has to say:

“Astute readers may notice some similarities between TPL Dataflow and Reactive Extensions (Rx), currently available as a download from the MSDN Data developer center. Rx is predominantly focused on coordination and composition of event streams with a LINQ-based API, providing a rich set of combinators for manipulating IObservable<T>s of data. In contrast, TPL Dataflow is focused on providing building blocks for message passing and parallelizing CPU- and I/O-intensive applications with high-throughput and low-latency, while also providing developers explicit control over how data is buffered and moves about the system. As such, Rx and TPL Dataflow, while potentially viewed as similar at a 30,000 foot level, address distinct needs. Even so, TPL Dataflow and Rx provide a better together story.”

That does sound very interesting – who doesn’t want better performance!

### Implementation

I’ll dive in straight away and look at some code. The code is structured similarly to the RX code in my previous post.

The microphone still provides an IObservable<float[]> via the GetStream() method. The TaskSchedulers are specified explicitly throughout, so that the code can be unit tested (I’m using a MockSynchronizationContext).

The transformManyBlock behaves in the same way as the SelectMany operator: it separates the float array into its constituent parts.

The broadcast block is analogous to a connectable observable – many blocks can be linked to it to receive the same messages.

The transform many block is then explicitly linked to the broadcast block, meaning that its outputs make it into the broadcast block’s input.
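The wiring described above might look something like this (a minimal sketch – the variable names are mine, not necessarily those of the original code):

```csharp
using System.Threading.Tasks.Dataflow;

// Flatten each microphone buffer into individual samples,
// then broadcast every sample to all linked consumers.
var transformManyBlock = new TransformManyBlock<float[], float>(buffer => buffer);
var broadcastBlock = new BroadcastBlock<float>(sample => sample);
transformManyBlock.LinkTo(broadcastBlock);
```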

#### Oscilloscope/Buffered View

Next up is the code dealing with the buffered display of data.

The batch block behaves the same as RX’s Buffer operator (a bit confusingly, as TPL Dataflow’s Buffer means something different). This simply takes the incoming float values and packages them into an array.

The ActionBlock is the final point on the chain – it takes the values it receives and outputs them to the screen (via oscilloscopeOutput) – this is the equivalent code to the code in the Observable Subscription in the RX version of my code.
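Roughly, the batch and action blocks look like this (the buffer size, UI TaskScheduler and display callback names here are hypothetical):

```csharp
using System.Threading.Tasks.Dataflow;

// Package the broadcast samples into fixed-size arrays,
// and draw each array on the UI thread's TaskScheduler.
var batchBlock = new BatchBlock<float>(bufferSize);           // bufferSize: hypothetical
var actionBlock = new ActionBlock<float[]>(
    samples => oscilloscopeOutput.Draw(samples),              // oscilloscopeOutput: hypothetical
    new ExecutionDataflowBlockOptions { TaskScheduler = uiTaskScheduler });

broadcastBlock.LinkTo(batchBlock);
batchBlock.LinkTo(actionBlock);
```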

#### Sliding Window View

Again, the sliding window view is structured similarly to the previous RX code:

A sample block is created so that every nth sample is fed into the SlidingWindowBlock, which returns a new float[] representing the new window of data on every input.

The second sample block ensures that not every window of data is drawn (so that the chart is refreshed at 40fps rather than 400fps!)

Finally, the observable is subscribed to, and the messages are posted into the transformManyBlock.
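Putting the sliding window chain together might look something like the following sketch, assuming helper factories along the lines of the whitepaper’s Encapsulate-based custom blocks (CreateSampleBlock and CreateSlidingWindow are hypothetical names, and the rates are illustrative):

```csharp
using System.Threading.Tasks.Dataflow;

var sampleBlock = CreateSampleBlock<float>(100);           // keep every 100th sample
var slidingWindowBlock = CreateSlidingWindow<float>(1000); // emit a 1000-sample window per input
var drawSampleBlock = CreateSampleBlock<float[]>(10);      // only draw every 10th window

broadcastBlock.LinkTo(sampleBlock);
sampleBlock.LinkTo(slidingWindowBlock);
slidingWindowBlock.LinkTo(drawSampleBlock);
drawSampleBlock.LinkTo(windowedActionBlock);               // ActionBlock that draws the chart

// Feed the microphone observable into the head of the dataflow chain
var subscription = microphone.GetStream()
    .Subscribe(buffer => transformManyBlock.Post(buffer));
```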

Whereas the RX guidelines recommend creating new operators based on existing operators, TPL Dataflow encourages building up custom blocks (though they can be built out of combining existing blocks).

The SlidingWindowBlock is a slightly modified version of the one provided in the whitepaper, and here: http://msdn.microsoft.com/en-us/library/hh228606(v=vs.110).aspx. The one in the example only posts a window once it has received a full window’s worth of data, whereas I want to start drawing the windows as soon as data arrives.
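A sketch of that modification, based on the MSDN example’s Encapsulate pattern, with the full-window check removed so partial windows are posted immediately:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<T, T[]> CreateSlidingWindow<T>(int windowSize)
{
    var queue = new Queue<T>();
    var source = new BufferBlock<T[]>();

    // Enqueue each item, trim the queue to the window size,
    // and post the current (possibly partial) window.
    var target = new ActionBlock<T>(item =>
    {
        queue.Enqueue(item);
        if (queue.Count > windowSize) { queue.Dequeue(); }
        source.Post(queue.ToArray());
    });

    // Propagate completion from the target half to the source half.
    target.Completion.ContinueWith(_ => source.Complete());

    return DataflowBlock.Encapsulate(target, source);
}
```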

The SampleBlock is simple: it yields a value once it has received enough inputs, discarding the intermediate ones. This is structured more efficiently than my RX implementation – that one buffers the values, and then only takes the last value out of the buffer.
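Along the same lines, the sample block can be sketched as (again using the Encapsulate pattern; the factory name is mine):

```csharp
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<T, T> CreateSampleBlock<T>(int sampleEvery)
{
    var count = 0;
    var source = new BufferBlock<T>();

    // Only forward every nth item; intermediate items are simply dropped,
    // with no buffering of the discarded values.
    var target = new ActionBlock<T>(item =>
    {
        if (++count % sampleEvery == 0) { source.Post(item); }
    });

    target.Completion.ContinueWith(_ => source.Complete());
    return DataflowBlock.Encapsulate(target, source);
}
```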

#### Performance

I removed the updating of the chart, to look purely at what the performance of the app is in taking the input from the microphone, and shaping it to be in the correct format for output.

RX Performance:

TPL Dataflow:

The processor usage of the two apps is in the same ballpark, but the TPL Dataflow app has many more garbage collections, and its # bytes in all heaps counter also runs slightly higher. I had a quick look in the profiler, and it seems that there are many Task allocations from the Dataflow Post() operation within the SlidingWindow and Sample implementations.

The extra generation 1 collections don’t really matter for this application with the functionality that it currently has – the % time in GC barely registers, and there are no Gen 2 garbage collections over the minute or so that the app was running.

Once the app is doing more calculations and drawing more complex charts, it would be interesting to see whether any latency spikes due to gen 2 GCs cause similar slowdowns to the one discussed at the start of my previous post.

It would be an interesting exercise to limit the number of GCs throughout the app. For instance, there’s no need for the microphone access observable to return an IObservable<float[]> instead of an IObservable<float>; currently every read from the microphone allocates a new float[]. Similarly, new List<Point> instances are created to more easily interface with DynamicDataDisplay – it would be better to change the types of data that D3 takes to be more observable-friendly, and to save having so many allocations. Again, there’s not much point doing this, other than as an interesting theoretical exercise, until the garbage collection overhead proves to be a performance issue.

### Conclusion

For an application as simple as this, there isn’t any benefit to using TPL Dataflow. It is a powerful library, with functionality such as blocks being able to decline offered messages and request them later, which would be difficult to implement in RX – but as my app doesn’t (currently) need that level of functionality, there’s no benefit to using the library.

I may revisit this in the future, if I have some computationally expensive operation (an FFT, for instance) where I’d want greater control over the flow of data through the system.

## Oscilloscope using RX and C# Async CTP

In my last blog post I described the implementation of a simple ‘oscilloscope app’ in F#, as I wanted to see how the code would be structured using only F# idioms (http://taumuon-jabuka.blogspot.com/2012/01/visualising-sound.html )

My natural instinct would have been to implement it using the Reactive Extensions for .NET (RX Framework), but I first wanted to investigate a pure F# solution. This post describes an alternate RX implementation.

Similar to my last post, I’ll describe the code inside-out.

### Creating the Observable

This code returns an IObservable with a float array payload, representing each read from the CaptureBuffer. The implementation could have internally started a dedicated Thread from which to push out values, but instead I’m using the new C# 5 async functionality (using the Async CTP Update 3), so that my code looks pretty similar to the previous F# example.

The observable takes a CancellationToken, whose IsCancellationRequested is set to true once all subscriptions to the observable have been disposed. The CompositeDisposable returned out from the lambda is also disposed of at that point.

The code loops while its CancellationToken has not been cancelled, asynchronously awaiting one of the WaitHandles to be set, and then reads from the buffer. Each value is pushed out to subscribers via OnNext.
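The shape of that observable, heavily abbreviated (the capture-buffer read helper is hypothetical, standing in for the WaitHandle/CaptureBuffer access):

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public IObservable<float[]> GetStream()
{
    return Observable.Create<float[]>(observer =>
    {
        var cts = new CancellationTokenSource();
        ReadLoopAsync(observer, cts.Token);  // fire-and-forget async read loop

        // Disposed when the last subscription goes away,
        // cancelling the read loop via the token.
        return new CompositeDisposable(Disposable.Create(cts.Cancel));
    });
}

private async void ReadLoopAsync(IObserver<float[]> observer, CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        // Hypothetical: await whichever notification WaitHandle is set,
        // then read the raw bytes out of the CaptureBuffer.
        var bytes = await ReadFromCaptureBufferAsync();
        observer.OnNext(ConvertByteArrayToFloatArray(bytes));
    }
}
```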

The ConvertByteArrayToFloatArray() is trivial.
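Something along these lines, assuming the capture buffer delivers 16-bit PCM samples (the scaling factor is my assumption):

```csharp
using System;

public static float[] ConvertByteArrayToFloatArray(byte[] bytes)
{
    // Each pair of bytes is one 16-bit sample (machine byte order);
    // scale each sample to the range [-1, 1].
    var floats = new float[bytes.Length / 2];
    for (var i = 0; i < floats.Length; i++)
    {
        floats[i] = BitConverter.ToInt16(bytes, i * 2) / 32768f;
    }
    return floats;
}
```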

### Subscriptions to the observable

First off, the float array returned from the observable is decomposed into its individual values using a SelectMany, as the sampling, buffering and windowing operations all operate on a stream of floats. Then, the observable is published to an IConnectableObservable. The microphone access returns a cold observable, meaning that each subscriber to it would end up creating its own microphone access. This would work (the CaptureBuffer doesn’t prevent it), but the connectable observable means that instead all clients share the same observable, and ensures that they all see the same values (so that the traces on the two charts are in sync).

The RefCount() means that when all subscriptions to the IConnectableObservable have been disposed of, the subscription to the underlying observable will also be disposed.
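The flattening and sharing steps look roughly like this (a sketch; the microphone variable is assumed):

```csharp
using System.Reactive.Linq;

var samples = microphone.GetStream()
    .SelectMany(buffer => buffer)  // decompose each float[] into individual floats
    .Publish()                     // share one microphone subscription between all observers
    .RefCount();                   // dispose the underlying subscription when the last observer unsubscribes
```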

The top ‘oscilloscope’ trace is a simple Observable.Buffer() over the data stream. There is no need to ObserveOn the dispatcher thread as the subscription occurs on the UI thread. Using RX, it would be easy to schedule various parts of the work onto different threads, but I’ll discuss this in a later article (I want to keep everything on the UI thread for now to compare performance with the single threaded F# implementation).

All subscriptions are added to a CompositeDisposable member in the ViewModel – the Stop button’s command implementation disposes of this, which causes all subscriptions to be disposed of, and so the microphone access loop to be terminated via its CancellationToken.

The windowing operation simply samples every 100 data points, and from those sampled data points takes a sliding window of 1000 values to display. The windowCount variable is closed over to allow the y axis to be continually updated.

The Sample operator is simple, but not particularly efficient – it takes a buffer (i.e. a non-overlapping window) of count values, and then takes the last value in the buffer.
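That operator can be sketched as follows (the extension method name is mine):

```csharp
using System;
using System.Reactive.Linq;

public static class SampleExtensions
{
    // Takes non-overlapping buffers of 'count' values and yields the last
    // value of each - simple, but it allocates a list per sample.
    public static IObservable<T> SampleEvery<T>(this IObservable<T> source, int count)
    {
        return source.Buffer(count).Select(buffer => buffer[buffer.Count - 1]);
    }
}
```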

The WindowWithCount operator is the same one I discussed at http://taumuon-jabuka.blogspot.com/2011/07/rx-framework-performance-awareness.html (with implementation grabbed from http://social.msdn.microsoft.com/Forums/en-US/rx/thread/37428f58-f241-45b3-a878-c1627deb9ac4#bcdc7b79-bbde-4145-88e4-583685285682 )

And as I also talked about in that post, the RX guidelines recommend implementing an operator in terms of existing operators. There’s only one problem in this case: it’s quite slow. (I’ll get quantitative figures on this in a future blog post discussing the performance of all the approaches.)

The following is faster (again, I know more specific figures are needed).
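A direct implementation along these lines avoids composing existing operators by maintaining the window in a queue (a sketch of the idea, not the exact code from the post – this version also emits partial windows until the first full window arrives):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class WindowExtensions
{
    public static IObservable<T[]> WindowWithCount<T>(this IObservable<T> source, int windowSize)
    {
        return Observable.Create<T[]>(observer =>
        {
            var window = new Queue<T>();
            return source.Subscribe(
                value =>
                {
                    // Maintain a single sliding window rather than
                    // creating overlapping buffers per element.
                    window.Enqueue(value);
                    if (window.Count > windowSize) { window.Dequeue(); }
                    observer.OnNext(window.ToArray());
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}
```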

### Comparing implementations

For me, the RX solution is cleaner than the F# solution, and easier to follow. I did implement my F# solution in quite an imperative way though, and should perhaps have used AsyncSeq or mailbox processors – but as reading from the microphone is a push-based activity, none of those solutions would be as clean as RX (of course, I haven’t covered using RX from F#). The F# version is much faster, and I’ll take a bit more of a dig into performance in an upcoming blog post.

## Family tree timelines

As I talked about in my previous post, I created a simple family tree visualisation program to let me know which of my ancestors I have the most interest in.

I also prototyped a timeline view, shown in the above image. It’s probably pretty obvious that it’s a prototype, as there are no labels to identify any of the individuals.

The idea for this came about because genesreunited has a non-validated freeform text field for all date entries. There’s no way to check whether this information is valid for GEDCOM export – the GEDCOM spec allows various kinds of dates, such as approximate dates, bounded dates, dates within a specific quarter etc. to be specified, but obviously the date has to be in the correct format.

I wanted to check that all dates were both in the correct format and actually valid (i.e. checking for nonsensical dates, such as parents born after their children).

The idea behind the colouring is for green to show a GEDCOM-valid date, yellow to indicate a missing date (with the date inferred), and red to indicate an invalid date.

The opacity indicates the ‘confidence’ in a date, with a specified date range not shown at full opacity. Dates can also be inferred using a set of rules (e.g. parents should be at least 12 years older than their children, a child is probably born within a year of his or her baptism, the first child is born within a year of his or her parents’ marriage etc.). These rules could obviously get quite complicated.

The layout pretty much matches the layout of the ancestor chart, with ancestors being adjacent to each other. I was fussing over this a little bit – I thought it’d be nice for the descendants on the chart to be nearest to each other on the x-axis, but there’s no way for this to happen through the tree.

I was feeling pretty happy with this, and thought it was probably worth putting into the app (with some tweaks, such as making the y-axis an adorner layer that adjusts with scale etc.), but then I found that some existing software has a pretty much identical view to this (I did google around before implementing; I obviously didn’t google hard enough).

Progeny Genealogy has an identical layout (OK, turned on its side). It even has the same idea of using opacity to indicate which data is estimated (not varying opacity, but even still). I guess there are only so many ways to solve a problem, but it’s still gutting when you think you’ve had an original idea!