I’ve been playing around with connecting to Twitter’s streaming API and displaying a live stream of the tweets it returns.
To do this, I was originally using DotNetOpenAuth together with HttpClient. That worked fine for the sample stream but returned authentication errors for the filtered stream. I looked at the HTTP message in Fiddler2, and the OAuth parameters weren’t ordered, which Twitter requires. I’ve since switched to TwitterDoodle, which also uses HttpClient.
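For context on the ordering issue: OAuth 1.0a signing sorts the percent-encoded request parameters by name (and then by value) before the signature is computed. Here is a minimal sketch of that normalization step only. It is not TwitterDoodle's or DotNetOpenAuth's code; the class name, method name and parameter values are made up for illustration, and Uri.EscapeDataString stands in for the percent-encoding step.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class OAuthParameterOrdering
{
    // Percent-encode every name and value, sort by encoded name (then by
    // encoded value) using ordinal comparison, and join as name=value
    // pairs separated by '&'.
    public static string Normalize(IEnumerable<KeyValuePair<string, string>> parameters)
    {
        var encoded = parameters
            .Select(p => new KeyValuePair<string, string>(
                Uri.EscapeDataString(p.Key),
                Uri.EscapeDataString(p.Value)))
            .OrderBy(p => p.Key, StringComparer.Ordinal)
            .ThenBy(p => p.Value, StringComparer.Ordinal);

        return string.Join("&", encoded.Select(p => p.Key + "=" + p.Value));
    }

    public static void Main()
    {
        // Placeholder values, not real credentials.
        var parameters = new Dictionary<string, string>
        {
            { "track", "f#" },
            { "oauth_nonce", "abc123" },
            { "oauth_consumer_key", "key" }
        };

        // Prints: oauth_consumer_key=key&oauth_nonce=abc123&track=f%23
        Console.WriteLine(Normalize(parameters));
    }
}
```

A full implementation would go on to build the signature base string and sign it; the point here is only that the order is deterministic regardless of how the parameters were added.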
The C# 5/.NET 4.5 async/await code is quite elegant. The work isn’t CPU-intensive, so it can run on the UI thread without blocking it. Before C# 5 my first instinct would have been to use Rx for this, but now I’d stick to async/await for something simple and only reach for Rx for something more complex, such as buffering or batching.
private async void ProcessTweets()
{
    using (var t = new TwitterConnector())
    {
        var response = await t.GetSampleFirehoseConnection();
        var res = await response.Content.ReadAsStreamAsync();
        using (var streamReader = new StreamReader(res, Encoding.UTF8))
        {
            // streamReader.EndOfStream can block, so instead check for null
            while (!cts.IsCancellationRequested)
            {
                var r = await streamReader.ReadLineAsync();
                if (r == null) { return; }
                ProcessTweetText(r);
            }
        }
    }
}

private void ProcessTweetText(string r)
{
    if (!string.IsNullOrEmpty(r))
    {
        var tweetJToken = JsonConvert.DeserializeObject<dynamic>(r);
        var tweetObj = tweetJToken["text"];
        if (tweetObj != null)
        {
            var tweetText = tweetObj.ToString();
            viewModel.Items.Add(tweetText);
        }
    }
}
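For contrast, here is roughly what the batching case looks like with Rx. This is only a sketch: it assumes the System.Reactive NuGet package is referenced, a Subject<string> stands in for the live tweet stream, and TweetBatching and Batch are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, for illustration only.
public static class TweetBatching
{
    // Pushes each text through a Subject and collects the batches
    // that Buffer(batchSize) emits.
    public static List<IList<string>> Batch(IEnumerable<string> texts, int batchSize)
    {
        var batches = new List<IList<string>>();
        using (var tweets = new Subject<string>())
        using (tweets.Buffer(batchSize).Subscribe(batch => batches.Add(batch)))
        {
            foreach (var text in texts)
            {
                tweets.OnNext(text);
            }
            // Completing the stream flushes the final, possibly partial, batch.
            tweets.OnCompleted();
        }
        return batches;
    }

    public static void Main()
    {
        var texts = new[] { "tweet 1", "tweet 2", "tweet 3", "tweet 4", "tweet 5" };
        foreach (var batch in Batch(texts, 2))
        {
            // Prints three lines: "tweet 1, tweet 2", "tweet 3, tweet 4", "tweet 5"
            Console.WriteLine(string.Join(", ", batch));
        }
    }
}
```

In a real app each line read from the stream would be pushed with OnNext, and the subscriber would update the UI once per batch rather than once per tweet.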
The equivalent F# async code looks quite similar, with the added goodness of type providers. Time permitting, I’m planning to do some more analysis of tweets, which would be a good fit for F#.
type tweetProvider = JsonProvider<"SampleTweet.json", SampleList=true>

type MainWindowViewModel() =
    inherit ViewModelBase()
    let items = new ObservableCollection<string>()
    member x.Items
        with get () = items
    member x.ProcessTweet tweet =
        let tweetParsed = tweetProvider.Parse(tweet)
        match tweetParsed.Text with
        | Some(v) -> x.Items.Add v
        | None -> ()
    member x.ProcessTweets =
        let a = async {
            use t = new TwitterConnector()
            let! response = t.GetSampleFirehoseConnection() |> Async.AwaitTask
            let! result = response.Content.ReadAsStreamAsync() |> Async.AwaitTask
            use streamReader = new StreamReader(result, Encoding.UTF8)
            let rec processTweetsAsync (s:StreamReader) =
                async {
                    let! r = s.ReadLineAsync() |> Async.AwaitTask
                    // streamReader.EndOfStream can block, so instead check for null
                    if r <> null then
                        x.ProcessTweet r
                        return! processTweetsAsync s
                }
            do! processTweetsAsync streamReader
        }
        a |> Async.StartImmediate
    member x.GoCommand =
        new RelayCommand ((fun canExecute -> true),
                          (fun action -> x.ProcessTweets))
Nice post, Gary.
Neat implementation with Async, although I think I would still reach for Rx.
Once you've got yourself an Observable of tweets, you could do all kinds of stuff on it like time windows, filtering, grouping by hashtags, selecting related information from other sources, etc.
What's your plan for using the Twitter connectivity? Or is it just to try it out?
Hey Niall,
I just posted a follow-up showing what I'm doing. It's not real-time, so I didn't use Rx.
There are loads of cool things you could do with Rx and the Twitter API. I was thinking of real-time correlations between hashtags (could be interesting performance work: investigate GPGPU etc.)…