IObserver and IObservable - A New addition to BCL






4.90/5 (3 votes)
Here in the post, I am going to discuss about IObserver and IObservable interfaces and its connection to Push based approach on Reactive Framework.
Introduction
With the introduction of new interfaces, it is time to get it on with discussion. With the current release of VS 2010, there were two interfaces that were introduced viz, IObservable
and IObserver
. Here in the post, I am going to discuss about these interfaces and its connection to Push based approach on Reactive Framework.
IObserver and IObservable as a Dual to Enumerables
First, it should be noted, IObserver
and IObservable
are actually the mathematical dual of IEnumerable
and IEnumerator
. Based on iterator pattern, IEnumerable
is actually a repository of elements that made up the objects. The IEnumerable
holds all the objects and it uses IEnumerator
to get each individual objects from the repository. The few methods in IEnumerator
which the IEnumerable
uses are MoveNext
and Current
. So for each iteration, the Enumerator
calls MoveNext
and assigns it to Current
which is later on sent back to the external environment.
So if you consider the interface IEnumerable
and IEnumerator
, it looks like:
public interface IEnumerator<out T> : IDisposable
{
T Current { get; }
bool MoveNext();
void Reset();
}
public interface IEnumerable<out T> : IEnumerable
{
IEnumerator<T> GetEnumerator();
}
So the IEnumerator
has MoveNext
which is called every time when we need to yield next element from the store. The MoveNext
sets the Current
item and sends it back to the Environment
. So IEnumerable
might be considered as a Pull based approach and it is used for sequential retrieval of objects.
IObservable
and IObserver
introduced to BCL recently as stated are the mathematical dual of IEnumerable
and IEnumerator
. Let's see the interfaces a bit:
public interface IObserver<in T>
{
void OnCompleted();
void OnError(Exception error);
void OnNext(T value);
}
and for IObservable
, it is:
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
Hence, if you see the difference between the two Interfaces, IEnumerator
has Current
and MoveNext
. These methods are used to Pull objects from the repository. IObserver
has OnNext
which is used to Push objects to the repository. Again, if you look into IEnumerable
, it uses GetEnumerator
to pull back the object of IEnumerable
, while IObservable
has a Subscribe
method which is used to push an Observer
to the Observable
. Hence you can easily say, Observable
interfaces in BCL are a dual to Enumerables
where the former uses Push based approach and the later uses pull based approach.
Where Are They Useful?
If you recollect the Observer design pattern, you should know it already. Observer pattern actually holds a list of dependent objects known as Observer and notifies when certain state of the Observer changes. We have already got an idea of ObservableCollection
which notifies the change to the collection to the external environment. IObserver
and IObservable
gives you a chance to enhance this flexibility more. Let's see a few lines of code:
public class CustomObserver
{
private ObservableCollection<int> myrepository;
public ObservableCollection<int> MyRepository
{
get
{
this.myrepository = this.myrepository ?? new ObservableCollection<int>();
return this.myrepository;
}
}
public void LoadRepository(int item)
{
this.MyRepository.Add(item);
}
private List<int> filteredcollection;
public List<int> FilteredCollection
{
get
{
this.filteredcollection = this.filteredcollection ?? new List<int>();
return filteredcollection;
}
}
public IDisposable GetObserved()
{
IObservable<IEvent<NotifyCollectionChangedEventArgs>> numberObserver =
Observable.FromEvent<NotifyCollectionChangedEventArgs>
(this.MyRepository, "CollectionChanged");
Action<IEvent<NotifyCollectionChangedEventArgs>> subscriptionAction = item =>
{
switch (item.EventArgs.Action)
{
case NotifyCollectionChangedAction.Add:
var newValues = item.EventArgs.NewItems.Cast<int>().Where
(n => n % 2 == 0);
this.FilteredCollection.AddRange(newValues);
break;
case NotifyCollectionChangedAction.Remove:
foreach (int n in item.EventArgs.OldItems)
this.FilteredCollection.Remove(n);
break;
case NotifyCollectionChangedAction.Replace:
foreach (int n in item.EventArgs.OldItems)
this.FilteredCollection.Remove(n);
goto case NotifyCollectionChangedAction.Add;
}
};
return numberObserver.Subscribe(subscriptionAction);
}
}
So in this class, I have implemented a simple collection of objects and registered the PropertyChanged
event of ObservableCollection
. Hence the Action
method SubscriptionAction
will be called automatically and reevaluate the list FilteredCollection
whenever the Observer MyRepository
gets an object.
So to demonstrate, let's look at the Main
method:
static void Main(string[] args)
{
CustomObserver myobj = new CustomObserver();
IDisposable unregisterobj = myobj.GetObserved();
bool DoContinue = false;
do
{
try
{
Console.Write("Enter a Value:");
int item = Convert.ToInt32(Console.ReadLine());
myobj.LoadRepository(item);
Console.WriteLine("Filtered List counter : {0}",
myobj.FilteredCollection.Count);
Console.WriteLine("Do you want to continue?[1/0]");
DoContinue = Convert.ToInt32(Console.ReadLine()) == 1;
}
catch { continue; }
}
while (DoContinue);
Console.WriteLine("Disposing Registration...");
unregisterobj.Dispose();
Console.ReadKey();
}
Being so straightforward, we create an object of out class CustomObserver
and called the method GetObserved
which in turn calls Subscribe
method from the Observer
and returns the Disposable
object. You must remember the Subscribe
method returns a IDisposable
which will be used to unsubscribe the method by just calling the Dispose
method of it.
Just as you might expect, if you run the application, the application allows you to enter numeric values each of which is observed and added to the FilteredCollection
. Hence when you enter Even values, the FilteredCollection
gets updated.
The FromEvent
is a method from Reactive framework that generates an Observable
from an event. In the call, the Observer
will observe the collection for the event CollectionChanged
. We will discuss more about the Reactive framework later.
I hope you like my demonstration. Feel free to write your comments and feedback. Thank you for reading.