Click here to Skip to main content
Click here to Skip to main content

The Rx Framework By Example

, 7 Nov 2011 CPOL
Rate this:
Please Sign up or sign in to vote.
An introduction to the Reactive Framework using some practical examples

Introduction

The Rx Framework is a very interesting and useful library recently released via DevLabs. The purpose of the framework is to provide a large framework for working with the Observer pattern. That is a nice academic description, but it is hardly enough to start using it immediately. So let us dig a little deeper.

This article is to provide an introduction to the Rx Framework by example, not by description. I don't go into any detail about internal behaviour, but I hope to cover that in an up and coming article. I find when learning something completely new, working examples provide me with enough information to get started and put me in a position to learn more.

Background

All of the information presented here is either summarised or derived from the excellent video and text blog posts on Channel 9. I highly recommend this as a learning resource for all developers out there.

For this article, it is assumed that the reader has a working knowledge of the Observer pattern.

This article's code was written against version 1.0.10621.0 of the Reactive Framework from NuGet.

It would also help to watch the Rx Framework introductory video on Channel 9.

IObservable<T>

The centrepiece of the Rx framework is the observable pattern, defined by the IObservable<T> interface. It provides just a single method to subscribe to the observable using an instance of an object implementing the IObserver<T> interface.

The interesting connect that the developers of the Rx Framework made is that IObservable<T> is the mathematical dual to the IEnumerable<T> interface. This is explained excellently in this video, but the summary is that IEnumerable<T> pulls elements from a sequence, and IObservable<T> pushes elements from a sequence.

What Can I Do With This?

The part I found the most challenging is what problems to solve with it. It seemed like a very large hammer looking for problems. While it doesn't solve problems that were unsolvable, I think it does offer new and interesting ways to approach existing problems, and sometimes solve them in an easier to write and easier to maintain way.

The framework provides built in threading support and control to let collections be observed in background threads.

Getting the Directory Tree

One well known task when working with disks is getting the list of directories in a tree in a UI application.

The code for this example is in the Directories project.

Consider this IEnumerable<string> result:

static IEnumerable<string> GetAllDirectories(string path)
{
  string[] subdirs = null;
  
  // Some directories may be inaccessible.
  try
  {
    subdirs = Directory.GetDirectories(path);
  }
  catch (IOException)
  {
  }
  
  if (subdirs != null)
  {
    foreach (var subdir in subdirs)
    {
      yield return subdir;
      
      foreach (var grandchild in GetAllDirectories(subdir))
      {
        yield return grandchild;
      }
    }
  }
}

You can subscribe to this in an asynchronous fashion using:

