
Implementing the Observer Pattern using Rx.

George Swan, 2 Jun 2014 CPOL
An alternative to Observer Pattern (C#).

Introduction.

This is an alternative to the Observer Pattern implementation detailed in Observer Pattern (C#). This alternative uses an event processing system based on Linq known as Reactive Extensions, or Rx for short. It is very well documented, and there is also an Rx Wiki.

A brief explanation of the Observer Pattern.

The Observer Pattern defines a one-to-many relationship between objects. It is also known as Publish-Subscribe. When the publisher (the ‘one’) publishes data, all the subscribers (the ‘many’) are notified of it. The pattern helps to maintain consistency between related objects and shows good separation of concerns: the publisher need only know that a subscriber implements a method to receive the data, and subscribers need only know that the publisher implements a method that allows them to subscribe.

Why use Rx?

One of the advantages of using Rx with the observer pattern is that publishers in Rx can run asynchronously, so that program execution does not get blocked while the publisher is busy retrieving data. Instead, callbacks are used to notify subscribers when data is available. Callbacks are also used to inform subscribers if there is an error and when the sequence has ended. This avoids the situation where subscribers continue to observe a sequence that has either finished or faulted. Rx also facilitates the use of Linq for filtering, grouping and composing data, and there are methods available for testing as well as for time-related processing such as buffering and throttling.

Rx implementation basics.

In Rx, the subscribers to a data stream are called Observers and the publisher of a data stream is called an Observable. Observables implement the interface IObservable<T>. It has one method, named Subscribe, that, as the name implies, enables Observers to subscribe. The Subscribe method returns an object that implements IDisposable. Unsubscribing is simply a matter of calling that object's Dispose() method.

Observers implement the interface IObserver<T>. IObserver<T> has three callback methods that allow the Observable to send information to its Observers. Here is the interface.

public interface IObserver<in T>
{
    //Provides the observer with new data.
    void OnNext(T value);

    //Notifies the observer that the provider has experienced an error.
    void OnError(Exception error);

    //Notifies the observer that the provider has finished sending.
    void OnCompleted();
}
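To make the Subscribe/Dispose contract concrete, here is a minimal sketch of a publisher written by hand against the two interfaces, without Rx. The names SimplePublisher, Unsubscriber and RecordingObserver are hypothetical and exist only for this illustration; in practice Rx supplies ready-made implementations, so this is not how Rx itself is written.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-written publisher, shown only to illustrate the contract.
public sealed class SimplePublisher<T> : IObservable<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    // Subscribing registers the observer and hands back an unsubscribe token.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Pushes a value to every current subscriber.
    public void Publish(T value)
    {
        // Iterate over a copy so an observer may unsubscribe inside OnNext.
        foreach (var observer in observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }

        // Disposing the token removes the observer from the publisher's list.
        public void Dispose()
        {
            observers.Remove(observer);
        }
    }
}

// Hypothetical observer that records each callback it receives.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public readonly List<string> Log = new List<string>();

    public void OnNext(T value) { Log.Add("next " + value); }
    public void OnError(Exception error) { Log.Add("error " + error.Message); }
    public void OnCompleted() { Log.Add("completed"); }
}
```

If an observer subscribes, the publisher publishes 1, the subscription is disposed and the publisher then publishes 2, the observer's log should hold only the first value.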

An Implementation of the Observer pattern using Rx.

Rx has a helper class named Subject. Subject implements both IObservable<T> and IObserver<T> so it can be both an Observer and an Observable. Its OnNext(T value) method outputs value to all subscribed Observers. Here's a simple IObserver<int> class that can be used to subscribe to a Subject and output the data received to the console.

    public class ObserverOfInts : IObserver<int>
    {
        public ObserverOfInts(string observerName)
        {
            this.ObserverName = observerName;
        }

        public string ObserverName { get; private set; }

        public void OnCompleted()
        {
            Console.WriteLine("Sequence completed");
        }

        public void OnError(Exception error)
        {
            Console.WriteLine("Exception raised. {0}", error.Message);
        }

        public void OnNext(int value)
        {
            Console.WriteLine("{0}: Observed {1}", this.ObserverName, value);
        }
    }

In this example, the Subject subscribes to an Observable and outputs the data stream to its subscribers.

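        //Requires: using System; using System.Collections.Generic;
        //          using System.Reactive.Linq; using System.Reactive.Subjects;
        private static void SimpleDemoUsingSubject()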
        {
            //declare an IEnumerable of ints with some data
            IEnumerable<int> intList = new List<int> { 2, 4, 8, 10, 14 };
            //Convert the enumerable to an Observable
            IObservable<int> observableOfInts = intList.ToObservable();
            var subject = new Subject<int>();
            var firstObserver = new ObserverOfInts("First");
            IDisposable firstSubscription = subject.Subscribe(firstObserver);
            //You can subscribe by providing only the OnNext(T value) callback handler
            IDisposable anonymousSubscription = subject.Subscribe(
                v => Console.WriteLine("  Anonymous observed " + v),
                //In this example, the OnCompleted callback is also provided
                () => Console.WriteLine("  Anonymous observed the sequence completed"));
            Console.WriteLine("Press Enter to End");
            //Connect the subject to the observable
            //This starts the sequence
            IDisposable subjectSubscription = observableOfInts.Subscribe(subject);
         
            Console.ReadLine();
            //Unsubscribe the subscribers
            firstSubscription.Dispose();
            anonymousSubscription.Dispose();
            subjectSubscription.Dispose();

            Console.ReadKey();
        }  

[Screenshot: Simple Demo output]

Observing Events with Rx

Events are not easy to observe. The relevant data can be buried deeply in the event arguments, and events are usually firmly attached to the class they are defined in. This makes them cumbersome to pass around for processing. With Rx, events can be converted to observable objects of a type equal to the relevant data type, and can be both passed to methods and tested with ease. The following example uses the MouseMove event, and the data is processed into an observable of Points. Some filtering is carried out so that a point is only sent to the observer when 10 is a factor of both point.X and point.Y. The important method is Observable.FromEventPattern<MouseEventArgs>(this.OutputTextBox, "MouseMove"), where MouseEventArgs is the event arguments type, this.OutputTextBox is the sender and MouseMove is the event name. The reason for calling the ObserveOn extension method is detailed later.

        public void EventDemo()
        {
            this.OutputTextBox.AppendText("\r\n Event demo");
            this.OutputTextBox.AppendText("\r\n Move the mouse around this box to observe when 10 is a factor of both mouse Point.X and Point.Y ");
            IObservable<EventPattern<MouseEventArgs>> observableOfArgs =
                Observable.FromEventPattern<MouseEventArgs>(this.OutputTextBox, "MouseMove");

            IObservable<Point> points = observableOfArgs.Select(args => args.EventArgs.Location);
            var multiplesOf10 = points.Where(point => point.X % 10 == 0 && point.Y % 10 == 0);
            IDisposable subscription =
                multiplesOf10.ObserveOn(new ControlScheduler(this))
                    .Subscribe(point => this.OutputTextBox.AppendText("\r\n" + point.ToString()));
            this.disposables.Add(subscription);
        }
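The same conversion can be tried without a form. The sketch below is an assumption-laden stand-in for the mouse example: PositionEventArgs, PositionSource and its Moved event are hypothetical types invented here to play the part of the control and its MouseMove event. It assumes the System.Reactive package is referenced and that the code runs in a console application with no synchronization context.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical event arguments standing in for MouseEventArgs.
public sealed class PositionEventArgs : EventArgs
{
    public PositionEventArgs(int x, int y) { X = x; Y = y; }
    public int X { get; private set; }
    public int Y { get; private set; }
}

// Hypothetical event source standing in for the text box.
public sealed class PositionSource
{
    public event EventHandler<PositionEventArgs> Moved;

    public void MoveTo(int x, int y)
    {
        var handler = Moved;
        if (handler != null) handler(this, new PositionEventArgs(x, y));
    }
}

public static class EventSketch
{
    // Raises a few events and returns the positions that passed the filter.
    public static List<string> Run()
    {
        var source = new PositionSource();
        var seen = new List<string>();

        // Convert the .NET event into an observable sequence.
        IObservable<EventPattern<PositionEventArgs>> moves =
            Observable.FromEventPattern<PositionEventArgs>(source, "Moved");

        // Keep only positions where 10 is a factor of both coordinates.
        using (moves.Select(e => e.EventArgs)
                    .Where(p => p.X % 10 == 0 && p.Y % 10 == 0)
                    .Subscribe(p => seen.Add(p.X + "," + p.Y)))
        {
            source.MoveTo(10, 20); // passes the filter
            source.MoveTo(10, 25); // filtered out
            source.MoveTo(30, 40); // passes the filter
        }

        // The subscription has been disposed, so this event is not observed.
        source.MoveTo(50, 50);
        return seen;
    }
}
```

Because the observable is an ordinary object, it can be handed to a method like this and checked in a unit test, which is harder to do with a raw event.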

Testing The Observer Pattern

Rx has numerous methods that are useful for testing purposes. The output can be time stamped, delayed, throttled and sampled. A useful method for constructing an event proxy is Observable.Interval(TimeSpan period). This produces a stream of long values, 0, 1, 2, 3 and so on, one per period. It can be used like this.

        public static void PointsExample()
        {
            var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };

            IObservable<Point> onePointPerSecondsObservable =
                Observable.Interval(TimeSpan.FromSeconds(1))
                    .Take(points.Count)
                    .Where(i => points[(int)i].X == points[(int)i].Y)
                    .Select(i => points[(int)i]);
            onePointPerSecondsObservable.Subscribe(
                point => Console.WriteLine(point.ToString()),
                () => Console.WriteLine("Completed"));
            Console.ReadKey();
        }

        //An alternative approach
        public static void PointsExampleA()
        {
            var points = new List<Point> { new Point(1, 2), new Point(3, 3), new Point(5, 6), new Point(7, 7) };
            IObservable<Point> onePointPerSecondsObservable =
                Observable.Generate(0, i => i < points.Count, i => i + 1, //this is the for loop
                    i => points[i], i => TimeSpan.FromSeconds(1)) //select points[i] after 1 second delay
                    .Where(point => point.X == point.Y);
            onePointPerSecondsObservable.Subscribe(
                point => Console.WriteLine(point.ToString()),
                () => Console.WriteLine("Completed"));
            Console.ReadKey();
        }
        

        

The output is.

[Screenshot: MouseMove Test output]

A Couple of Gotchas to watch out for

Rx has a couple of elephant traps that are easy to fall into.

It's cold when it needs to be hot.

Implementing the observer pattern like this doesn't work.

  private static void DemoColdA()
        {
            var oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
            var firstSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("First observed "+x));
            //wait before subscribing the second observer
            Thread.Sleep(TimeSpan.FromSeconds(2));
            var secondSubscriber = oneNumberPerSecondObservable.Subscribe(x=>Console.WriteLine("Second observed "+x));
            Console.WriteLine("Press any key to stop the demo");
            Console.ReadKey();
            firstSubscriber.Dispose();
            secondSubscriber.Dispose();
            Console.WriteLine("Press any key to end the demo");
            Console.ReadKey();
        }

[Screenshot: Cold Observable output]

The output shows that the two observers are not observing the same values at the same time. The reason for this is that the observable is 'Cold'. Cold observables restart the data stream for each Observer, so each gets its own stream. The solution is to convert the 'Cold' Observable to a 'Hot' Observable by using the Publish extension method. Publish returns an IConnectableObservable<T>, and no data flows until its Connect method is called.

 var oneNumberPerSecondObservableHot = oneNumberPerSecondObservable.Publish();
 //Subscribe the observers to the hot observable, then call Connect() to start the sequence
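The difference between cold and hot can also be seen without timers by counting how often the source is started. The following is only a sketch, assuming the System.Reactive package is referenced; HotColdSketch and CountSourceStarts are hypothetical names, and the source built with Observable.Create stands in for the interval observable used above.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotColdSketch
{
    // Returns { starts while cold, starts after Publish } for two observers each.
    public static int[] CountSourceStarts()
    {
        int starts = 0;

        // A cold source: this delegate runs once for every subscription.
        IObservable<int> cold = Observable.Create<int>(observer =>
        {
            starts++;
            observer.OnNext(1);
            observer.OnNext(2);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // Two observers on the cold source restart the stream twice.
        cold.Subscribe(_ => { });
        cold.Subscribe(_ => { });
        int coldStarts = starts;

        // Publish shares a single subscription to the source between observers.
        starts = 0;
        IConnectableObservable<int> hot = cold.Publish();
        hot.Subscribe(_ => { });
        hot.Subscribe(_ => { });

        // Nothing has flowed yet; Connect subscribes to the source once.
        hot.Connect();
        int hotStarts = starts;

        return new[] { coldStarts, hotStarts };
    }
}
```

With the cold source each observer triggers its own run of the stream, whereas after Publish and Connect the source runs once and both observers share it.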

Observing on the wrong thread

Windows Forms, WPF and Silverlight all have thread-affine operations, that is, operations that must be carried out on a specific thread. You will get an error if you try to do the following in Windows Forms.

var subscription = obsLongs.Subscribe(x => form.Text = "Event generated " + x);

The form will throw a 'cross thread operation not valid' exception because it is being accessed from the thread on which the Observable raises its notifications rather than from the UI thread. This problem used to apply to WPF and Silverlight projects but it seems to have been resolved in the latest version of Rx. However, there may still be occasions when it is desirable to define the thread that the Observers run on. This can be done by passing a scheduler into the ObserveOn extension method. Here is a solution for Windows Forms.

//The Control Scheduler is in System.Reactive.Windows.Forms.dll 
//The Nuget package id is Rx-WinForms
//It takes a Control as an argument
var subscription = obsLongs.ObserveOn(new ControlScheduler(form)).Subscribe(x => form.Text = "Event generated " + x);

With WPF and Silverlight you can use the static class, Scheduler. To marshal notifications onto the UI thread specifically, there is also the ObserveOnDispatcher extension method in System.Reactive.Windows.Threading.

 //schedule on the current thread
 observable.ObserveOn(Scheduler.CurrentThread).Subscribe(x => label1.Content = x.X.ToString());
 //schedule on the platform's default scheduler (typically the thread pool)
 observable.ObserveOn(Scheduler.Default).Subscribe(x => label1.Content = x.X.ToString());
 //schedule to execute immediately on the current thread
 observable.ObserveOn(Scheduler.Immediate).Subscribe(x => label1.Content = x.X.ToString());
       

The Observer Pattern with Buffering and Linq

The real power of Rx lies in its ability to filter data in order to control both what the observers see and when they see it. In the following example, the data stream represents the water temperature output from a temperature controller. The observers are notified if two consecutive readings are above 40°C. The data stream is split into overlapping segments using buffering. Linq is then used to process the segments before the data is passed to the observers.

  private static void DemoBuffered()
        {
            //Set up an observable of random numbers from 1 to 99 at a rate of 1 per second
            //(the upper bound passed to Random.Next is exclusive)
            IObservable<long> oneNumberPerSecondObservable = Observable.Interval(TimeSpan.FromSeconds(1));
            var random = new Random();
            IConnectableObservable<long> randomNumberObservableHot =
                oneNumberPerSecondObservable.Select(n => (long)random.Next(1, 100)).Publish();

            //Subscribe an observer to output the number generated to the Console
            IDisposable randomNumberSubscription =
                randomNumberObservableHot.Subscribe(x => Console.WriteLine("Generated " + x));

            //Set up a rolling buffer of size 2 that advances the stream index by 1 each time
            //So that a stream of 0,1,2,3 would result in buffers of {0,1} {1,2} etc
            var buffered = from firstLast in randomNumberObservableHot.Buffer(2, 1)
                //filter the buffers by using Linq
                where firstLast[0] > 40 && firstLast[1] > 40
                select new { First = firstLast[0], Last = firstLast[1] };

            //Introduce a time out period of 8 seconds
            //If nothing is observed for 8 seconds, the observers' OnError callback receives a TimeoutException
            var observableWithTimeout = buffered.Timeout(TimeSpan.FromSeconds(8));

            //Subscribe two observers
            IDisposable firstSubscription =
                observableWithTimeout.Subscribe(
                    anon => Console.WriteLine("First observed {0} and {1} ", anon.First, anon.Last),
                    ex => Console.WriteLine("First observed an Exception. " + ex.Message));
            IDisposable secondSubscription =
                observableWithTimeout.Subscribe(
                    anon => Console.WriteLine("Second observed {0} and {1} ", anon.First, anon.Last),
                    ex => Console.WriteLine("Second observed an Exception. " + ex.Message));

            //start the ball rolling
            randomNumberObservableHot.Connect();
            Console.WriteLine("Press Enter to unsubscribe the first observer");
            Console.ReadLine();

            //Unsubscribe an observer
            firstSubscription.Dispose();
            Console.WriteLine("***first subscriber has unsubscribed ***");
            Console.WriteLine("Press Enter to end");
            Console.ReadLine();
            secondSubscription.Dispose();
            randomNumberSubscription.Dispose();
            Console.ReadKey();
        }
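The rolling buffer can be checked on a fixed list of readings instead of random numbers. This is a minimal sketch, assuming the System.Reactive package is referenced; BufferSketch, PairsAbove and the sample readings are hypothetical. Unlike the interval stream above, a finite source completes, and the last buffer may then hold fewer than two items, so the filter checks the count before indexing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class BufferSketch
{
    // Returns the consecutive pairs in which both readings exceed the threshold.
    public static List<string> PairsAbove(IEnumerable<int> readings, int threshold)
    {
        var pairs = new List<string>();

        readings.ToObservable()
            // Rolling buffer of size 2 that advances by 1 reading each time.
            .Buffer(2, 1)
            // A finite source can end with a shorter buffer, so check the count first.
            .Where(b => b.Count == 2 && b[0] > threshold && b[1] > threshold)
            // The source completes on its own, so the subscription is not kept.
            .Subscribe(b => pairs.Add(b[0] + " and " + b[1]));

        return pairs;
    }
}
```

For the readings 50, 30, 45, 60, 70 and a threshold of 40, the pairs reported should be "45 and 60" and "60 and 70", since 50 is followed by a reading below the threshold.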

[Screenshot: Buffered output]

Conclusion

This brief article has only just scratched the surface of what can be achieved with Rx. If you wish to dig deeper, there are some excellent videos featuring two enthusiastic Rx developers, Wes Dyer and Bart De Smet.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

George Swan
Student, Wales

Last Updated 2 Jun 2014
Article Copyright 2014 by George Swan