Oscilloscope using RX and C# Async CTP

In my last blog post I described the implementation of a simple ‘oscilloscope app’ in F#, as I wanted to see how the code would be structured using only F# idioms (http://taumuon-jabuka.blogspot.com/2012/01/visualising-sound.html )

My natural instinct would have been to implement it using the Reactive Extensions for .NET (RX Framework), but I first wanted to investigate a pure F# solution. This post describes an alternate RX implementation.

Similar to my last post, I’ll describe the code inside-out.

Creating the Observable

This code returns an IObservable with a float array payload, representing each read from the CaptureBuffer. The implementation could have internally started a dedicated Thread from which to push out values, but instead I’m using the new C# 5 async functionality (using the Async CTP Update 3), so that my code looks pretty similar to the previous F# example.

The observable takes a CancellationToken, whose IsCancellationRequested is set to true once all subscriptions to the observable have been disposed. The CompositeDisposable returned out from the lambda is also disposed of at that point.

The code loops while its CancellationToken has not been cancelled, asynchronously awaiting for one of the WaitHandles to be set, and then it reads from the buffer. The value is pushed out to subscribers in the OnNext.

The ConvertByteArrayToFloatArray() is trivial:

Subscriptions to the observable

First off, the float array returned from the observable is decomposed into the individual values, using a SelectMany, as sampling, buffering and windowing operations all operate on a stream of floats. Then, the observable is published to an IConnectableObservable. The microphone access returns a Cold observable, meaning that each subscriber to it would end up creating their own microphone access. This would work (the CaptureBuffer doesn’t prevent this), but the connectable observable means that instead all clients share the same observable, and ensures that they all see the same values (so that the traces on the two charts are in sync).

The RefCount() means that when all subscriptions to the observable IConnectableObservable variable have been disposed of then the subscription to the underlying observable will also be disposed.