var observable = Observable.ToObservable(GetAllDirectories(@"c:\"), Scheduler.ThreadPool);
observable.Subscribe(outputDirectory);	

with outputDirectory being an Action<string>. This delegate is called on a thread from the ThreadPool, so that means it will not work directly when calling methods on Winforms objects. You can use the normal Control.Invoke() approach to solving this, or alternatively you can do this:

var observable = Observable.ToObservable
	(GetAllDirectories(@"c:\")).ObserveOn(SynchronizationContext.Current);
observable.Subscribe(outputDirectory);

This will ensure that all calls to outputAction are made on the same thread that created myWinformsControl.

Now the downside to this is that a high frequency of individual calls to Winforms updates can cause a stuttering user interface. One solution to this is to buffer the strings produced by the observer. The Reactive Framework can buffer in two ways, either by quantity, over time or both. Since frequency is the issue, we will be buffering using time.

var observable = 

Observable.ToObservable(GetAllDirectories(@"c:\"))
        .Buffer(TimeSpan.FromSeconds(1))
        .ObserveOn(this);

observable.Subscribe(outputDirectories);

This will buffer until one second has elapsed, and then call getDirectories, which is an Action<IEnumerable<string>>.

Now this worked great for me, until I was lucky enough for acquire an SSD drive. In the 1 second delay, I now retrieve around 16000 elements per second. This is much more than the TreeView control can add and still maintain a smooth user interface.

At first glance, the solution would be to prove an element count of 1000 to Buffer(), but this just means the buffering will complete every 1000 elements, which is around 80 milliseconds, and that makes things even worse.

The solution is to combine two sequences, one that is a regular interval, and the other than is a buffer of 1000 elements.

Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.ThreadPool)
.Zip(GetAllDirectories(@"c:\").ToObservable(Scheduler.ThreadPool)
.Buffer(1000), (a, b) => b)

The Zip() operator combines an element from each sequence in turn. So that means each 1000 entries is paired with an element produced every 1 second.

The last piece is stopping the reading of directories. Each time the Subscribe method is called, it returns an IDisposable that can be used to unsubscribe from the observer. We can use this to stop the reading, like this:

IDisposable observer = observable.Subscribe(outputDirectories)
      ...
observer.Dispose() 

User Interface Interaction

Many applications I work on have an interactive aspect to them, such as drawing or modifying a 2D view of objects. The most common tasks in this environment are selecting, drawing and modifying objects. The core part of these behaviours is mouse interaction, particularly combining mouse buttons and movement into dragging operations.

The challenging part of these is tracking mouse button and movement state so position deltas can be calculated across mouse events. While doing that work isn't challenging, the hard part is arriving at a clean design. One typically ends up with a number of boolean variables tracking if things are being dragged, were just dragging, if things have moved at least 3 pixels, and any number of other combinations.

While I have not used the reactive framework in a production environment to address these issues, it has some promise and is at the very least academically interesting, and does show some other aspects of the Rx framework.

The example showing these techniques is in the DrawingApplication project. It is a very simple line drawing mini-application. It lets you draw and move lines on the screen.

Observing Events

The first step is to set up the events as IObservables.

var movingEvents = Observable.FromEventPattern<MouseEventHandler, 
	MouseEventArgs>(h => this.MouseMove += h, h => this.MouseMove -= h);

var upEvents = Observable.FromEvent<MouseEventHandler, 
	MouseEventArgs>(h => this.MouseUp += h, h => this.MouseUp -= h);

var downEvents = Observable.FromEvent<MouseEventHandler, 
	MouseEventArgs>(h => this.MouseDown += h, h => this.MouseDown -= h);

This sets up the three IObservable<IEvent<MouseEventArgs>> instances, movingEvents, upEvents and downEvents. Each time that mouse interaction occurs, any observers will be notified.

There are many things we can do from here.

Identifying Mouse Moved between Mouse Downs and Mouse Ups

var draggingEvents = movingEvents.WaitUntil(downEvents).Until(upEvents);

Now the trouble with this is that the draggingEvents observable is a once-off. That means as soon as the mouse button is released, the observable will no longer produce elements. The solution for this is to use the Repeat() extension method.

var draggingEvents = 
		movingEvents.WaitUntil(downEvents).Until(upEvents).Repeat();

Picking Out Left Mouse Clicks

var leftClicks = from e in downEvents 
                 where e.EventArgs.Button == MouseButtons.Left 
                 select e;

leftClicks.Subscribe(...);

This also demonstrates that you can use LINQ query syntax to work with observables.

Pairing Up Dragged Mouse Events

This is a common requirement when interacting with items on the screen. You want a delta between pairs of mouse coordinates, or the previous position and the current position, while the mouse is being dragged. This is useful for panning the screen, moving selected items or drawing items. Alternatively, this could be expressed as an actual pair of Points rather than a delta.

var deltas = from pair in movingEvents.Buffer(2)
             let array = pair.ToArray()
             let a = array[0].EventArgs.Location
             let b = array[1].EventArgs.Location
             select new Size(b.X - a.X, b.Y - a.Y);

var dragDeltas = moveDeltas.WaitUntil(downEvents).Until(upEvents).Repeat();

dragDeltas is an IObservable that provides consecutive mouse events paired together with the difference in position calculated. The call to Observable.Buffer() doesn't split elements into separate pairs, but has each single mouse move event appearing in two pairs, except for the first and last event of course.

Combining Streams

The example applications for this are ConsoleReader and ConsoleOutputterConsoleReader is the parent process, and ConsoleOutputter represents the child process.

Console Reader Screenshot

When working with external processes, the stdout and stderr messages often need to be redirected and output to the main window. This can be a challenge to get right because reading from a child process' standard output and error stream is a blocking operation. The reactive framework makes this easy.

The first part is to represent a StreamReader as an IEnumerable to make it easy to work with the Reactive Framework.

private static IEnumerable<string> GetLineReader(StreamReader reader)
{
  while (reader.BaseStream.CanRead)
  {
    var l = reader.ReadLine();
    
    if (l == null)
    {
      break;
    }
    
    yield return l;
  }
}

The next part is to merge the standard and output streams into a single observable:

var process = Process.Start(info);

var childStdOut = GetLineReader(process.StandardOutput).ToObservable();
var childStdErr = GetLineReader(process.StandardError).ToObservable();

Observable.Merge(childStdOut, childStdErr).Subscribe(LineOutputter);

LineOutputter is an Action<string> delegate and is called for each line produced by StandardOutput and StandardError streams from the child process.

A Simple Image Viewing Application

The code for this example is in the ImageViewer project.

Imagine a simple application where you could examine a directory, show the image files in it, and then be able to click on the files to show the image.

This would have these main places of work:

  • Finding the files in the directory
  • Loading a thumbnail for each file
  • Loading the selected image when it is clicked on

For a responsive user interface, all three of these need to occur in a background thread. The challenging part of this would be queuing things in a simple and thread safe way, and communicating that between threads. Here is a diagram showing the flow of actions and information that needs to happen.

Chart showing the flow of data for the image viewer application

The interesting part is that when loading an image, we also have the opportunity to set a thumbnail on the image.

Subjects

A new concept we use in this example is the ISubject<T> interface, and its default implementation, Subject<T>.  A Subject<T> provides an easy mechanism to set up some observers ahead of time, and then provide those observers with other sequences we don't yet know about. Consider it as a door, where you can watch for people coming inside, but you don't really know where they are coming from.

We use this for the thumbnail setting because thumbnails can come from two sources - loading thumbnails based on the list of files, and loading a thumbnail when viewing the complete image.

Chaining Subjects and OnComplete

One of the unexpected outcomes of using a Subject to observe an observable was that it wasn't repeating. Consider these two snippets of code:

myObservable.Subscribe(mySubject)

and:

myObservable.Subcribe(item => mySubject.OnNext(item))

They are similar such that the subject will receive notifications, but the key difference is that when myObservable calls OnCompleted(), the subject will also have its OnCompleted() method called. This means that anything subscribed to the subject will no longer send out notifications. Just something to be aware of.

To Publish or Not to Publish

In the first version of the article, this image viewer had some severe memory problems. It would have wildly fluctuating memory usage that was very indicative of a large number of memory allocations occurring and then being garbage collected. The root cause ended up being the way observables are evaluated using the query methods.

I needed some help solving this problem, and the kind folks over at the Rx Forums explained things nicely. Here is the thread in question.

When using a query on an observable, such as this:

var images = from filename in filenames 
            select Image.FromFile(filenames)

It turns out that the select statement will be evaluated every time for each observer attached to images. When dealing with large Disposable objects such as Images, this is a significant problem.

The solution is similar in concept to the ToList() extension method for IEnumerable, in this case it is Publish(). This forces the select statement to only be evaluated once per incoming item, rather than once per outgoing observer. Like so:

var images = (from filename in filenames 
             select Image.FromFile(filenames)).Publish()

// ... subscribe more observables

images.Connect();

Setting Up the Chain of Observables

The code below shows the core of the image viewer form. This is where we connect up all the observables on the various threads ready for the filenames to be read in when the Find Images button is pressed.

  // Create a subject to make it easy to listen for thumbnail images 
  // and update them on the winforms thread.
  var thumbnailSubject = new Subject<KeyValuePair<ListViewItem, Image>>();
  
  // Observes thumbnailSubject on the winforms thread and puts the thumbnail 
  // into the image list and the listview.
  thumbnailSubject.ObserveOn(SynchronizationContext.Current).Subscribe(item =>
  {
    this.imageList.Images.Add(item.Key.Text, item.Value);
    item.Key.ImageKey = item.Key.Text;
  });
  
  // We want to receive item notifications on a different thread, so we 
  // make use of the ObserveOn() method.
  var thumbnailLoader = from listItem in itemObservable.ObserveOn(Scheduler.ThreadPool)
                        let filename = listItem.Text
                        let thumb = LoadThumbnail(filename)
                        select MakePair(listItem, (Image)thumb);
                        
  // We can't just call thumbnailLoader.Subscribe(thumbnailSubject) 
  // because when the thumbnailLoader 
  // finishes it will call the subjects OnComplete(), 
  // and that will stop our thumbnails loading on a different directory.
  thumbnailLoader.Subscribe(a => thumbnailSubject.OnNext(a));
  
  // Create an observable based on the SelectedIndexChanged event on the list 
  // view, and provide objects describing the selected item 
  // and the filename for that item.
  var selectedFilename = (from index in Observable.FromEventPattern<EventArgs>
  		(this.listViewThumbnails, "SelectedIndexChanged")
                         where this.listViewThumbnails.SelectedItems.Count > 0
                         select new
                         {
                           Item = this.listViewThumbnails.SelectedItems[0],
                           Filename = this.listViewThumbnails.SelectedItems[0].Text
                         }).DistinctUntilChanged(i => i.Filename);
                         
  // Transform the selectedFilename observable into one that 
  // loads in images from the selectedFilename observable.
  // We use ObserveOn() to make sure the image loading happens on a separate thread.
  var selectedImage = (from item in selectedFilename.ObserveOn(Scheduler.ThreadPool)
                       let filename = item.Filename
                       let image = Image.FromFile(filename)
                       let thumb = new Bitmap(image, new Size(64, 64))
                       select new
                       {
                         Item = item.Item,
                         Filename = item.Filename,
                         Image = image,
                         Thumb = thumb
                       }).Publish();
                       
  // Observe the selectedImage and construct an object the thumbnailSubject understands
  var selectedImageThumb = from item in selectedImage
                           select MakePair(item.Item, (Image)item.Thumb);
                           
  // Connect the selected thumb image to the thumbnail subject 
  // so when we load the selected image, the thumbnail updates.
  selectedImageThumb.Subscribe(a => thumbnailSubject.OnNext(a));
  
  // Observe selectedImage on the WinForms thread to show the selected image 
  // in the main window.
  selectedImage.ObserveOn(SynchronizationContext.Current).Subscribe
			(item => ShowImage(item.Image));
  
  // now that everything is subscribed to the selected image, 
  // connect it up to start publishing.
  selectedImage.Connect();

If you follow the code, you can see that it follows the flow diagram shown above.

Points of Interest

The Reactive Framework isn't a silver bullet, the be all and end all, or the hammer for every nail. It is just another tool in a developer's toolbelt. 

I haven't touched on the internals of the framework or how it works at a fundamental level. That will be for a future article.

The Rx Framework opens some interesting possibilities for transformation pipelines, or dataflow networks. While not perfectly suited, it would still be an interesting topic to investigate.

Thanks

Thanks go to fcharlon on the MSDN forums for assisting with some Rx issues.

Thanks to the kind CodeProject member leeloo999 for identifying and helping fix some memory usage issues in the Image Viewer.

Thanks to everyone who read and voted for the article!

History

  • Jan 10, 2010 - First draft
  • Jan 15, 2010 - First release
  • Feb 5, 2010 - Fixed some memory issues and other minor article issues
  • Nov 4, 2011 - Updated to use the official Rx Release 1.0.10621.0 from NuGet and Visual Studio 2010

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Phil Martin
Software Developer (Senior) MineSched Software International
Canada Canada
A professional developer in the Mining sector since 2001, Phil has a passion for creating robust, quality scientific applications.
 
My recent professional work focuses on scheduling and optimisation in the mining sector.
 
My major fields of interest in software are 3D applications, visualisation, scheduling and optimisation, and novel UI design. My goal as a professional developer is to help people solve real world problems through reliable and fun to use software.

Comments and Discussions

 
GeneralExcellent article, but .... Pinmemberleeloo9991-Feb-10 21:00 
GeneralRe: Excellent article, but .... PinmemberPhil Martin...1-Feb-10 21:09 
GeneralRe: Excellent article, but .... PinmemberPhil Martin...2-Feb-10 2:09 
GeneralRe: Excellent article, but .... Pinmemberleeloo9993-Feb-10 21:17 
GeneralRe: Excellent article, but .... PinmemberPhil Martin...3-Feb-10 22:20 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web03 | 2.8.141022.2 | Last Updated 8 Nov 2011
Article Copyright 2010 by Phil Martin
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid