My last blog post dealt with manipulating incoming streams of data, such of position data, composing those streams, and manipulating them, and saw how well RX handles these operations.
As I was doing this, I felt that this would benefit from the extra safety that you can get from F#’s Units of Measure feature.
We can implement an operation to find the separation between two positions (the same as in my previous C# post):
open System
open System.Linq
open System.Threading
open System.Collections.Generic
open Microsoft.FSharp.Math
open Microsoft.FSharp.Linq
open Microsoft.FSharp.Linq.Query
module ObservableEx = type System.IObservable<'u> with
member this.WindowWithCount(count:int) = this.Publish(Func<_,_>(fun (p:IObservable<'u>) ->
p.SelectMany(
Func<_,_>(fun x -> p.StartWith(x).BufferWithCount(count).Take(1)),
Func<_,_,_>(fun x buffer -> buffer)).Where(
Func<IList<'u>,_>(fun x -> x.Count = count)).Select(
Func<_,_> (fun x -> x))));
open ObservableEx
let accelerationGravity = 9.81;
let positions = Observable.Generate(0.0,
Func<_,_>(fun i -> i < 10.0),
Func<_,_>(fun i -> i + 1.0),
Func<_,_>(fun i -> accelerationGravity * i * i / 2.0));
let positions2 = Observable.Generate(0.0,
Func<_,_>(fun i -> i < 10.0),
Func<_,_>(fun i -> i + 1.0),
Func<_,_>(fun i -> i));
let separation = Observable.Zip(positions,
positions2, Func<_,_,_>(fun i j -> j - i));
let res = separation.Subscribe(fun i -> i |> printfn "%f");
But let’s see what happens if we accidentally do something physically meaningless such as adding a velocity to a position:
let accelerationGravity = 9.81;
let positions = Observable.Generate( 0.0,
Func<_,_>(fun i -> i < 10.0),
Func<_,_>(fun i -> i + 1.0),
Func<_,_>(fun i -> accelerationGravity * i * i / 2.0));
let positions2 = Observable.Generate(0.0,
Func<_,_>(fun i -> i < 10.0),
Func<_,_>(fun i -> i + 1.0),
Func<_,_>(fun i -> i));
let DifferentiateWithTime (input: IObservable<'a>) =
input.WindowWithCount(2).Select(fun (j:IList<'a>) -> (j.[1]-j.[0]));
let velocities = DifferentiateWithTime(positions);
let separation = Observable.Zip(positions, velocities, Func<_,_,_>(fun i j -> j - i));
let res = separation.Subscribe(fun i -> i |> printfn "%f");
The program compiles and runs normally (as we’d expect, the compiler doesn’t know better than the fact that it’s dealing with some float values).
Now, let’s annotate our code with units of measure (I’m using the F# PowerPack). We can calculate the difference between two positions:
let accelerationGravity = 9.81<SI.m SI.s^-2>
let positions = Observable.Generate(0.0<SI.s>, Func<_,_>(fun i -> i < 10.0<SI.s>),
Func<_,_>(fun i -> i + 1.0<SI.s>),
Func<_,_>(fun i -> accelerationGravity * i * i / 2.0));
let positions2 = Observable.Generate(0.0<SI.s>,
Func<_,_>(fun i -> i < 10.0<SI.s>),
Func<_,_>(fun i -> i + 1.0<SI.s>),
Func<_,_>(fun i -> 5.0 * accelerationGravity * i * i / 2.0));
let DifferentiateWithTime (input: IObservable<float<_>>) =
input.WindowWithCount(2).Select(
fun (j:IList<float<_>>) -> (((j.[1]-j.[0])/1.0<SI.s>)));
let velocities = DifferentiateWithTime(positions);
let accelerations = DifferentiateWithTime(velocities);
let separation = Observable.Zip(positions, positions2, Func<_,_,_>(fun i j -> j - i));
let res = separation.Subscribe(fun i -> float i |> printfn "%f");
// Next line will not compile
//let wrongseparation = Observable.Zip(
// accelerations, velocities, Func<_,_,_>(fun i j -> j - i));
But if we try instead to calculate the difference between the position and the velocity, the code will no longer compile. This is very cool.
We can also do the same by annotating our IObservables with units:
let accelerationGravity = 9.81<SI.m SI.s^-2>
let DifferentiateWithTime (input: IObservable<float<_>>) =
input.WindowWithCount(2).Select(
fun (j:IList<float<_>>) -> (((j.[1]-j.[0])/1.0<SI.s>)));
let positions = Observable.Generate(0.0<SI.s>,
Func<_,_>(fun i -> i < 10.0<SI.s>),
Func<_,_>(fun i -> i + 1.0<SI.s>),
Func<_,_>(fun i -> accelerationGravity * i * i / 2.0));
let velocities = DifferentiateWithTime(positions);
let accelerations = DifferentiateWithTime(velocities);
let res = positions.Subscribe(fun i -> float i |> printfn "%f");
printfn "-- velocities -- ";
let res2 = velocities.Subscribe(fun i -> float i |> printfn "%f");
printf "-- accelerations -- ";
let res3 = accelerations.Subscribe(fun i -> float i |> printfn "%f");
I love this feature, and can see that it would be incredibly useful with RX, as RX statements can include all sorts of streams of data into a complex operation.
(I was originally mistaken in the original version of this posting, and thought that I couldn’t create the DifferentiateWithTime method to be generic to the units of measure, but was saved by a posting on stackoverflow, here).