The Rx Framework is a very interesting and useful library recently released via DevLabs. The purpose of the framework is to provide a large framework for working with the Observer pattern. That is a nice academic description, but it is hardly enough to start using it immediately. So let us dig a little deeper.
This article is to provide an introduction to the Rx Framework by example, not by description. I don't go into any detail about internal behaviour, but I hope to cover that in an up and coming article. I find when learning something completely new, working examples provide me with enough information to get started and put me in a position to learn more.
All of the information presented here is either summarised or derived from the excellent video and text blog posts on Channel 9. I highly recommend this as a learning resource for all developers out there.
For this article, it is assumed that the reader has a working knowledge of the Observer pattern.
This article's code was written against version 1.0.10621.0 of the Reactive Framework from NuGet.
It would also help to watch the Rx Framework introductory video on Channel 9.
The centrepiece of the Rx framework is the observable pattern, defined by the IObservable<T> interface. It provides just a single method to subscribe to the observable using an instance of an object implementing the
The interesting connect that the developers of the Rx Framework made is that
IObservable<T> is the mathematical dual to the
IEnumerable<T> interface. This is explained excellently in this video, but the summary is that
IEnumerable<T> pulls elements from a sequence, and
IObservable<T> pushes elements from a sequence.
What Can I Do With This?
The part I found the most challenging is what problems to solve with it. It seemed like a very large hammer looking for problems. While it doesn't solve problems that were unsolvable, I think it does offer new and interesting ways to approach existing problems, and sometimes solve them in an easier to write and easier to maintain way.
The framework provides built in threading support and control to let collections be observed in background threads.
Getting the Directory Tree
One well known task when working with disks is getting the list of directories in a tree in a UI application.
The code for this example is in the
static IEnumerable<string> GetAllDirectories(string path)
string subdirs = null;
subdirs = Directory.GetDirectories(path);
if (subdirs != null)
foreach (var subdir in subdirs)
yield return subdir;
foreach (var grandchild in GetAllDirectories(subdir))
yield return grandchild;
You can subscribe to this in an asynchronous fashion using:
var observable = Observable.ToObservable(GetAllDirectories(@"c:\"), Scheduler.ThreadPool);
outputDirectory being an
Action<string>. This delegate is called on a thread from the
ThreadPool, so that means it will not work directly when calling methods on Winforms objects. You can use the normal
Control.Invoke() approach to solving this, or alternatively you can do this:
var observable = Observable.ToObservable
This will ensure that all calls to
outputAction are made on the same thread that created
Now the downside to this is that a high frequency of individual calls to Winforms updates can cause a stuttering user interface. One solution to this is to buffer the
strings produced by the observer. The Reactive Framework can buffer in two ways, either by quantity, over time or both. Since frequency is the issue, we will be buffering using time.
var observable =
This will buffer until one second has elapsed, and then call
getDirectories, which is an
Now this worked great for me, until I was lucky enough for acquire an SSD drive. In the 1 second delay, I now retrieve around 16000 elements per second. This is much more than the
TreeView control can add and still maintain a smooth user interface.
At first glance, the solution would be to prove an element count of
Buffer(), but this just means the buffering will complete every
1000 elements, which is around 80 milliseconds, and that makes things even worse.
The solution is to combine two sequences, one that is a regular interval, and the other than is a buffer of
.Buffer(1000), (a, b) => b)
Zip() operator combines an element from each sequence in turn. So that means each 1000 entries is paired with an element produced every 1 second.
The last piece is stopping the reading of directories. Each time the
Subscribe method is called, it returns an
IDisposable that can be used to unsubscribe from the observer. We can use this to stop the reading, like this:
IDisposable observer = observable.Subscribe(outputDirectories)
User Interface Interaction
Many applications I work on have an interactive aspect to them, such as drawing or modifying a 2D view of objects. The most common tasks in this environment are selecting, drawing and modifying objects. The core part of these behaviours is mouse interaction, particularly combining mouse buttons and movement into dragging operations.
The challenging part of these is tracking mouse button and movement state so position deltas can be calculated across mouse events. While doing that work isn't challenging, the hard part is arriving at a clean design. One typically ends up with a number of boolean variables tracking if things are being dragged, were just dragging, if things have moved at least 3 pixels, and any number of other combinations.
While I have not used the reactive framework in a production environment to address these issues, it has some promise and is at the very least academically interesting, and does show some other aspects of the Rx framework.
The example showing these techniques is in the
DrawingApplication project. It is a very simple line drawing mini-application. It lets you draw and move lines on the screen.
The first step is to set up the events as
var movingEvents = Observable.FromEventPattern<MouseEventHandler,
MouseEventArgs>(h => this.MouseMove += h, h => this.MouseMove -= h);
var upEvents = Observable.FromEvent<MouseEventHandler,
MouseEventArgs>(h => this.MouseUp += h, h => this.MouseUp -= h);
var downEvents = Observable.FromEvent<MouseEventHandler,
MouseEventArgs>(h => this.MouseDown += h, h => this.MouseDown -= h);
This sets up the three
downEvents. Each time that mouse interaction occurs, any observers will be notified.
There are many things we can do from here.
Identifying Mouse Moved between Mouse Downs and Mouse Ups
var draggingEvents = movingEvents.WaitUntil(downEvents).Until(upEvents);
Now the trouble with this is that the
draggingEvents observable is a once-off. That means as soon as the mouse button is released, the observable will no longer produce elements. The solution for this is to use the
Repeat() extension method.
var draggingEvents =
Picking Out Left Mouse Clicks
var leftClicks = from e in downEvents
where e.EventArgs.Button == MouseButtons.Left
This also demonstrates that you can use LINQ query syntax to work with observables.
Pairing Up Dragged Mouse Events
This is a common requirement when interacting with items on the screen. You want a delta between pairs of mouse coordinates, or the previous position and the current position, while the mouse is being dragged. This is useful for panning the screen, moving selected items or drawing items. Alternatively, this could be expressed as an actual pair of Points rather than a delta.
var deltas = from pair in movingEvents.Buffer(2)
let array = pair.ToArray()
let a = array.EventArgs.Location
let b = array.EventArgs.Location
select new Size(b.X - a.X, b.Y - a.Y);
var dragDeltas = moveDeltas.WaitUntil(downEvents).Until(upEvents).Repeat();
dragDeltas is an
IObservable that provides consecutive mouse events paired together with the difference in position calculated. The call to
Observable.Buffer() doesn't split elements into separate pairs, but has each single mouse move event appearing in two pairs, except for the first and last event of course.
The example applications for this are
ConsoleReader is the parent process, and
ConsoleOutputter represents the child process.
When working with external processes, the
stderr messages often need to be redirected and output to the main window. This can be a challenge to get right because reading from a child process' standard output and error stream is a blocking operation. The reactive framework makes this easy.
The first part is to represent a
StreamReader as an
IEnumerable to make it easy to work with the Reactive Framework.
private static IEnumerable<string> GetLineReader(StreamReader reader)
var l = reader.ReadLine();
if (l == null)
yield return l;
The next part is to merge the standard and output streams into a single observable:
var process = Process.Start(info);
var childStdOut = GetLineReader(process.StandardOutput).ToObservable();
var childStdErr = GetLineReader(process.StandardError).ToObservable();
LineOutputter is an
Action<string> delegate and is called for each line produced by
StandardError streams from the child process.
A Simple Image Viewing Application
The code for this example is in the
Imagine a simple application where you could examine a directory, show the image files in it, and then be able to click on the files to show the image.
This would have these main places of work:
- Finding the files in the directory
- Loading a thumbnail for each file
- Loading the selected image when it is clicked on
For a responsive user interface, all three of these need to occur in a background thread. The challenging part of this would be queuing things in a simple and thread safe way, and communicating that between threads. Here is a diagram showing the flow of actions and information that needs to happen.
The interesting part is that when loading an image, we also have the opportunity to set a thumbnail on the image.
A new concept we use in this example is the
ISubject<T> interface, and its default implementation,
Subject<T> provides an easy mechanism to set up some observers ahead of time, and then provide those observers with other sequences we don't yet know about. Consider it as a door, where you can watch for people coming inside, but you don't really know where they are coming from.
We use this for the thumbnail setting because thumbnails can come from two sources - loading thumbnails based on the list of files, and loading a thumbnail when viewing the complete image.
Chaining Subjects and OnComplete
One of the unexpected outcomes of using a Subject to observe an observable was that it wasn't repeating. Consider these two snippets of code:
myObservable.Subcribe(item => mySubject.OnNext(item))
They are similar such that the subject will receive notifications, but the key difference is that when
OnCompleted(), the subject will also have its
OnCompleted() method called. This means that anything subscribed to the subject will no longer send out notifications. Just something to be aware of.
To Publish or Not to Publish
In the first version of the article, this image viewer had some severe memory problems. It would have wildly fluctuating memory usage that was very indicative of a large number of memory allocations occurring and then being garbage collected. The root cause ended up being the way observables are evaluated using the query methods.
I needed some help solving this problem, and the kind folks over at the Rx Forums explained things nicely. Here is the thread in question.
When using a query on an observable, such as this:
var images = from filename in filenames
It turns out that the
select statement will be evaluated every time for each observer attached to images. When dealing with large
Disposable objects such as
Images, this is a significant problem.
The solution is similar in concept to the
ToList() extension method for
IEnumerable, in this case it is
Publish(). This forces the
select statement to only be evaluated once per incoming item, rather than once per outgoing observer. Like so:
var images = (from filename in filenames
Setting Up the Chain of Observables
The code below shows the core of the image viewer form. This is where we connect up all the observables on the various threads ready for the filenames to be read in when the Find Images button is pressed.
var thumbnailSubject = new Subject<KeyValuePair<ListViewItem, Image>>();
item.Key.ImageKey = item.Key.Text;
var thumbnailLoader = from listItem in itemObservable.ObserveOn(Scheduler.ThreadPool)
let filename = listItem.Text
let thumb = LoadThumbnail(filename)
select MakePair(listItem, (Image)thumb);
thumbnailLoader.Subscribe(a => thumbnailSubject.OnNext(a));
var selectedFilename = (from index in Observable.FromEventPattern<EventArgs>
where this.listViewThumbnails.SelectedItems.Count > 0
Item = this.listViewThumbnails.SelectedItems,
Filename = this.listViewThumbnails.SelectedItems.Text
}).DistinctUntilChanged(i => i.Filename);
var selectedImage = (from item in selectedFilename.ObserveOn(Scheduler.ThreadPool)
let filename = item.Filename
let image = Image.FromFile(filename)
let thumb = new Bitmap(image, new Size(64, 64))
Item = item.Item,
Filename = item.Filename,
Image = image,
Thumb = thumb
var selectedImageThumb = from item in selectedImage
select MakePair(item.Item, (Image)item.Thumb);
selectedImageThumb.Subscribe(a => thumbnailSubject.OnNext(a));
(item => ShowImage(item.Image));
If you follow the code, you can see that it follows the flow diagram shown above.
Points of Interest
The Reactive Framework isn't a silver bullet, the be all and end all, or the hammer for every nail. It is just another tool in a developer's toolbelt.
I haven't touched on the internals of the framework or how it works at a fundamental level. That will be for a future article.
The Rx Framework opens some interesting possibilities for transformation pipelines, or dataflow networks. While not perfectly suited, it would still be an interesting topic to investigate.
Thanks go to fcharlon on the MSDN forums for assisting with some Rx issues.
Thanks to the kind CodeProject member leeloo999 for identifying and helping fix some memory usage issues in the Image Viewer.
Thanks to everyone who read and voted for the article!
- Jan 10, 2010 - First draft
- Jan 15, 2010 - First release
- Feb 5, 2010 - Fixed some memory issues and other minor article issues
- Nov 4, 2011 - Updated to use the official Rx Release 1.0.10621.0 from NuGet and Visual Studio 2010