The top ‘oscilloscope’ trace is a simple Observable.Buffer() over the data stream. There is no need to ObserveOn the dispatcher thread as the subscription occurs on the UI thread. Using RX, it would be easy to schedule various parts of the work onto different threads, but I’ll discuss this in a later article (I want to keep everything on the UI thread for now to compare performance with the single threaded F# implementation).

All subscriptions are added to a CompositeDisposable member in the ViewModel – the Stop button’s command implementation disposes of this, which causes all subscriptions to be disposed of, and so the microphone access loop to be terminated via its CancellationToken.

The windowing operation simply samples every 100 data points, and from those sampled data points takes a sliding window of 1000 values to display. The windowCount variable is closed over to allow the y axis to be continually updated.

The Sample operator is simple, but not particularly efficient – it takes a buffer (i.e a non-overlapping window) of count values, and then takes the last value in the buffer.

The WindowWithCount operator is the same one I discussed at http://taumuon-jabuka.blogspot.com/2011/07/rx-framework-performance-awareness.html (with implementation grabbed from http://social.msdn.microsoft.com/Forums/en-US/rx/thread/37428f58-f241-45b3-a878-c1627deb9ac4#bcdc7b79-bbde-4145-88e4-583685285682 )

And as I also talked about in that post, the RX guidelines recommend implementing an operator in terms of existing operator. There’s only one problem in this case, it’s quite slow, (I’ll get quantitative figures on this in a future blog post discussing performance of all approaches).

The following is faster (again, I know more specific figures are needed):

Comparing implementations

For me, the RX solution is cleaner than the F# solution, and easier to follow. I did implement my F# solution in quite an imperative way though, and should have perhaps used AsyncSeq or mailbox processors, but as reading from the microphone is a push-based activity, none of those solutions would be as clean as RX (of course I haven’t covered using RX in F#). The F# version is much faster, and I’ll take a big more of a dig into performance in an upcoming blog post.

RX Framework Performance Awareness

In a post a while ago, here, I implemented a DifferentiateWithTime operator, which was implemented in terms of other RX operators, and at the end I closed out by saying “It does say in section 6.1 of the RX Design Guidelines that new operators should be composed of existing operators, unless performance is a concern – this is something I may get around to investigating in a future blog post.” and thought I’d finally get around to looking at this.

As a recap, the differentiate operator was written using a sliding window operator (updated to use RX 1.1 experimental):

        public static IObservable<double> DifferentiateWithTime            (this IObservable<double> obs)        {            return (from j in obs.SlidingWindow(2)                     select j[1] - j[0]);        }

// from http://social.msdn.microsoft.com/Forums/en-US/rx/thread/37428f58-f241-45b3-a878-c1627deb9ac4#bcdc7b79-bbde-4145-88e4-583685285682        public static IObservable<IList<TSource>>             SlidingWindow<TSource>(this IObservable<TSource> source,            int count)        {            Contract.Requires(source != null);            Contract.Requires(count >= 0);            return source.Publish(published =>                from x in published                from buffer in published.StartWith(x).Buffer(count).Take(1)                where buffer.Count == count                select buffer            );        }
I created a simple test fixture to exercise this.
const double accelerationGravity = 9.81;            var count = 10000.0;            var positions = Observable.Generate(0.0,                i => i < count,                i => i + 1.0,                i => accelerationGravity * i * i / 2.0);            var stopwatch = new Stopwatch();            stopwatch.Start();            var velocity = positions.DifferentiateWithTime();            var sumVelocity = 0.0;            using (velocity.Subscribe(i =>                            { sumVelocity += i; })) { };            var acceleration = velocity.DifferentiateWithTime();            var sumAcceleration = 0.0;            using (acceleration.Subscribe(i =>                                { sumAcceleration += i; })) { };            stopwatch.Stop();            Console.WriteLine("sumVel:{0} sumAcc:{1} time:{2}",                 sumVelocity,                sumAcceleration,                stopwatch.ElapsedMilliseconds);

Monitoring in Perfmon, I saw that during the 10000 iterations there were 170 Generation 0 garbage collections, which does seem quite overkill. The total time was 6580 milliseconds.
I then implemented the DifferentiateWithTime operator directly:
public static IObservable<double> DifferentiateWithTime            (this IObservable<double> obs)        {            return Observable.Create<double>(o =>            {                double previousValue = 0.0;                var initialized = false;                return obs.Subscribe(i =>                {                    if (initialized)                    {                        o.OnNext(i - previousValue);                    }                    else                    {                        initialized = true;                    }                    previousValue = i;                });            });        }

This time there were only 4 Generation 0 garbage collections during the test run, and even more excitingly, the execution time was 379 milliseconds. A 17 time speedup is quite impressive, and shows that it’s worth being careful!
It’s likely that other operations based on sliding windows (rolling averages, VWAP etc) may have similar issues, and may benefit from similar observations. Instead of forcing the user to manually make these changes, it may be possible to do this in a more automated way (I’ve got some ideas I’ll play with when I get time.)



F# Units of Measure with the Reactive Framework

My last blog post dealt with manipulating incoming streams of data, such of position data, composing those streams, and manipulating them, and saw how well RX handles these operations.

As I was doing this, I felt that this would benefit from the extra safety that you can get from F#’s Units of Measure feature.

We can implement an operation to find the separation between two positions (the same as in my previous C# post):

open Systemopen System.Linqopen System.Threadingopen System.Collections.Genericopen Microsoft.FSharp.Mathopen Microsoft.FSharp.Linqopen Microsoft.FSharp.Linq.Querymodule ObservableEx =  type System.IObservable<'u> with    member this.WindowWithCount(count:int)      = this.Publish(Func<_,_>(fun (p:IObservable<'u>) ->         p.SelectMany(          Func<_,_>(fun x -> p.StartWith(x).BufferWithCount(count).Take(1)),          Func<_,_,_>(fun x buffer -> buffer)).Where(            Func<IList<'u>,_>(fun x -> x.Count = count)).Select(              Func<_,_> (fun x -> x))));open ObservableExlet accelerationGravity = 9.81;let positions = Observable.Generate(0.0,                  Func<_,_>(fun i -> i < 10.0),                  Func<_,_>(fun i -> i + 1.0),                  Func<_,_>(fun i -> accelerationGravity * i * i / 2.0)); let positions2 = Observable.Generate(0.0,                  Func<_,_>(fun i -> i < 10.0),                  Func<_,_>(fun i -> i + 1.0),                  Func<_,_>(fun i -> i)); let separation = Observable.Zip(positions,                   positions2, Func<_,_,_>(fun i j -> j - i)); let res = separation.Subscribe(fun i -> i |> printfn "%f");

But let’s see what happens if we accidentally do something physically meaningless such as adding a velocity to a position:

let accelerationGravity = 9.81; let positions = Observable.Generate(                  0.0,                  Func<_,_>(fun i -> i < 10.0),                  Func<_,_>(fun i -> i + 1.0),                  Func<_,_>(fun i -> accelerationGravity * i * i / 2.0)); let positions2 = Observable.Generate(0.0,                   Func<_,_>(fun i -> i < 10.0),                   Func<_,_>(fun i -> i + 1.0),                   Func<_,_>(fun i -> i)); let DifferentiateWithTime (input: IObservable<'a>) =  input.WindowWithCount(2).Select(fun (j:IList<'a>) -> (j.[1]-j.[0])); let velocities = DifferentiateWithTime(positions); let separation = Observable.Zip(positions, velocities, Func<_,_,_>(fun i j -> j - i)); let res = separation.Subscribe(fun i -> i |> printfn "%f");

The program compiles and runs normally (as we’d expect, the compiler doesn’t know better than the fact that it’s dealing with some float values).

Now, let’s annotate our code with units of measure (I’m using the F# PowerPack). We can calculate the difference between two positions:

let accelerationGravity = 9.81<SI.m SI.s^-2> let positions = Observable.Generate(0.0<SI.s>, Func<_,_>(fun i -> i < 10.0<SI.s>),                   Func<_,_>(fun i -> i + 1.0<SI.s>),                  Func<_,_>(fun i -> accelerationGravity * i * i / 2.0)); let positions2 = Observable.Generate(0.0<SI.s>,                  Func<_,_>(fun i -> i < 10.0<SI.s>),                  Func<_,_>(fun i -> i + 1.0<SI.s>),                  Func<_,_>(fun i -> 5.0 * accelerationGravity * i * i / 2.0)); let DifferentiateWithTime (input: IObservable<float<_>>) =   input.WindowWithCount(2).Select(    fun (j:IList<float<_>>) -> (((j.[1]-j.[0])/1.0<SI.s>))); let velocities = DifferentiateWithTime(positions); let accelerations = DifferentiateWithTime(velocities); let separation = Observable.Zip(positions, positions2, Func<_,_,_>(fun i j -> j - i)); let res = separation.Subscribe(fun i -> float i |> printfn "%f"); // Next line will not compile //let wrongseparation = Observable.Zip(//                        accelerations, velocities, Func<_,_,_>(fun i j -> j - i));

But if we try instead to calculate the difference between the position and the velocity, the code will no longer compile. This is very cool.

We can also do the same by annotating our IObservables with units:

let accelerationGravity = 9.81<SI.m SI.s^-2> let DifferentiateWithTime (input: IObservable<float<_>>) =   input.WindowWithCount(2).Select(    fun (j:IList<float<_>>) -> (((j.[1]-j.[0])/1.0<SI.s>)));let positions = Observable.Generate(0.0<SI.s>,                  Func<_,_>(fun i -> i < 10.0<SI.s>),                   Func<_,_>(fun i -> i + 1.0<SI.s>),                  Func<_,_>(fun i -> accelerationGravity * i * i / 2.0)); let velocities = DifferentiateWithTime(positions); let accelerations = DifferentiateWithTime(velocities); let res = positions.Subscribe(fun i -> float i |> printfn "%f"); printfn "-- velocities -- ";let res2 = velocities.Subscribe(fun i -> float i |> printfn "%f");printf "-- accelerations -- "; let res3 = accelerations.Subscribe(fun i -> float i |> printfn "%f");

I love this feature, and can see that it would be incredibly useful with RX, as RX statements can include all sorts of streams of data into a complex operation.

(I was originally mistaken in the original version of this posting, and thought that I couldn’t create the DifferentiateWithTime method to be generic to the units of measure, but was saved by a posting on stackoverflow, here).

RX Framework for manipulating real-life data

As I said in my previous blog post, in one of my previous roles involved performing real-time analysis of incoming sensor and telemetry data, combining streams of data of different data rates, allowing the user to write their own calculation operations on the data, and displaying it to the user. I wondered what this would have been like to implement using RX.

As an aside, it may be that the new TPL Dataflow library would be more suitable for this – and I am a bit confused about the overlap between some many different technologies, such as RX and the forthcoming C# 5.0 async, or whether TPL Dataflow replaces the Actor model in Microsoft Axum or F#).

Anyway, back with the plot…

Imagine that you have to handle incoming streams of sensor data. This doesn’t have to be hardware sensor data, it could be a phone’s GPS or accelerometer data, or incoming streams of trade data. In the old style of programming there would be a callback or event handler that would be triggered when the data arrived, and to perform aggregate calculations over multiple data streams would involve convoluted code with locks – with RX there’s none of that, just a simple linq query to combine the data:

Imagine that you had two streams of position data (here the observables are generated, but they could be observables from events, the usage is identical, which is beautiful!)

var positions = Observable.Generate(0.0, i => i < 10.0, i => i + 1.0, i => i);

var positions2 = Observable.Generate(10.0, i => i >= 0.0, i => i – 1.0, i => i);

An operator to calculate the separation is as simple as:

var separation = positions.Zip(positions2, (i, j) => new { i, j }).Select(pos => Math.Abs(pos.j – pos.i));

That’s really nice and succinct, and the logic is all in one place (instead of being spread out amongst different callbacks).

As well as calculating the difference between two positions, you might want to find the rate of change of some input data:

static void Main(string[] args)

{

const double accelerationGravity = 9.81;

var positions = Observable.Generate(0.0, i => i < 10.0, i => i + 1.0, i => accelerationGravity * i * i / 2.0);

var velocity = positions.DifferentiateWithTime();

Console.WriteLine(“– velocities — “);

velocity.Subscribe(i => {

Console.WriteLine(i); });

var acceleration = velocity.DifferentiateWithTime();

Console.WriteLine(“– accelerations — “);

acceleration.Subscribe(i => {

Console.WriteLine(i); });

}

}

Again, this is nice. The acceleration is correctly written out for every value as 9.81 (well, within rounding errors).

DifferentiateWithTime is implemented as:

public static partial class ObservableEx2

{

public static IObservable<double> DifferentiateWithTime(this IObservable<double> obs)

{

return (from j in obs.WindowWithCount(2) select j[1] – j[0]);

}

..

In this case we’re taking the current and previous value and taking the difference between them – this of course is assuming that we are receiving values once per second. We could include the time of each sample in a Tuple, and use the Where operator to ignore values which occur at the same time period (to avoid divide-by-zero errors).

WindowWithCount I found on StackOverflow. This takes a rolling window of values around an observables value, which can be used for operations such as a moving average.

public static IObservable<IList<TSource>> WindowWithCount<TSource>(this IObservable<TSource> source, int count)

{

Contract.Requires(source != null);

Contract.Requires(count >= 0);

return source.Publish(published =>

from x in published

from buffer in published.StartWith(x).BufferWithCount(count).Take(1)

where buffer.Count == count

select buffer

);

}

This is cool, but I am concerned that we’re returning a new list for each sample, even though we’re only interested in two values, this could be hammering the garbage collector for high rates of data. It does say in section 6.1 of the RX Design Guidelines that new operators should be composed of existing operators, unless performance is a concern – this is something I may get around to investigating in a future blog post.

Reactive Framework thoughts

I have been very slack with blogging recently, as I was too busy with work and life. I’ve recently moved jobs, and even though I’m just as busy as I was in my previous job, I have a long train commute so I thought that I may as well do something constructive with my time. I figured that blogging is as good as anything else.

In my new role, I’m working on a project that makes heavy use of the the RX Framework, and I’m getting up to speed as fast as I can. It’s actually really fun – the way to approach a problem is different to the old method I’m used-to of having async callbacks everywhere, it’s much more functional, and at the start I felt a little bit like my brain had been turned inside out.

The project is using RX for handling the streaming of trading data, and for orchestrating asynchronous operations. The code base is a lot more succinct, understandable and elegant, than it would be without RX (though I’m finding it takes more time to think through how to write the RX operations). There are no .NET events throughout the codebase, all events are exposed as IObservables, allowing for composition with all sorts of async operations.

I’ve come across the following blog post, where the author was performing calculations on trade data, and that sprung to mind how in a previous job I was manipulating streams of data from hardware devices, and I thought I might go ahead and have a bit of a play with this.

http://eprystupa.wordpress.com/2009/12/20/running-vwap-calculation-using-reactive-extensions-for-net/

I’ll play with this in my next blog post(s).