Technical Blog

A quick look at the Reactive Extensions (Rx) for .Net

emiaj, 4 Jan 2010, CPOL

Introduction and some words about what I’ve been doing lately

Hello everyone. First of all, let me tell you that I’ve been very busy at work. We finally released a site for one of our customers, and it is a pleasure to see our hard work materialized in such a nice website.
We worked particularly hard on the eCommerce module, which led us to learn a lot of new paradigms, and even more tools and frameworks, around that module. I also think the designers did great work on the look and feel side of things. What can I say? I’m part of a great team. You can visit the site here.

Furthermore, there are a couple of personal projects for next year that have got me very excited. So stay tuned for further info about this; I know you, dear reader, will really like it :)

Back to the subject: what’s Rx? Again, I’m not so good at giving definitions or concepts, so in order to avoid any misunderstanding, here’s an excerpt from the project home page:

Rx is a library for composing asynchronous and event-based programs using observable collections.

Rx is a superset of the standard LINQ sequence operators that exposes asynchronous and event-based computations as push-based, observable  collections via the new .NET 4.0 interfaces IObservable<T> and IObserver<T>.  These are the mathematical dual of the familiar IEnumerable<T> and IEnumerator<T> interfaces for pull-based, enumerable collections in the .NET framework.
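To make the duality mentioned in that excerpt a bit more concrete, here is a minimal sketch that uses only the IObservable<T> and IObserver<T> interfaces from the base class library (.NET 4.0 or later), with no Rx assembly involved. NumberSource, CollectingObserver and DualityDemo are hypothetical names invented for this illustration; they are not part of Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push-based source: it calls the observer instead of being iterated.
public sealed class NumberSource : IObservable<int>
{
    private readonly int[] _values;

    public NumberSource(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
            observer.OnNext(value);   // counterpart of MoveNext() + Current
        observer.OnCompleted();       // counterpart of MoveNext() returning false
        return new NoopDisposable();  // the sequence already ended, nothing to undo
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records everything it is pushed.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(int value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}

public static class DualityDemo
{
    public static void Main()
    {
        // Pull: the consumer asks the collection for each value.
        IEnumerable<int> pull = new[] { 1, 2, 3 };
        foreach (var value in pull)
            Console.WriteLine("pulled " + value);

        // Push: the producer hands each value to the consumer.
        var observer = new CollectingObserver();
        new NumberSource(1, 2, 3).Subscribe(observer);
        foreach (var line in observer.Log)
            Console.WriteLine("pushed: " + line);
    }
}
```

The point of the sketch is only the direction of the calls: with IEnumerable<T> the consumer drives the loop, while with IObservable<T> the source drives it and the consumer just reacts.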

Rx is a new project from Microsoft. I hope you will play with it and, when it becomes a stable library, start using it in your projects together with the other niceties that Microsoft has lately been releasing to the general public.

You may infer from the excerpt above that this is only available for .Net 4.0, but in fact it is also available for .Net 3.5. Check the downloadable installers and you’ll see there are 3 different versions of Rx: one for .Net 3.5, another for .Net 4.0 Beta 2, and another for Silverlight 3.

The code I’m going to present today uses the .Net 3.5 bits (to reach a broader audience), but I think it is also applicable to .Net 4.0. I am not sure about Silverlight 3.

There’s a great set of articles that will introduce you to Rx far more articulately than I’m doing here. I’ve read them all and found them very useful:

Also, there’s a wiki with a great amount of information about the Rx API, which is updated from time to time. You can check it out here.

Demonstrating Rx usage

As usual, I tend to explain things in code, together with a *common* scenario, or at least one that could roughly reflect real-world issues. However, what counts as common varies from developer to developer.

The scenario is as follows: in our company we have a website that requires the following notifications:

  • Register all the visits.
  • When the user visiting our site is a new one, send a notification to the webmaster about this.
  • When the user visiting our site is from USA or UK, send a notification to the webmaster about this.

That is roughly what the user tells us. We, as developers, add a nicety that the user is not aware of:

  • In order to avoid spamming the webmaster’s email account, we send those emails in chunks of X, and each feature has its own way to set that threshold. For our sample we are going to set this value to 4, but in a real app you would probably provide a UI backed by a database, or some other way to configure this.
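The chunking requirement on its own can be sketched without any Rx at all. The ChunkBuffer<T> class below is a hypothetical helper written only for this illustration (it does not appear in the sample project): it collects items and hands them to a callback once the threshold is reached.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: collects items and flushes them in fixed-size chunks.
public sealed class ChunkBuffer<T>
{
    private readonly int _threshold;
    private readonly Action<IList<T>> _flush;
    private readonly List<T> _pending = new List<T>();

    public ChunkBuffer(int threshold, Action<IList<T>> flush)
    {
        if (threshold < 1) throw new ArgumentOutOfRangeException("threshold");
        if (flush == null) throw new ArgumentNullException("flush");
        _threshold = threshold;
        _flush = flush;
    }

    // Number of items waiting for the next chunk to fill up.
    public int PendingCount { get { return _pending.Count; } }

    public void Add(T item)
    {
        _pending.Add(item);
        if (_pending.Count < _threshold) return;

        var chunk = _pending.ToArray(); // copy, so the callback may keep the chunk
        _pending.Clear();
        _flush(chunk);
    }
}

public static class ChunkBufferDemo
{
    public static void Main()
    {
        var sent = new List<string>();
        // Threshold of 4, as in the sample; each "email" is just a string here.
        var buffer = new ChunkBuffer<int>(4, chunk => sent.Add(string.Join(",", chunk)));

        for (var visitId = 1; visitId <= 9; visitId++)
            buffer.Add(visitId);

        foreach (var line in sent)
            Console.WriteLine("email with visits " + line);
        Console.WriteLine("still pending: " + buffer.PendingCount);
    }
}
```

With nine visits and a threshold of 4, two chunks are flushed and the ninth visit waits for three more. Note that this sketch is not thread safe; it assumes all calls to Add come from one thread.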

Now, let’s define our ISiteVisitorsService interface:

 

    public interface ISiteVisitorsService 
    {  
        void StartMonitoring();
        IObservable<IEvent<VisitEventArgs>> NewVisitObservable { get; }
        event EventHandler<VisitEventArgs> NewVisitEvent;
    }  

 
The VisitEventArgs and its surrounding classes are structured as follows: 

    public class VisitEventArgs : EventArgs
    { 
        public VisitDetails VisitDetail { get; set; }
    } 
 
    public class VisitDetails
    { 
        public int VisitId { get; set; }
        public string FirstName { get; set; }
        public string LastName { get; set; }
        public VisitTypes VisitType { get; set; }
        public Contries Country { get; set; }
        public DateTime VisitDate { get; set; }
        public int VisitCount { get; set; }
   }  
 
    public enum VisitTypes
    { 
        FirstVisitor = 0,
        ReturningVisitor = 1,
    } 
 

    public enum Contries
    {
        Usa = 0,
        Uk = 1, 
        China = 2,
        Other = 3
    } 
 
    public static class VisitorHandlerHelper
    {
        // Chunk sizes used by the handlers below; the sample uses 4 for every feature.
        public const int LogVisitThreshold = 4;
        public const int SendEmailForEnglishSpeakersVisitorsThreshold = 4;
        public const int SendFirstVisitorMessageThreshold = 4;

        // Registers every visit in the chunk.
        public static void LogVisit(IEnumerable<VisitDetails> visitDetails)
        {
            foreach (var visitDetail in visitDetails)
            {
                Console.WriteLine();
                Console.WriteLine("Registering visit => {0}: {1} {2} {3} {4} {5} {6}", visitDetail.VisitId,
                                  visitDetail.FirstName, visitDetail.LastName, visitDetail.VisitType,
                                  visitDetail.Country,
                                  visitDetail.VisitDate, visitDetail.VisitCount);
            }
        }

        // Notifies the webmaster about visitors from USA or UK.
        public static void SendEmailForEnglishSpeakersVisitors(IEnumerable<VisitDetails> visitDetails)
        {
            foreach (var visitDetail in visitDetails)
            {
                Console.WriteLine(
                    "Sending email to webmaster notifying an english speaker visitor => {0}: {1} {2} - {3}",
                    visitDetail.VisitId,
                    visitDetail.FirstName,
                    visitDetail.LastName,
                    visitDetail.Country);
            }
        }

        // Notifies the webmaster about first-time visitors.
        public static void SendFirstVisitorMessage(IEnumerable<VisitDetails> visitDetails)
        {
            foreach (var visitDetail in visitDetails)
            {
                Console.WriteLine("Sending email to webmaster notifying a new visitor => {0}: {1} {2} {3} {4} {5}",
                                  visitDetail.VisitId,
                                  visitDetail.FirstName,
                                  visitDetail.LastName,
                                  visitDetail.Country,
                                  visitDetail.VisitType,
                                  visitDetail.VisitCount);
            }
        }
    }
 

I provide two ways to hook into the arrival of a new visitor: the old-fashioned one uses the NewVisitEvent, and the other uses the Rx IObservable interface. I’ve added the StartMonitoring method to tell the service to start firing events when new visitors land on our site. Obviously, I have no plans to implement a real monitoring service; you’ll see a faked service here instead. The implementation looks as follows:

    public class SiteVisitorsService : ISiteVisitorsService 
    { 
        private readonly IObservable<IEvent<VisitEventArgs>> _visitsObservable;
        private readonly string[] _names = new[] { "John", "Alex", "Jose", "Frank", "Laura", "Marco", "Mary" };
        private readonly string[] _lastNames = new[] { "Cruz", "Smith", "Gonzales", "Garcia", "Williams" };
    
        public event EventHandler<VisitEventArgs> NewVisitEvent;
    
        public SiteVisitorsService()
       {
           _visitsObservable = Observable.FromEvent<VisitEventArgs>(handler => NewVisitEvent += handler,
                                                             handler => NewVisitEvent -= handler);
       }
   
       public void StartMonitoring()
       {
           ThreadPool.QueueUserWorkItem(runSimulation);
       }
   
       public IObservable<IEvent<VisitEventArgs>> NewVisitObservable
       {
           get { return _visitsObservable; }
       }
   
       private void invokeNewVisitEvent(VisitEventArgs e)
       {
           var newVisitEvent = NewVisitEvent;
           if (newVisitEvent != null)
           {
               newVisitEvent(this, e);
           }
        }
   
       private void runSimulation(object state)
       {
           var visitId = 0;
           var rnd = new Random(); // one generator for the whole simulation
           while (true)
           {
               Thread.Sleep(2000);
               var visitType = (VisitTypes) rnd.Next(0, 2);
               var visitDetail = new VisitDetails
                                     {
                                         VisitId = visitId++,
                                         Country = (Contries) rnd.Next(0, 4),
                                         FirstName = _names[rnd.Next(0, _names.Length)],
                                         LastName = _lastNames[rnd.Next(0, _lastNames.Length)],
                                         VisitCount = visitType == VisitTypes.FirstVisitor ? 1 : rnd.Next(1, 2000),
                                         VisitDate = DateTime.Now.AddMonths(rnd.Next(0, 12)),
                                         VisitType = visitType
                                     };
               invokeNewVisitEvent(new VisitEventArgs()
                                       {
                                           VisitDetail = visitDetail
                                       });
           }
       }
   
   } 
  

I think the code is pretty clear about what it does. What’s interesting about it is the following snippet:

_visitsObservable = Observable.FromEvent<VisitEventArgs>(handler => NewVisitEvent += handler, 
                                                  handler => NewVisitEvent -= handler); 
 

What I’m doing there is creating an IObservable (that is, an instance that sends notifications) from our NewVisitEvent.

The FromEvent static method has the following signature:

public static System.IObservable<IEvent<TEventArgs>> FromEvent<TEventArgs>(System.Action<EventHandler<TEventArgs>> addHandler, System.Action<EventHandler<TEventArgs>> removeHandler)
    where TEventArgs : System.EventArgs

As you can see, it’s a generic method that expects a type derived from EventArgs (our VisitEventArgs class), plus two delegates: one that adds a handler to the event and one that removes it.
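To get a feeling for why the method needs those two delegates, here is a rough sketch of an event-to-observable adapter written against the plain IObservable<T> interface. This is not the Rx implementation: EventObservable, Ticker, TickEventArgs and SummingObserver are hypothetical names made up for this example, and unlike the real FromEvent the sketch pushes the bare event arguments instead of wrapping them in IEvent<TEventArgs>.

```csharp
using System;

// Hypothetical adapter: turns add/remove handler delegates into an IObservable.
public sealed class EventObservable<TEventArgs> : IObservable<TEventArgs>
    where TEventArgs : EventArgs
{
    private readonly Action<EventHandler<TEventArgs>> _addHandler;
    private readonly Action<EventHandler<TEventArgs>> _removeHandler;

    public EventObservable(Action<EventHandler<TEventArgs>> addHandler,
                           Action<EventHandler<TEventArgs>> removeHandler)
    {
        _addHandler = addHandler;
        _removeHandler = removeHandler;
    }

    public IDisposable Subscribe(IObserver<TEventArgs> observer)
    {
        // Each subscription gets its own handler, attached via addHandler...
        EventHandler<TEventArgs> handler = (sender, e) => observer.OnNext(e);
        _addHandler(handler);
        // ...and detached via removeHandler when the subscription is disposed.
        return new Unsubscriber(() => _removeHandler(handler));
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _dispose;
        public Unsubscriber(Action dispose) { _dispose = dispose; }
        public void Dispose()
        {
            var dispose = _dispose;
            _dispose = null;            // make repeated Dispose calls harmless
            if (dispose != null) dispose();
        }
    }
}

public sealed class TickEventArgs : EventArgs
{
    public int Number { get; set; }
}

public sealed class Ticker
{
    public event EventHandler<TickEventArgs> Tick;

    public void Raise(int number)
    {
        var handler = Tick;
        if (handler != null) handler(this, new TickEventArgs { Number = number });
    }
}

public sealed class SummingObserver : IObserver<TickEventArgs>
{
    public int Sum;
    public void OnNext(TickEventArgs e) { Sum += e.Number; }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class FromEventDemo
{
    public static void Main()
    {
        var ticker = new Ticker();
        var observable = new EventObservable<TickEventArgs>(
            h => ticker.Tick += h,
            h => ticker.Tick -= h);

        var observer = new SummingObserver();
        var subscription = observable.Subscribe(observer);
        ticker.Raise(1);
        ticker.Raise(2);
        subscription.Dispose();  // detaches the handler from the event
        ticker.Raise(100);       // no longer observed

        Console.WriteLine("sum of observed ticks: " + observer.Sum);
    }
}
```

The add delegate runs when someone subscribes and the remove delegate runs when the subscription is disposed, which is why an event that could only be attached to, but never detached from, would not be enough.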

Now, let’s define our handlers. They should implement the following interface:

public interface IVisitorHandler  
{  
	void RegisterListeners(ISiteVisitorsService service);
}        

The Old way 

First, our old fashioned handler:  

    public class OldFashionedVisitorHandler : IVisitorHandler
    { 
        private static readonly List<VisitDetails> VisitsBuffer = new List<VisitDetails>();
        private static readonly List<VisitDetails> EnglishSpeakersVisitorsBuffer = new List<VisitDetails>();
        private static readonly List<VisitDetails> FirstVisitorBuffer = new List<VisitDetails>();
    
        public void RegisterListeners(ISiteVisitorsService service)
        {
            service.NewVisitEvent += serviceNewVisitHandler;
        }
    
        static void serviceNewVisitHandler(object sender, VisitEventArgs e)
        {
            var visitDetail = e.VisitDetail;
            VisitsBuffer.Add(visitDetail);
            if (VisitsBuffer.Count == VisitorHandlerHelper.LogVisitThreshold)
            {
                VisitorHandlerHelper.LogVisit(VisitsBuffer);
                VisitsBuffer.Clear(); 
            } 
            if (visitDetail.Country == Contries.Uk || visitDetail.Country == Contries.Usa)
            {
                EnglishSpeakersVisitorsBuffer.Add(visitDetail);
                if (EnglishSpeakersVisitorsBuffer.Count == VisitorHandlerHelper.SendEmailForEnglishSpeakersVisitorsThreshold)
                {
                    VisitorHandlerHelper.SendEmailForEnglishSpeakersVisitors(EnglishSpeakersVisitorsBuffer);
                    EnglishSpeakersVisitorsBuffer.Clear();
                }
            }
            if (visitDetail.VisitType == VisitTypes.FirstVisitor)
            {
                FirstVisitorBuffer.Add(visitDetail);
                if (FirstVisitorBuffer.Count == VisitorHandlerHelper.SendFirstVisitorMessageThreshold)
                {
                    VisitorHandlerHelper.SendFirstVisitorMessage(FirstVisitorBuffer);
                    FirstVisitorBuffer.Clear();
                }
            }
        }
    }
 

Now, please look at the serviceNewVisitHandler static method: all those if statements, plus the three buffers that hold the VisitDetails instances (VisitsBuffer, EnglishSpeakersVisitorsBuffer and FirstVisitorBuffer). A complete mess! All that complexity adds noise and hides the real purpose of the class. One last thing: in the RegisterListeners method we are using the NewVisitEvent event to find out when a new visitor arrives.
To summarize, this is how we’ve been doing this until today (I hope).

The .Net Reactive Extension way 

One of the benefits we get by using the Rx library is that we can work with events in a LINQ fashion, which is why some people call the Reactive library LINQ to Events.
Now, let’s take a look at our reactive-fashioned visitor handler:

 

    1 public class ReactiveVisitorHandler : IVisitorHandler
    2 {
    3     const int LogVisitThreshold = VisitorHandlerHelper.LogVisitThreshold;
    4     const int SendEmailForEnglishSpeakersVisitorsThreshold = VisitorHandlerHelper.SendEmailForEnglishSpeakersVisitorsThreshold;
    5     const int SendFirstVisitorMessageThreshold = VisitorHandlerHelper.SendFirstVisitorMessageThreshold;
    6  
    7     public void RegisterListeners(ISiteVisitorsService service)
    8     {
    9         var observerData = service.NewVisitObservable.Select(x => x.EventArgs.VisitDetail);
   10         observerData
   11             .Buffer(LogVisitThreshold, LogVisitThreshold)
   12             .Subscribe(VisitorHandlerHelper.LogVisit);
   13 
   14         observerData
   15             .Where(x => x.Country == Contries.Uk ||
   16                         x.Country == Contries.Usa)
   17             .Buffer(SendEmailForEnglishSpeakersVisitorsThreshold, SendEmailForEnglishSpeakersVisitorsThreshold)
   18             .Subscribe(VisitorHandlerHelper.SendEmailForEnglishSpeakersVisitors);
   19 
   20         observerData
   21             .Where(x => x.VisitType == VisitTypes.FirstVisitor)
   22             .Buffer(SendFirstVisitorMessageThreshold, SendFirstVisitorMessageThreshold)
   23             .Subscribe(VisitorHandlerHelper.SendFirstVisitorMessage); 
   24     }
   25 }
 

The interesting stuff happens in the RegisterListeners method.
First of all, if you look at line 9, you’ll see we are projecting the event arguments from the service IObservable to get only the stuff we are interested in (the VisitDetail property).

Later on, we start subscribing our delegates to the IObservable previously created.

From line 10 to 12, we are buffering the output of the event and then subscribing our VisitorHandlerHelper.LogVisit method to the resulting IObservable. This is far simpler than its counterpart in the OldFashionedVisitorHandler.

From line 14 to 18 and from line 20 to 23, we first filter the IObservable by certain criteria: the first one keeps only those events where the Country property is UK or USA, while the other keeps only those where the VisitType is equal to FirstVisitor. Next, we buffer the output as we did for the previous observable. Finally, we subscribe the corresponding helper method to each observable.

I don’t know about you, but I think that our ReactiveVisitorHandler is far friendlier than the OldFashionedVisitorHandler. It removes all that complexity and drastically reduces the amount of code you need to write in order to subscribe to an event and perform some special handling based on a complex requirement.

The output

Both versions will output the same thing, which in a console app may look like this:

[Screenshot: ReactiveSample console output]

 

And one day we get a request for an improvement…

So, days go by and everything seems to work correctly. However, we get a request telling us that the notification handling should work as always, but now it must run asynchronously, without blocking the service, thus improving performance.

So, what’s required to solve this problem? Let’s change our code to reflect just this: 

    1 public void RegisterListeners(ISiteVisitorsService service)  
    2 { 
    3     var observerData = service.NewVisitObservable.Select(x => x.EventArgs.VisitDetail)
    4         .Asynchronous();
    5     observerData
    6         .Buffer(LogVisitThreshold, LogVisitThreshold)
    7         .Subscribe(VisitorHandlerHelper.LogVisit);
    8 
    9     observerData 
   10         .Where(x => x.Country == Contries.Uk ||
   11                     x.Country == Contries.Usa)
   12         .Buffer(SendEmailForEnglishSpeakersVisitorsThreshold, SendEmailForEnglishSpeakersVisitorsThreshold)
   13         .Subscribe(VisitorHandlerHelper.SendEmailForEnglishSpeakersVisitors);
   14 
   15     observerData 
   16         .Where(x => x.VisitType == VisitTypes.FirstVisitor)
   17         .Buffer(SendFirstVisitorMessageThreshold, SendFirstVisitorMessageThreshold)
   18         .Subscribe(VisitorHandlerHelper.SendFirstVisitorMessage);
   19 }
 

Did you notice the difference (line 4)? It took us just one line to achieve such a complex thing in a thread-safe fashion.

What’s required to do the same in our OldFashionedVisitorHandler? Well, I leave that as an exercise to the reader because, from now on, frankly, I don’t care.
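For readers who do want to try that exercise, one possible starting point is sketched below. It is not how Rx implements its asynchronous operator, just a hand-rolled alternative under stated assumptions: BackgroundDispatcher<T> is a hypothetical class invented for this example, it needs .NET 4.0 for BlockingCollection<T>, and it uses a single worker thread so that notifications keep their original order.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper: queues items and handles them on one background thread,
// so the producer (our service) never waits for the handler to finish.
public sealed class BackgroundDispatcher<T> : IDisposable
{
    private readonly BlockingCollection<T> _queue = new BlockingCollection<T>();
    private readonly Thread _worker;

    public BackgroundDispatcher(Action<T> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        _worker = new Thread(() =>
        {
            // Blocks until items arrive; ends after CompleteAdding() once the queue is drained.
            // Assumption: the handler does not throw (an exception here would end the worker).
            foreach (var item in _queue.GetConsumingEnumerable())
                handler(item);
        });
        _worker.IsBackground = true;
        _worker.Start();
    }

    // Returns right away; the item is handled later on the worker thread.
    public void Post(T item) { _queue.Add(item); }

    // Stops accepting items and waits until everything already queued was handled.
    public void Dispose()
    {
        _queue.CompleteAdding();
        _worker.Join();
        _queue.Dispose();
    }
}

public static class BackgroundDispatcherDemo
{
    public static void Main()
    {
        var handled = new List<int>();
        var handlerThreadId = 0;

        using (var dispatcher = new BackgroundDispatcher<int>(visitId =>
        {
            Thread.Sleep(50); // simulate slow work, e.g. sending an email
            handlerThreadId = Thread.CurrentThread.ManagedThreadId;
            handled.Add(visitId);
        }))
        {
            for (var visitId = 1; visitId <= 3; visitId++)
                dispatcher.Post(visitId); // does not wait for the slow handler
        }

        Console.WriteLine("handled in order: " + string.Join(",", handled));
        Console.WriteLine("ran on another thread: " +
                          (handlerThreadId != Thread.CurrentThread.ManagedThreadId));
    }
}
```

In the old-fashioned handler, the event callback would only call Post, and the three buffers would be touched exclusively from the worker thread. That keeps them safe without locks, at the cost of yet more plumbing code, which is the author's point.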

 

Summary

I hope, dear reader, you’ve noticed the great improvement and the smaller amount of code that you get by using the Reactive library. While it is in its initial stage, it is definitely something you should give a try.

The downloadable bits can be found here.

Bye bye.

Shameless plug: You can check this article on my blog here.  

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

emiaj
Web Developer
Peru
Last Updated 4 Jan 2010
Article Copyright 2009 by emiaj