Sipping from the twitter stream

I’ve been playing around with connecting to twitter’s streaming API, and displaying a live stream of tweets returned.
To do this, I was originally using DotNetOpenAuth, along with the HttpClient, which worked fine for the sample stream, but would return authentication errors for the filtered stream. I looked at the HTML message in fiddler2, and the oauth parameters weren’t ordered, which twitter requires. Instead, I’m using TwitterDoodle, which uses HttpClient.
The C#5/.NET4.5 async/await code is quite elegant – it’s not CPU intensive so it’s fine running on the UI thread, without blocking. My first instinct prior to C#5 would have been to use RX for this, but now if I’m doing something simple I’d stick to async/await, only using RX if doing something more complex like buffering or batching.

    private async void ProcessTweets()  
    {  
      using (var t = new TwitterConnector())  
      {  
        var response = await t.GetSampleFirehoseConnection();  
        var res = await response.Content.ReadAsStreamAsync();  
        using (var streamReader = new StreamReader(res, Encoding.UTF8))  
        {  
          // streamReader.EndOfStream can block, so instead check for null  
          while (!cts.IsCancellationRequested)  
          {  
            var r = await streamReader.ReadLineAsync();  
            if (r == null) { return; }  
            ProcessTweetText(r);  
          }  
        }  
      }  
    }  
   
    private void ProcessTweetText(string r)  
    {  
      if (!string.IsNullOrEmpty(r))  
      {  
        var tweetJToken = JsonConvert.DeserializeObject<dynamic>(r);  
        var tweetObj = tweetJToken["text"];  
        if (tweetObj != null)  
        {  
          var tweetText = tweetObj.ToString();  
          viewModel.Items.Add(tweetText);  
        }  
      }  
    }  

The equivalent F# async code obviously looks quite similar, with the added goodness of Type Providers. Time dependent, I am planning to do some more analysis of tweets which would be a good fit for F#.

type tweetProvider = JsonProvider<"SampleTweet.json", SampleList=true>  
   
type MainWindowViewModel() =  
  inherit ViewModelBase()  
  let items = new ObservableCollection<string>()  
  member x.Items  
    with get () = items  
  member x.ProcessTweet tweet =  
    let tweetParsed = tweetProvider.Parse(tweet)  
    match tweetParsed.Text with  
    | Some(v) ->  x.Items.Add v  
    | None -> ()  
  member x.ProcessTweets =  
    let a = async {  
      use t = new TwitterConnector()  
      let! response = t.GetSampleFirehoseConnection() |> Async.AwaitTask  
      let! result = response.Content.ReadAsStreamAsync() |> Async.AwaitTask  
      use streamReader = new StreamReader(result, Encoding.UTF8)  
      let rec processTweetsAsync (s:StreamReader) =  
        async {  
          let! r = s.ReadLineAsync() |> Async.AwaitTask  
          // streamReader.EndOfStream can block, so instead check for null  
          if r <> null then  
            x.ProcessTweet r  
            return! processTweetsAsync s  
        }  
      do! processTweetsAsync streamReader  
    }  
    a |> Async.StartImmediate  
  member x.GoCommand =   
    new RelayCommand ((fun canExecute -> true),   
      (fun action -> x.ProcessTweets))