Sipping from the Twitter stream

I’ve been playing around with connecting to Twitter’s streaming API and displaying a live stream of the tweets returned.
To do this, I was originally using DotNetOpenAuth along with HttpClient, which worked fine for the sample stream but returned authentication errors for the filtered stream. I looked at the HTTP message in Fiddler2, and the OAuth parameters weren’t ordered, which Twitter requires. Instead, I’m now using TwitterDoodle, which also uses HttpClient.
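
For background on the ordering issue: OAuth 1.0a signs a "signature base string" in which the request parameters are percent-encoded and then sorted by name. The sketch below shows only that normalization step. The class and method names are made up for illustration, and this is not the code of DotNetOpenAuth or TwitterDoodle; it also does not say which ordering was actually wrong in the failing request.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OAuthSketch
{
    // Percent-encode a name or value (unreserved characters are left as-is).
    static string Encode(string s) => Uri.EscapeDataString(s);

    // Encode every pair, sort by encoded name (then value) with an ordinal
    // comparison, and join as name=value pairs separated by '&'.
    public static string NormalizeParameters(
        IEnumerable<KeyValuePair<string, string>> parameters)
    {
        var encoded = parameters
            .Select(p => new KeyValuePair<string, string>(Encode(p.Key), Encode(p.Value)))
            .OrderBy(p => p.Key, StringComparer.Ordinal)
            .ThenBy(p => p.Value, StringComparer.Ordinal);
        return string.Join("&", encoded.Select(p => p.Key + "=" + p.Value));
    }

    // Signature base string: METHOD & encoded URL & encoded parameter string.
    public static string SignatureBaseString(
        string method, string url,
        IEnumerable<KeyValuePair<string, string>> parameters)
    {
        return method.ToUpperInvariant()
            + "&" + Encode(url)
            + "&" + Encode(NormalizeParameters(parameters));
    }
}
```

The filtered stream adds a query parameter such as `track` to the request, and that parameter has to be sorted in with the `oauth_*` ones, which may be why a problem would only show up there.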
The C# 5/.NET 4.5 async/await code is quite elegant. The work isn’t CPU intensive, so it’s fine running on the UI thread without blocking it. My first instinct prior to C# 5 would have been to use RX for this, but now for something simple I’d stick to async/await, only using RX for something more complex like buffering or batching.

    private async void ProcessTweets()
    {
      using (var t = new TwitterConnector())
      {
        var response = await t.GetSampleFirehoseConnection();
        var res = await response.Content.ReadAsStreamAsync();
        using (var streamReader = new StreamReader(res, Encoding.UTF8))
        {
          // streamReader.EndOfStream can block, so instead check for null
          while (!cts.IsCancellationRequested)
          {
            var r = await streamReader.ReadLineAsync();
            if (r == null) { return; }
            ProcessTweetText(r);
          }
        }
      }
    }

    private void ProcessTweetText(string r)
    {
      if (!string.IsNullOrEmpty(r))
      {
        var tweetJToken = JsonConvert.DeserializeObject<dynamic>(r);
        var tweetObj = tweetJToken["text"];
        if (tweetObj != null)
        {
          var tweetText = tweetObj.ToString();
          viewModel.Items.Add(tweetText);
        }
      }
    }
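
The listing above refers to a `cts` field that isn't shown. As a rough, self-contained sketch of the same read loop, the version below takes any `Stream` and a `CancellationToken` instead; `LineReaderSketch`, `ReadLinesAsync` and the `onLine` callback are names invented for this illustration.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class LineReaderSketch
{
    // Reads lines until the stream ends or cancellation is requested,
    // handing each non-empty line to the callback.
    public static async Task ReadLinesAsync(
        Stream stream, Action<string> onLine, CancellationToken token)
    {
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            while (!token.IsCancellationRequested)
            {
                var line = await reader.ReadLineAsync();
                if (line == null) { return; }           // end of stream
                if (line.Length > 0) { onLine(line); }  // skip empty lines
            }
        }
    }
}
```

A caller would typically hold a `CancellationTokenSource`, pass its `Token` in, and call `Cancel()` from a Stop button. Note that the token is only checked between lines, so a read that is already pending is not interrupted by cancellation.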

The equivalent F# async code looks quite similar, with the added goodness of type providers. Time permitting, I plan to do some more analysis of tweets, which would be a good fit for F#.

type tweetProvider = JsonProvider<"SampleTweet.json", SampleList=true>

type MainWindowViewModel() =
  inherit ViewModelBase()
  let items = new ObservableCollection<string>()
  member x.Items
    with get () = items
  member x.ProcessTweet tweet =
    let tweetParsed = tweetProvider.Parse(tweet)
    match tweetParsed.Text with
    | Some(v) -> x.Items.Add v
    | None -> ()
  member x.ProcessTweets =
    let a = async {
      use t = new TwitterConnector()
      let! response = t.GetSampleFirehoseConnection() |> Async.AwaitTask
      let! result = response.Content.ReadAsStreamAsync() |> Async.AwaitTask
      use streamReader = new StreamReader(result, Encoding.UTF8)
      let rec processTweetsAsync (s:StreamReader) =
        async {
          let! r = s.ReadLineAsync() |> Async.AwaitTask
          // streamReader.EndOfStream can block, so instead check for null
          if r <> null then
            x.ProcessTweet r
            return! processTweetsAsync s
        }
      do! processTweetsAsync streamReader
    }
    a |> Async.StartImmediate
  member x.GoCommand = 
    new RelayCommand ((fun canExecute -> true), 
      (fun action -> x.ProcessTweets))

Oscilloscope using RX and C# Async CTP

In my last blog post I described the implementation of a simple ‘oscilloscope app’ in F#, as I wanted to see how the code would be structured using only F# idioms (http://taumuon-jabuka.blogspot.com/2012/01/visualising-sound.html )

My natural instinct would have been to implement it using the Reactive Extensions for .NET (RX Framework), but I first wanted to investigate a pure F# solution. This post describes an alternate RX implementation.

Similar to my last post, I’ll describe the code inside-out.

Creating the Observable

[Image: clip_image002]

This code returns an IObservable with a float array payload, representing each read from the CaptureBuffer. The implementation could have internally started a dedicated Thread from which to push out values, but instead I’m using the new C# 5 async functionality (using the Async CTP Update 3), so that my code looks pretty similar to the previous F# example.

The observable takes a CancellationToken, whose IsCancellationRequested is set to true once all subscriptions to the observable have been disposed. The CompositeDisposable returned from the lambda is also disposed of at that point.

The code loops while its CancellationToken has not been cancelled, asynchronously waiting for one of the WaitHandles to be set, and then reads from the buffer. Each value is pushed out to subscribers via OnNext.
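
Since the original listing is only available as an image, here is a minimal sketch of the shape described, written against the current System.Reactive package rather than the Async CTP. The `readChunkAsync` delegate is a hypothetical stand-in for waiting on the WaitHandles and reading the CaptureBuffer, and this version uses the `Observable.Create` overload whose lambda returns a plain `Task` rather than a CompositeDisposable.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CaptureSketch
{
    // readChunkAsync is an assumed placeholder for "wait for the buffer
    // notification, then read the captured samples".
    public static IObservable<float[]> CreateSamples(
        Func<CancellationToken, Task<float[]>> readChunkAsync)
    {
        return Observable.Create<float[]>(async (observer, token) =>
        {
            // The token is cancelled when the subscription is disposed.
            while (!token.IsCancellationRequested)
            {
                float[] chunk;
                try
                {
                    chunk = await readChunkAsync(token);
                }
                catch (OperationCanceledException)
                {
                    break; // the read was abandoned because of cancellation
                }
                observer.OnNext(chunk);
            }
        });
    }
}
```

Each subscription runs its own loop, so the observable is cold, and no dedicated thread is needed because the loop yields at every `await`.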

The ConvertByteArrayToFloatArray() is trivial:

[Image: clip_image003]
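
The listing itself is an image, and the post doesn't state the sample format. As a sketch only, assuming the capture buffer holds 16-bit signed little-endian PCM, a conversion could look like the following (the scaling to the range -1 to 1 is also an assumption):

```csharp
using System;

public static class PcmSketch
{
    // Assumes 16-bit signed little-endian PCM: two bytes per sample.
    // Each sample is scaled into [-1, 1) by dividing by 32768.
    public static float[] ConvertByteArrayToFloatArray(byte[] bytes)
    {
        var result = new float[bytes.Length / 2];
        for (int i = 0; i < result.Length; i++)
        {
            // Low byte first, then high byte; the cast restores the sign.
            short sample = (short)(bytes[2 * i] | (bytes[2 * i + 1] << 8));
            result[i] = sample / 32768f;
        }
        return result;
    }
}
```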

Subscriptions to the observable

[Image: clip_image004]

First off, the float array returned from the observable is decomposed into its individual values using a SelectMany, as the sampling, buffering and windowing operations all operate on a stream of floats. Then, the observable is published to an IConnectableObservable. The microphone access returns a cold observable, meaning that each subscriber would end up creating its own microphone access. This would work (the CaptureBuffer doesn’t prevent it), but the connectable observable means that all clients instead share the same observable, and ensures that they all see the same values (so that the traces on the two charts are in sync).

The RefCount() means that once all subscriptions to the shared observable have been disposed of, the subscription to the underlying observable is also disposed.
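
The SelectMany, Publish and RefCount steps described above can be sketched as follows. `SharingSketch` and `Share` are names chosen for this illustration, not the ones in the original listing.

```csharp
using System;
using System.Reactive.Linq;

public static class SharingSketch
{
    // Flattens chunks into single samples and shares one upstream
    // subscription between all downstream subscribers.
    public static IObservable<float> Share(IObservable<float[]> chunks)
    {
        return chunks
            .SelectMany(chunk => chunk) // float[] -> individual floats
            .Publish()                  // IConnectableObservable<float>
            .RefCount();                // connect on first subscriber,
                                        // disconnect when the last one is disposed
    }
}
```

With this in place, two subscribers to the result cause only one subscription to the cold source, so both see exactly the same values.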

[Image: clip_image005]

The top ‘oscilloscope’ trace is a simple Observable.Buffer() over the data stream. There is no need to ObserveOn the dispatcher thread as the subscription occurs on the UI thread. Using RX, it would be easy to schedule various parts of the work onto different threads, but I’ll discuss this in a later article (I want to keep everything on the UI thread for now to compare performance with the single threaded F# implementation).

All subscriptions are added to a CompositeDisposable member in the ViewModel – the Stop button’s command implementation disposes of this, which causes all subscriptions to be disposed of, and so the microphone access loop to be terminated via its CancellationToken.
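
As a sketch of how the buffered trace and the Stop behaviour fit together, the class below keeps its subscriptions in a CompositeDisposable. `TraceSubscriptions`, `pointsPerTrace` and the `draw` callback are hypothetical names standing in for the ViewModel and its chart update.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public sealed class TraceSubscriptions : IDisposable
{
    private readonly CompositeDisposable subscriptions = new CompositeDisposable();

    // Each non-overlapping buffer of pointsPerTrace samples is one redraw.
    public void Start(IObservable<float> samples, int pointsPerTrace,
                      Action<IList<float>> draw)
    {
        subscriptions.Add(samples.Buffer(pointsPerTrace).Subscribe(draw));
    }

    // What a Stop command would call: disposes every subscription at once.
    public void Dispose()
    {
        subscriptions.Dispose();
    }
}
```

One design point to be aware of: once a CompositeDisposable has been disposed, anything added to it later is disposed immediately, so restarting after Stop needs a fresh instance (or `Clear()` instead of `Dispose()`).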

[Image: clip_image006]

The windowing operation simply takes every 100th data point, and from those sampled data points takes a sliding window of 1000 values to display. The windowCount variable is closed over to allow the y axis to be continually updated.

[Image: clip_image007]

The Sample operator is simple, but not particularly efficient – it takes a buffer (i.e. a non-overlapping window) of count values, and then takes the last value in the buffer.
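
A sketch of that buffer-based approach is below. It is named `SampleEvery` here to avoid confusion with Rx's built-in time-based `Sample`, and the `Where` that drops a trailing partial buffer is an addition for this sketch. The second method is an alternative, not from the original post, that avoids allocating a list per emitted value by using the indexed `Where` overload.

```csharp
using System;
using System.Reactive.Linq;

public static class SampleSketch
{
    // Buffer-based: collect count values, emit only the last one.
    public static IObservable<T> SampleEvery<T>(this IObservable<T> source, int count)
    {
        return source
            .Buffer(count)
            .Where(buffer => buffer.Count == count) // ignore a partial final buffer
            .Select(buffer => buffer[buffer.Count - 1]);
    }

    // Index-based alternative: pass through every count-th value directly.
    public static IObservable<T> SampleEveryIndexed<T>(this IObservable<T> source, int count)
    {
        return source.Where((value, index) => (index + 1) % count == 0);
    }
}
```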

The WindowWithCount operator is the same one I discussed at http://taumuon-jabuka.blogspot.com/2011/07/rx-framework-performance-awareness.html (with implementation grabbed from http://social.msdn.microsoft.com/Forums/en-US/rx/thread/37428f58-f241-45b3-a878-c1627deb9ac4#bcdc7b79-bbde-4145-88e4-583685285682 )

[Image: clip_image008]

And as I also discussed in that post, the RX guidelines recommend implementing an operator in terms of existing operators. There’s only one problem in this case: it’s quite slow (I’ll get quantitative figures on this in a future blog post discussing the performance of all approaches).

The following is faster (again, I know more specific figures are needed):

[Image: clip_image009]
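
Both listings are images, so the code below is not the original WindowWithCount. It is a sketch of the two general styles being contrasted: a sliding window composed from the existing `Buffer(count, skip)` operator, and a hand-written one that keeps a single queue. The names are invented, both versions emit only full windows (an assumption), and no performance claim is made for either.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class WindowSketch
{
    // Composed from existing operators: a new overlapping buffer starts
    // on every value, and only full ones are passed on.
    public static IObservable<IList<T>> SlidingWindowComposed<T>(
        this IObservable<T> source, int count)
    {
        return source.Buffer(count, 1).Where(window => window.Count == count);
    }

    // Hand-written: one queue per subscription, copied out once it is full.
    public static IObservable<IList<T>> SlidingWindowQueue<T>(
        this IObservable<T> source, int count)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            var queue = new Queue<T>(count);
            return source.Subscribe(
                value =>
                {
                    if (queue.Count == count) { queue.Dequeue(); } // drop the oldest
                    queue.Enqueue(value);
                    if (queue.Count == count) { observer.OnNext(queue.ToArray()); }
                },
                observer.OnError,
                observer.OnCompleted);
        });
    }
}
```

The composed version keeps up to count buffers alive at once and appends each value to all of them, whereas the queue version does one enqueue and one array copy per value.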

Comparing implementations

For me, the RX solution is cleaner than the F# solution, and easier to follow. I did implement my F# solution in quite an imperative way though, and should perhaps have used AsyncSeq or mailbox processors, but as reading from the microphone is a push-based activity, none of those solutions would be as clean as RX (of course, I haven’t covered using RX in F#). The F# version is much faster, and I’ll take a bit more of a dig into performance in an upcoming blog post.