I’ve been playing around with connecting to Twitter’s streaming API and displaying the tweets it returns as a live stream.
To do this, I was originally using DotNetOpenAuth along with HttpClient. That worked fine for the sample stream, but returned authentication errors for the filtered stream. I looked at the HTTP message in Fiddler2 and found that the OAuth parameters weren’t sorted, which Twitter requires. I’ve since switched to TwitterDoodle, which is also built on HttpClient.
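For reference, OAuth 1.0a (RFC 5849) requires the request parameters to be percent-encoded and then sorted by name (and by value for duplicate names) when the signature base string is built. The following is a minimal sketch of just that step. It is not the code DotNetOpenAuth or TwitterDoodle actually use; the class name, method name and parameter values are made up for illustration, and it assumes .NET 4.5 or later, where Uri.EscapeDataString follows RFC 3986.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper, for illustration only.
public static class OAuthParameterSketch
{
    // Builds the normalized parameter string for an OAuth 1.0a
    // signature base string: percent-encode, sort, then join.
    public static string Normalize(IEnumerable<KeyValuePair<string, string>> parameters)
    {
        return string.Join("&", parameters
            // Encode names and values first; sorting is done on the encoded form.
            .Select(p => new KeyValuePair<string, string>(
                Uri.EscapeDataString(p.Key), Uri.EscapeDataString(p.Value)))
            // Ordinal comparison gives a byte-wise ordering for these ASCII strings.
            .OrderBy(p => p.Key, StringComparer.Ordinal)
            .ThenBy(p => p.Value, StringComparer.Ordinal)
            .Select(p => p.Key + "=" + p.Value));
    }

    public static void Main()
    {
        var parameters = new Dictionary<string, string>
        {
            { "track", "f# c#" },                       // filter term for the filtered stream
            { "oauth_nonce", "abc123" },                // placeholder value
            { "oauth_consumer_key", "my-key" },         // placeholder value
            { "oauth_signature_method", "HMAC-SHA1" }
        };

        // The oauth_* parameters come out before "track", regardless of insertion order.
        Console.WriteLine(Normalize(parameters));
    }
}
```

The sketch sorts the query parameter (track) together with the oauth_* parameters, because the filter term and the OAuth values all go into the same signature base string.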
The C# 5/.NET 4.5 async/await code is quite elegant. The work isn’t CPU intensive, so it can run on the UI thread without blocking it. Before C# 5 my first instinct would have been to use Rx for this, but now I’d stick to async/await for something this simple and only reach for Rx when doing something more complex, like buffering or batching.
private async void ProcessTweets()
{
    using (var t = new TwitterConnector())
    {
        // Open the long-lived streaming connection and get the response body as a stream.
        var response = await t.GetSampleFirehoseConnection();
        var res = await response.Content.ReadAsStreamAsync();
        using (var streamReader = new StreamReader(res, Encoding.UTF8))
        {
            // streamReader.EndOfStream can block, so instead check for null
            // (cts is declared elsewhere in the class)
            while (!cts.IsCancellationRequested)
            {
                var r = await streamReader.ReadLineAsync();
                if (r == null) { return; } // null means the stream has ended
                ProcessTweetText(r);
            }
        }
    }
}
private void ProcessTweetText(string r)
{
    // Ignore blank lines.
    if (!string.IsNullOrEmpty(r))
    {
        var tweetJToken = JsonConvert.DeserializeObject<dynamic>(r);
        // Not every message on the stream has a "text" field.
        var tweetObj = tweetJToken["text"];
        if (tweetObj != null)
        {
            var tweetText = tweetObj.ToString();
            viewModel.Items.Add(tweetText);
        }
    }
}
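The read loop can be tried without a Twitter connection. The following is a minimal console sketch of the same pattern, with a MemoryStream standing in for the HTTP response stream. The class and method names are made up for illustration, and it assumes the cts used in the listing above is a CancellationTokenSource.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-alone version of the read loop, for illustration only.
public static class ReadLoopSketch
{
    // Reads lines until the stream ends (ReadLineAsync returns null)
    // or cancellation is requested, and returns the non-empty ones.
    public static async Task<List<string>> ReadLinesAsync(Stream stream, CancellationToken token)
    {
        var lines = new List<string>();
        using (var reader = new StreamReader(stream, Encoding.UTF8))
        {
            while (!token.IsCancellationRequested)
            {
                var line = await reader.ReadLineAsync();
                if (line == null) { break; }               // end of stream
                if (line.Length > 0) { lines.Add(line); }  // skip blank lines
            }
        }
        return lines;
    }

    public static void Main()
    {
        var cts = new CancellationTokenSource();

        // Two fake tweets separated by a blank line.
        var data = Encoding.UTF8.GetBytes("{\"text\":\"one\"}\r\n\r\n{\"text\":\"two\"}\r\n");

        var lines = ReadLinesAsync(new MemoryStream(data), cts.Token).GetAwaiter().GetResult();
        Console.WriteLine(lines.Count);
    }
}
```

One thing this makes visible: the token is only checked between reads, so calling cts.Cancel() does not interrupt a ReadLineAsync call that is already waiting for data. On a quiet stream the loop only stops once the next line arrives or the connection is disposed.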
The equivalent F# async code looks much the same, with the added goodness of type providers. Time permitting, I plan to do some more analysis of the tweets, which would be a good fit for F#.
type tweetProvider = JsonProvider<"SampleTweet.json", SampleList=true>
type MainWindowViewModel() =
    inherit ViewModelBase()
    let items = new ObservableCollection<string>()
    member x.Items
        with get () = items
    member x.ProcessTweet tweet =
        let tweetParsed = tweetProvider.Parse(tweet)
        match tweetParsed.Text with
        | Some(v) -> x.Items.Add v
        | None -> ()
    member x.ProcessTweets =
        let a = async {
            use t = new TwitterConnector()
            let! response = t.GetSampleFirehoseConnection() |> Async.AwaitTask
            let! result = response.Content.ReadAsStreamAsync() |> Async.AwaitTask
            use streamReader = new StreamReader(result, Encoding.UTF8)
            let rec processTweetsAsync (s:StreamReader) =
                async {
                    let! r = s.ReadLineAsync() |> Async.AwaitTask
                    // streamReader.EndOfStream can block, so instead check for null
                    if r <> null then
                        x.ProcessTweet r
                        return! processTweetsAsync s
                }
            do! processTweetsAsync streamReader
        }
        a |> Async.StartImmediate
    member x.GoCommand =
        new RelayCommand ((fun canExecute -> true),
                          (fun action -> x.ProcessTweets))