A story about how Rx Framework saved my life for high frequency event processing
Sometimes, I learn about a technology that looks somewhat cool…
But I can’t find why. I know there is an idea, a spark, a sort of genius that I don’t understand quite yet, it’s here I see it, but out of my reach for now.
In this case, the technology was Rx Framework (also called Reactive Framework).
–Leap Motion I love you too, don’t be jealous
After reading the excellent Intro To Rx website, my reaction was : “That’s cool, but why changing from event-driven paradigm to reactive paradigm when the former worked for me since ages ?”
So I start reading article on Rx and got very upset about one thing : There were so many 101 articles teaching how to do basic stuff with RX but always, event-driven approach were better and easier, no article gave me a clear reason about why using reactive framework were easier than event-driven.
The only scenario mentioned was trading quotes… But I don’t work in trading so why should I care ?
Then I got an idea of project in WPF on the Leap Motion (link on GitHub), with some interesting metrics on data throughput by the leap motion:
- 30 to 100 frames to process per second
- 1-5 fingers per frame (30 – 500 finger per second)
- 0-1 gesture per frame
- 1-2 hand per frame (30 – 200 hand per second)
Now, that’s what I call high frequency events… Finally I can try Rx Framework for real… and results are great ! It saved my life.
This article is not an Intro To Rx, and in fact, you don’t need to understand Rx to understand my article. This article is a real, practical case on how RX made my day better, an experience that I share with you. Feel free to generalize or contextualize to your own life, and dig in RX tutorials or numerous excellent channel9 videos.
Event stream, Event processor
Rx Framework introduces two types : Observable<T> that represent something that output events, were T is the type of your event.
And Observer<T>, that is someone that will observe/process these events.
I don’t really like to talk about Observer/Observable, because it reminds me pre-event driven programming when I was a poor Java guy… If C# invented events, it is not to go back to stone age years later.
I will using the term Observable<T>, I will use the term Event Stream or Event Source instead.
And instead of Observer<T>, I will use Event Processor.
This make me closer to the truth vocabulary of the Event Sourcing design pattern.
It makes so more sense to say “Throttle your event stream for 10 seconds”, that it is to say “Throttle your observable for 10 seconds”.
It makes more sense to “filter a stream” than “filtering an observable”.
I agree that for AI, it is semantically the same, but as human it is easy to picture yourself throttling and filtering a water stream than doing it to an “observable”.
What you will imagine, you will understand.
Turning Leap Motion data into event streams
The first goal is to use leap motion data with RX to get FPS and fingers count in my debug window:
The first step was to implement leap motion’s base class Listener
Leap motion’s developers only implement the OnFrame, it is called when a new frame from the leap motion is coming.
OnFrame is called 30 to 100 times per second on a secondary thread.
My goal was to access these frames as an event stream is a class called ReactiveListener.
For that you can use rx’s Subject<T> class, this inherit from Observable<T> and you can push events T inside.
As you can see, the OnFrame was not very complicated
public override void OnFrame(Controller arg0)
Now I can get frames as observable :
public IObservable<Frame> Frames()
From here, I could finally calculate the FPS (Frame Per Second) with this nice expression in my ViewModel.
var seconds = (o.Timestamp - o.Timestamp).TotalSeconds;
FPS = (int)(1.0 / seconds);
return the frame annotated with its timestamp,
tells that I’m interesting in taking 2 frame at a time. (Transform IObservable<Frame>
ObserveOn specify that the event processor (which is defined by Subscribe) should be executed on the SynchronizationContext of the UI Thread to update the UI.
Subscribe update the FPS property.
As you can see in the gif video at the beginning of this part, the Debug window shows how many fingers Leap Motion is detecting.
How is it implemented ?
But how do I substract Fingers ? This is a little bit trickier.
Here is the code, I explain you what is going on right after :
.Select(c => c
}, () =>
To understand this code you need to understand what is a single FingerMove:
public IObservable<IGroupedObservable<int, Finger>> FingersMoves()
.SelectMany(f => f.Fingers)
.GroupByUntil(f => f.Id,
g => g.ThrottleWithDefault(TimeSpan.FromMilliseconds(300)) );
So what does it means ?
First I start from the frame stream and extract every fingers in each frame with SelectMany.
Then I group them by Id, so I can get finger data stream for each of your finger.
Here is a simple representation of streams, also called Marble Diagram (time is the horizontal axis):
The second parameter of GroupByUntil return an event stream (called the “duration”), when the duration output an event, the group is considered “closed”. If fingers with the same id are coming after, GroupByUntil create a new group.
In this case, the duration output a value when its group stop receiving events for 300 ms.
So let’s go back to the expression to decrement the finger count:
.Select(c => c
}, () =>
I subscribe to every finger group that the groups stream outputs.
Then for each finger I see in this group, I do nothing.
The second parameter of Subscribe specify that when the group's stream is closed, then I decrement FingerCount.
It was very handy to represent fingers as stream, and grouping
them in moves.
The leap motion sometimes misses a finger for 1 or 2 frame ( for something like 5 ms), it would be stupid to consider that since the finger was missing for only 2 frames, then the user want to make a new move.
300 ms is a timespan that an unalarmed person would not notice, and your can be sure that the leap motion will not miss a finger for 300ms by mistake, this make the perfect balance.
Plugging your ViewModel with your event stream
So before starting, what is GestSpace ?
GestSpace is a leap motion application to control your computer with your hand.
When GestSpace see your hand, it shows a customizable control panel.
A control panel is made of customizable tiles. (In fact, most of the tiles are configurable keyboard shortcuts associated with a gesture)
To use a Tile, you need to select it with your finger (Image 1)
Then lock it with your hand (Image 2)
Then move your hand (Image 3)
The gesture of your hand depends on the type of tile you are on. Image 3 shows how to modify volume.
So there is 3 state in this application:
- Control panel hidden
- Control panel visible and navigation
- Control panel visible and tile locked
This is expressed with the enumeration MainViewState as you can see in this class diagram.
Another point you can notice is that the ViewModel does not reference the previous ReactiveListener directly, but ReactiveSpace.
The difference between ReactiveListener and ReactiveSpace is that ReactiveSpace have expose streams highly coupled to my project whereas ReactiveListener can be used in other future projects.
s IsLocked, a boolean stream, that sends a value every times we should lock or unlock the tile.
So here is what happen in my MainViewModel.
if(this.State != MainViewState.Minimized)
State = locked ? MainViewState.Locked : MainViewState.Navigating;
How do I create the IsLocked stream ? Very easy.
- If there is one hand with 4 fingers, I am locked
- If no hand with 4 fingers appeared in the last 200 ms, I am unlocked.
- New subscriber to this stream should get pushed the last received boolean value
public IObservable<bool> IsLocked()
if(_IsLocked == null)
var handIsPresent = listener
.SelectMany(f => f.Hands)
.Where(h => h.Fingers.Count >= 4)
.Select(s => true); var handIsAbsent = handIsPresent
var handPresence =
.Merge(handIsAbsent.Select(o => false)) /Step 3
.DistinctUntilChanged() .Replay(1); handPresence.Connect(); _IsLocked = handPresence;
Step 1 – I create a stream of True boolean when there is a frame with a hand with 4 fingers. (handIsPresent)
Step 2 – I create a stream of True boolean when there is no value outputted from the first stream for 200 ms (Throttle with handIsAsbent)
Step 3 – I merge these two streams, and transform Trues of the handIsAbsent to False.
Step 4 – I keep only distinct consecutive values (the output stream can’t send two false or two true in a row)
Step 5 – I create a stream that will replay the last value of this stream when next subscribers will subscribe.
When I said it is easy, I should say instead : It is easy once you got your head wrapped in the problem for some hours… but now RX Framework expression are totally natural to me.
Now, how do I manage to control volume with the hand,
Each tile is associated with a class that inherit PresenterViewModel.These classes are responsible to subscribe to the ReactiveSpace when they are locked, and change the value of their property from event streams.
The PresenterViewModel for the volume is called ValuePresenterViewModel, currently it is only used for volume, you can’t bind keyboard shortcut on it.
On the WPF side, ValuePresenterViewModel is a styled progress bar, so the exposed properties should not surprise you.
But, look at closely the signature of the last constructor of ValuePresenterViewModel.
It waits for an event stream.
The constructor just listens for getValue, and update the Value property.
public ValuePresenterViewModel(double minValue, double maxValue, IObservable<double> getValue, Action<double> setValue)
this.setValue = setValue;
this.getValue = getValue;
this.MinValue = minValue;
this.MaxValue = maxValue;
this._Subscription = getValue
_Value = Normalize(d);
OnPropertyChanged(() => this.Value);
private double Normalize(double value)
value = Math.Min(MaxValue, value);
value = Math.Max(MinValue, value);
Then Volume tile is just a matter of passing the right observable.
I used the excellent CoreAudioAPI.
MMDeviceEnumerator devEnum = new MMDeviceEnumerator();
MMDevice defaultDevice = devEnum.GetDefaultAudioEndpoint(EDataFlow.eRender, ERole.eMultimedia);
return new ValuePresenterViewModel(
setValue: (v) => defaultDevice.AudioEndpointVolume.MasterVolumeLevelScalar = (float)(v / 100.0),
o => defaultDevice.AudioEndpointVolume.OnVolumeNotification += o,
o => defaultDevice.AudioEndpointVolume.OnVolumeNotification -= o)
.Select(v=>(double)(v * 100.0)));
You can see that I converted a traditional event (object sender + EventArgs) to an event stream with Observable.FromEvent.
I use merge with an event stream that only return the current volume, so, when the presenter will subscribe, it will directly get the current volume, without waiting the next OnVolumeNotification to fire.
Ok… so now let’s look another example : the CyclePresenterViewModel that you can use for scrolling for example.
Contrary to ValuePresenterViewModel, you can script what the circle is doing when you draw a circle with your hand.
In this screenshot, I configure the Circle to simulate keyboard UP arrow 5 times when I go counter clock wise, and 5 times DOWN when I go ClockWise.
This is a mini language with 3 commands : PRESS, UP and DOWN, and each can take multiple arguments, there is a small intellisense to help you.
I also could have written : PRESS UP,UP,UP,UP,UP
The keyboard simulation was made possible by the excellent library InputSimulator.
Then, as the GIF image show you, I just need to turn my hand in the right direction to fire these keyboard shortcut.
The code of the subscription of the CyclePresenterViewModel is stupid, once again thanks to RX Framework.
I project the direction of one finger on the tangente of the circle at the current rotation point, and then, update the rotation. (In the setter of rotation, there is logic to decide when to fire the keyboard shortcut)
protected override IDisposable SubscribeCore(ReactiveSpace spaceListener)
var v = c.TipVelocity.To2D();
var cos = Math.Cos(Helper.DegreeeToRadian(Rotation));
var sin = Math.Sin(Helper.DegreeeToRadian(Rotation));
var tan = new Vector((float)sin, (float)cos, 0);
var man = tan.Dot(v);
Rotation -= man / 100;
Remember that FingerMoves are fingers grouped by ID, Concat
takes the first group it find, and output its content. When the group is closed, it takes the next one.
Turning whatever you want into event streams
I already show you how to turn an event into an event stream, or data from the leap to event stream.
But this is not limited to that.
With GestSpace, you can configure to go automatically to a tile when the foreground program change.
In the screenshot below, I specify that I want to go to Browser control when iexplore, firefox or chrome is on the front.
This way you can contextualize your gesture to what you are doing on your computer.
Here an example, I switch window, then I’m on codeproject, ready to scroll up/down, and go to previous/next page.
In the screenshot just before you see that I give a tip to the user about the current foreground program. How ?
By polling, with an event stream !
_ProgListener = new ForegroundProgramListener();
if(pid != _CurrentPid)
using(var p = Process.GetProcessById(pid))
CurrentProgram = p.ProcessName;
var tile = Tiles.FirstOrDefault(t => t.BelongsToFastContext(p.ProcessName));
if(tile != null)
CurrentTile = tile;
And here is the code of the ForegroundProcess.
var hwnd = user32.GetForegroundWindow();
uint pid = 0;
user32.GetWindowThreadProcessId(hwnd, out pid);
Observable.Interval is an event stream that output every 800 ms, then I fetch the pid of the foreground window, then I output only values that changed compared to the previous one, and I am done.
RX Framework made my life easier.
It is a new way a thinking about events processing, it is cool for simple use case, but indispensable for handling high frequency events.
It takes time to wrap your head around the concept, I advice you to read Intro To Rx and check videos on channel9, it is worth it.
I got a bug in the framework for the GroupByUntil I documented here, except that point it was very cool.
The sources are on GitHub, feel free to experiment if you have your leap motion. :)