In my last blog post I described the implementation of a simple ‘oscilloscope app’ in F#, as I wanted to see how the code would be structured using only F# idioms (http://taumuon-jabuka.blogspot.com/2012/01/visualising-sound.html )
My natural instinct would have been to implement it using the Reactive Extensions for .NET (RX Framework), but I first wanted to investigate a pure F# solution. This post describes an alternate RX implementation.
Similar to my last post, I’ll describe the code inside-out.
Creating the Observable
This code returns an IObservable with a float array payload, representing each read from the CaptureBuffer. The implementation could have internally started a dedicated Thread from which to push out values, but instead I’m using the new C# 5 async functionality (using the Async CTP Update 3), so that my code looks pretty similar to the previous F# example.
The observable takes a CancellationToken, whose IsCancellationRequested is set to true once all subscriptions to the observable have been disposed. The CompositeDisposable returned out from the lambda is also disposed of at that point.
The code loops while its CancellationToken has not been cancelled, asynchronously awaiting for one of the WaitHandles to be set, and then it reads from the buffer. The value is pushed out to subscribers in the OnNext.
The ConvertByteArrayToFloatArray() is trivial:
Subscriptions to the observable
First off, the float array returned from the observable is decomposed into the individual values, using a SelectMany, as sampling, buffering and windowing operations all operate on a stream of floats. Then, the observable is published to an IConnectableObservable. The microphone access returns a Cold observable, meaning that each subscriber to it would end up creating their own microphone access. This would work (the CaptureBuffer doesn’t prevent this), but the connectable observable means that instead all clients share the same observable, and ensures that they all see the same values (so that the traces on the two charts are in sync).
The RefCount() means that when all subscriptions to the observable IConnectableObservable variable have been disposed of then the subscription to the underlying observable will also be disposed.
The top ‘oscilloscope’ trace is a simple Observable.Buffer() over the data stream. There is no need to ObserveOn the dispatcher thread as the subscription occurs on the UI thread. Using RX, it would be easy to schedule various parts of the work onto different threads, but I’ll discuss this in a later article (I want to keep everything on the UI thread for now to compare performance with the single threaded F# implementation).
All subscriptions are added to a CompositeDisposable member in the ViewModel – the Stop button’s command implementation disposes of this, which causes all subscriptions to be disposed of, and so the microphone access loop to be terminated via its CancellationToken.
The windowing operation simply samples every 100 data points, and from those sampled data points takes a sliding window of 1000 values to display. The windowCount variable is closed over to allow the y axis to be continually updated.
The Sample operator is simple, but not particularly efficient – it takes a buffer (i.e a non-overlapping window) of count values, and then takes the last value in the buffer.
The WindowWithCount operator is the same one I discussed at http://taumuon-jabuka.blogspot.com/2011/07/rx-framework-performance-awareness.html (with implementation grabbed from http://social.msdn.microsoft.com/Forums/en-US/rx/thread/37428f58-f241-45b3-a878-c1627deb9ac4#bcdc7b79-bbde-4145-88e4-583685285682 )
And as I also talked about in that post, the RX guidelines recommend implementing an operator in terms of existing operator. There’s only one problem in this case, it’s quite slow, (I’ll get quantitative figures on this in a future blog post discussing performance of all approaches).
The following is faster (again, I know more specific figures are needed):
For me, the RX solution is cleaner than the F# solution, and easier to follow. I did implement my F# solution in quite an imperative way though, and should have perhaps used AsyncSeq or mailbox processors, but as reading from the microphone is a push-based activity, none of those solutions would be as clean as RX (of course I haven’t covered using RX in F#). The F# version is much faster, and I’ll take a big more of a dig into performance in an upcoming blog post.