Sipping from the twitter stream

I’ve been playing around with connecting to Twitter’s streaming API and displaying a live stream of the tweets it returns.
To do this, I originally used DotNetOpenAuth along with HttpClient. That worked fine for the sample stream but returned authentication errors for the filtered stream. I looked at the HTTP message in Fiddler2, and the OAuth parameters weren’t ordered, which Twitter requires. Instead, I’m now using TwitterDoodle, which also uses HttpClient.
The C# 5/.NET 4.5 async/await code is quite elegant. The work isn’t CPU intensive, so it’s fine running on the UI thread without blocking it. Before C# 5, my first instinct would have been to use Rx for this, but now I’d stick to async/await for something simple and only reach for Rx for something more complex, like buffering or batching.
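
To illustrate what "ordered" means here, this is a minimal sketch of the OAuth 1.0a parameter normalisation step as I understand it: each name and value is percent-encoded, the pairs are sorted by name (then by value), and the result is joined into a single string that gets signed. The class and method names are made up for illustration, and this is not the code from DotNetOpenAuth or TwitterDoodle. It also assumes Uri.EscapeDataString performs RFC 3986 encoding, which as far as I know holds from .NET 4.5 onwards.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class OAuthParameterSketch
{
    // Percent-encodes every name and value, sorts the pairs by encoded name
    // (then by encoded value) using ordinal comparison, and joins them
    // as name=value pairs separated by '&'.
    public static string Normalize(IEnumerable<KeyValuePair<string, string>> parameters)
    {
        var encoded = parameters
            .Select(p => new KeyValuePair<string, string>(
                Uri.EscapeDataString(p.Key),
                Uri.EscapeDataString(p.Value)))
            .OrderBy(p => p.Key, StringComparer.Ordinal)
            .ThenBy(p => p.Value, StringComparer.Ordinal);

        return string.Join("&", encoded.Select(p => p.Key + "=" + p.Value));
    }
}
```

The request parameters (such as track for the filtered stream) go into the same sorted list as the oauth_* ones, which may be why a library could get away with it on the parameterless sample stream but not on the filtered one. That last part is a guess on my side.
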

    private async void ProcessTweets()
    {
      using (var t = new TwitterConnector())
      {
        var response = await t.GetSampleFirehoseConnection();
        var res = await response.Content.ReadAsStreamAsync();
        using (var streamReader = new StreamReader(res, Encoding.UTF8))
        {
          // streamReader.EndOfStream can block, so instead check for null
          while (!cts.IsCancellationRequested)
          {
            var r = await streamReader.ReadLineAsync();
            if (r == null) { return; }
            ProcessTweetText(r);
          }
        }
      }
    }

    private void ProcessTweetText(string r)
    {
      if (!string.IsNullOrEmpty(r))
      {
        var tweetJToken = JsonConvert.DeserializeObject<dynamic>(r);
        var tweetObj = tweetJToken["text"];
        if (tweetObj != null)
        {
          var tweetText = tweetObj.ToString();
          viewModel.Items.Add(tweetText);
        }
      }
    }
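
The read loop can be tried out without a Twitter connection. Below is a minimal sketch, assuming any Stream can stand in for the response stream: it reads lines until ReadLineAsync returns null (end of stream) or cancellation is requested, and skips empty lines in the same way ProcessTweetText does. LineReaderSketch and ReadLinesAsync are hypothetical names, not part of TwitterDoodle.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class LineReaderSketch
{
    // Collects non-empty lines from the stream until it ends
    // or the token is cancelled.
    public static async Task<List<string>> ReadLinesAsync(Stream stream, CancellationToken token)
    {
        var lines = new List<string>();
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            while (!token.IsCancellationRequested)
            {
                var line = await reader.ReadLineAsync();
                if (line == null) { break; }        // null means end of stream
                if (line.Length == 0) { continue; } // ignore blank lines
                lines.Add(line);
            }
        }
        return lines;
    }
}
```

Feeding it a MemoryStream containing a couple of JSON lines separated by a blank line should give back just the JSON lines. Note that the cancellation check only happens between reads, so a read that is waiting for data will not be interrupted by the token.
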

The equivalent F# async code looks quite similar, with the added goodness of type providers. Time permitting, I plan to do some more analysis of tweets, which would be a good fit for F#.

type tweetProvider = JsonProvider<"SampleTweet.json", SampleList=true>

type MainWindowViewModel() =
  inherit ViewModelBase()
  let items = new ObservableCollection<string>()
  member x.Items
    with get () = items
  member x.ProcessTweet tweet =
    let tweetParsed = tweetProvider.Parse(tweet)
    match tweetParsed.Text with
    | Some v -> x.Items.Add v
    | None -> ()
  member x.ProcessTweets =
    let a = async {
      use t = new TwitterConnector()
      let! response = t.GetSampleFirehoseConnection() |> Async.AwaitTask
      let! result = response.Content.ReadAsStreamAsync() |> Async.AwaitTask
      use streamReader = new StreamReader(result, Encoding.UTF8)
      let rec processTweetsAsync (s:StreamReader) =
        async {
          let! r = s.ReadLineAsync() |> Async.AwaitTask
          // streamReader.EndOfStream can block, so instead check for null
          if r <> null then
            x.ProcessTweet r
            return! processTweetsAsync s
        }
      do! processTweetsAsync streamReader
    }
    a |> Async.StartImmediate
  member x.GoCommand = 
    new RelayCommand ((fun canExecute -> true), 
      (fun action -> x.ProcessTweets))

2 thoughts on “Sipping from the twitter stream”

  1. Nice post, Gary.

    Neat implementation with Async, although I think I would still reach for Rx.

    Once you've got yourself an Observable of tweets, you could do all kinds of stuff on it like time windows, filtering, grouping by hashtags, selecting related information from other sources, etc.

    What's your plan for using the twitter connectivity? Or it's just to try it out?

  2. Hey Niall,
    I just posted a follow-up showing what I'm doing – it's not real-time so didn't use RX.

    There are loads of cool things you could do with RX and the Twitter API, I was thinking of real-time correlations between hashtags (could be interesting performance work – investigate GPGPU etc)…
