Click here to Skip to main content
15,880,608 members
Articles / Programming Languages / C#

Event Aggregator with Specialized Listeners

Rate me:
Please Sign up or sign in to vote.
4.94/5 (13 votes)
17 Jan 2010CPOL6 min read 49.6K   971   39   9
This article attempts to show a variation of the Event Aggregator design pattern with specialized listeners.
event_aggregator.jpg

Introduction

The definition says that an Event Aggregator "Channels events from multiple objects into a single object to simplify registration for clients." Martin Fowler

In other words, the Event Aggregator pattern functions like a publish/subscribe hub, which takes care of subscriptions and event distribution in a loosely-coupled way.

This article attempts to show the Event Aggregator design pattern, and how I have used it in some of my personal projects.

The code I'm about to present here was very influenced by the Event Aggregator from StoryTeller, and a lot of code was actually copy/pasted borrowed from it.

Event Aggregator

My implementation of the Event Aggregator looks like this:

C#
public interface IEventAggregator
{
    void SendMessage<T>(T message) where T : IEvent;
    void SendMessage<T>() where T : IEvent, new();
    void AddListener(IListenTo listener);
    void RemoveListener(IListenTo listener);
}

It basically has methods to handle the subscription and removal of listeners, and methods to send messages.

The subscriptions are in fact handled by the IoC container (in this case I'm using StructureMap), and I'll explain how it's done a bit later.

Publishing Messages

In order to publish messages, classes will only need a reference to the Event Aggregator (IEventAggregator in this case). Notice that as the Event Aggregator is the one responsible for managing the subscriptions for the listeners, it is important that the same Event Aggregator object that holds the subscription information is the one who receives the messages. You certainly could play with different objects and scopes in here, but in my case I'm using EA as a singleton, and each object that needs a reference to that gets it by construtor injection using the IoC container to wire it up.

C#
public class Publisher
{
    private IEventAggregator _eventAggregator;
		
    public Publisher(IEventAggregator eventAggregator)
    {
        _eventAggregator = eventAggregator;
    }
    //...
}

Once an object holds a reference to the EA singleton, sending a message only needs a call to the method IEventAggregator.SendMessage():

C#
_eventAggregator.SendMessage(new SomeEvent { SomeProperty = "some value"} );

Or in the case your event is just a marker, meaning it contains no information, you could simply do this:

C#
_eventAggregator.SendMessage<SomeEvent>();

When called, the EA will figure who is interested in that message out, and call the appropriate methods on each object.

C#
public void SendMessage<T>(T message) where T : IEvent
{
	SendAction(() => All().CallOnEach<IListenTo<T>>(x => x.Handle(message)));
}

public void SendMessage<T>() where T : IEvent, new()
{
	SendMessage(new T());
}

The extension method CallOnEach<T>() will be responsible for filtering among all listeners, those that subscribed to the message type sent. (As I mentioned above, this article in heavily influenced by the StoryTeller implementation, and this extension method is better explained on here).

The method All() is simply a helper that wraps the subscription list locking it for thread-safety.

C#
private IEnumerable<IListenTo> All()
{
    lock (_locker)
    {
        return _listeners;
    }
}

The SendAction() method is also a wrapper, for calling the listeners within the synchonization context in hand (again I'll point you to the StoryTeller implementation where the thread marshalling is explained).

Listening to an Event

Objects interested in receiveng events, must implement the interface IListenTo<t>, being T the type of event they're interested on, and the method Handle(T message) will be called by the EA at the occurence of an event.

C#
public interface IListenTo
{
}

public interface IListenTo<T> : IListenTo where T : IEvent
{
    void Handle(T message);    
}

public class ConcreteListener : IListenTo<SomeEvent>
{
    void Handle(SomeEvent message)
    {
        //...
    }		
}

Once this interface is implemented, the only thing left to do is to subscribe on the EA for the event type of interest. As mentioned earlier, this is done by the IoC container, but could alternatively be done in the constructor of each object, by calling AddListener() on the Event Aggregator singleton:

C#
EventAggregator.AddListener(this);

Listeners Registration

Ok, now that I said that I handle the subscriptions via IoC container, let's see how it's done (for this I'm using StructureMap as my IoC container).

As the only thing I need to setup registration for the listeners is a call to AddListener() on the time of the listener's creation, and as I also create my objects via IoC container, it's convenient to tell the IoC to register them automatically for me. This was done creating a TypeInterceptor telling StructureMap, by convention, to call AddListener() on every object created that implements any IListenTo<> interface:

C#
public class EventAggregatorListenerInterceptor : TypeInterceptor
{
    public object Process(object target, IContext context)
    {
        context.GetInstance<IEventAggregator>().AddListener((IListenTo)target);
        return target;
    }

    public bool MatchesType(Type type)
    {
        return type.ImplementsInterfaceTemplate(typeof(IListenTo<>));
    }
}

And that's it. Now every listener will be automatically wired up to the Event Aggregator when created by the IoC container. Well, not really, we still need to configure the container to use our new interceptor, and to use EA as singleton, so let's do that:

C#
public class EventAggregatorRegistry : Registry
{
    public EventAggregatorRegistry()
    {
        For<IEventAggregator>().Singleton().Use<EventAggregator>();

        RegisterInterceptor(new EventAggregatorListenerInterceptor());
    }
}

This Registry class above, is responsible for configuring StructureMap via its internal DSL, and we will call it on the entry point of the application to actually configure the container.

Testing the Notifications

Now let's test our EA for message distribuition. This is a simple test asking if the EA could sucessfully deliver the event to all subscribers. Note that in this test I'm explicitly adding the listeners to the EA, once I do not create them via StructureMap, but from Rhino.Mocks MockRepository class.

C#
[Test]
public void all_listeners_must_be_notified_of_published_messages()
{
    ObjectFactory.Initialize(x => x.AddRegistry<EventAggregatorRegistry>());
    var _eventAggregator = ObjectFactory.GetInstance<IEventAggregator>();
		
    var listener1 = MockRepository.GenerateMock<IListenTo<SomeEvent>>();
    var listener2 = MockRepository.GenerateMock<IListenTo<SomeEvent>>();

    var someEvent = new SomeEvent();
    listener1.Expect(x => x.Handle(someEvent)).Repeat.Once();
    listener2.Expect(x => x.Handle(someEvent)).Repeat.Once();

    _eventAggregator.AddListener(listener1);
    _eventAggregator.AddListener(listener2);
    _eventAggregator.SendMessage(someEvent);

    listener1.VerifyAllExpectations();
    listener2.VerifyAllExpectations();
}

Ok, Now Let's Add to It!

Until now, we didn't see anything but a simplified version of the Event Aggregator used by StoryTeller. Now I needed to create some different kinds of listeners:

  • Default Listener: the listener we already had, listens to all messages of a specific type;
  • Filtered Listener: can define a filter to receive messages. Implements a function that takes a message of a specific type, and returns a bool indicating if the message should be processed;
  • Directed Message Listeners: listen to messages sent directly to them. The message sent to this listener must implement another interface, which has a Receiver property. That way, listeners will be notified only of messages sent to them.

The Filtered Listener I think is pretty obvious regarding its need, but maybe the Directed Message Listeners may be more obscure. Here is an example of where I needed it (you can skip that if you want):

I'm creating a network-on-chip simulator where various components collaborate with each other direclty. This communication could be done directly, once the nature of the communication is in fact direct between components, but doing it by the Event Aggregator that way, I could create some metrics evaluators that listen to all the messages, while the actual components only receive their direct messages.

Ok. Let's see how to do this. In order to still mantain a similar syntax, I preserve the IListenTo<T>, but transform that into a class, creating the interfaces to define what kind of listener I wanted, inside that class. It looks like this:

C#
public interface IListenTo
{
}
	
public class IListenTo<t> where T : IEvent
{
    public interface All : IListenTo
    {
        void Handle(T message);
    }

    public interface SentToMe : IListenTo 
    {
        void Handle(T message);
    }

    public interface ThatSatisfy : IListenTo
    {
        void Handle(T message);
        bool SatisfiedBy(T message);
    }
}

This way, to implement the IListenTo interfaces, it looks exactly like before, but then I type "." (dot) and intellisense shows me:

intellissense.jpg

That will do it for the implementation of the listeners, now we only need to refactor our Event Aggregator to properly distribute our messages. In order to do that, the only change has to be made on the SendMessage() method, it will look like this:

C#
public void SendMessage<T>(T message) where T : IEvent
{
    SendAction(() => All().CallOnEach<IListenTo<T>.All>(x =>
    {
        x.Handle(message);
    }));

    SendAction(() => All().CallOnEach<IListenTo<T>.ThatSatisfy>(x =>
    {
        if (x.SatisfiedBy(message))  
            x.Handle(message);
    }));

    var directedMessage = message as IDirectedEvent;
    if (directedMessage != null)
    {
        SendAction(() => All().FirstOrDefault(x => directedMessage.Receiver.Equals(x))
            .CallOn<IListenTo<T>.SentToMe>(x => x.Handle(message)));
    }
}

Note that to send direct messages, now there's another event interface IDirectedEvent:

C#
public interface IDirectedEvent : IEvent
{
    object Receiver { get; set; }
}

And then, our listeners look like this:

C#
public class DefaultListener : IListenTo<SomeEvent>.All
{
    public void Handle(SomeEvent message)
    {
    }
}

public class DirectedListener : IListenTo<SomeEvent>.SentToMe
{
    public void Handle(SomeEvent message)
    {
    }
}

public class FilteredListener : IListenTo<SomeEvent>.ThatSatisfy
{
    public void Handle(SomeEvent message)
    {
    }

    public bool SatisfiedBy(SomeEvent message)
    {
    }
}

Testing the Notifications

And now let's see how our tests look like:

C#
[Test]
public void 
    directed_message_listeners_must_be_notified_only_when_message_were_sent_to_them()
{
    var listener1 = MockRepository.GenerateMock<IListenTo<SomeDirectedEvent>.SentToMe>();
    var listener2 = MockRepository.GenerateMock<IListenTo<SomeDirectedEvent>.SentToMe>();
    var listener3 = MockRepository.GenerateMock<IListenTo<SomeDirectedEvent>.All>();

    var someEvent = new SomeDirectedEvent { Receiver = listener1 };
		
    listener1.Expect(x => x.Handle(someEvent)).Repeat.Once();
    listener2.Expect(x => x.Handle(someEvent)).Repeat.Never();
    listener3.Expect(x => x.Handle(someEvent)).Repeat.Once();

    _eventAggregator.AddListener(listener1);
    _eventAggregator.AddListener(listener2);
    _eventAggregator.AddListener(listener3);

    _eventAggregator.SendMessage(someEvent);

    listener1.VerifyAllExpectations();
    listener2.VerifyAllExpectations();
    listener3.VerifyAllExpectations();
}
	
[Test]
public void 
	filtered_listeners_must_be_notified_only_when_condition_were_satisfied()
{
    var listener1 = MockRepository.GenerateMock<
    	IListenTo<SomeEvent>.ThatSatisfy>();
    var listener2 = MockRepository.GenerateMock<
    	IListenTo<SomeEvent>.ThatSatisfy>();
    var listener3 = MockRepository.GenerateMock<
    	IListenTo<SomeEvent>.All>();

    var someEvent = new SomeEvent();

    listener1.Expect(x => x.Handle(someEvent)).Repeat.Once();
    listener1.Expect(x => x.SatisfiedBy(someEvent)).Return(true);

    listener2.Expect(x => x.Handle(someEvent)).Repeat.Never();
    listener2.Expect(x => x.SatisfiedBy(someEvent)).Return(false);

    listener3.Expect(x => x.Handle(someEvent)).Repeat.Once();

    _eventAggregator.AddListener(listener1);
    _eventAggregator.AddListener(listener2);
    _eventAggregator.AddListener(listener3);

    _eventAggregator.SendMessage(someEvent);

    listener1.VerifyAllExpectations();
    listener2.VerifyAllExpectations();
    listener3.VerifyAllExpectations();
}

Conclusions

Well, that's about it. We cover the creation of a basic Event Aggregator, and extend it to create filtered listeners, and to be able to send messages to directed listeners. I hope you've enjoyed reading. Please leave a note and/or a comment, I would be very glad to hear it. Thanks.

History

  • 01/17/2010: First version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer ThoughtWorks
Brazil Brazil
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions

 
QuestionHaving a hard time with form using AddListener Pin
Leach723825-May-21 10:54
Leach723825-May-21 10:54 
QuestionThread issue Pin
AlexZieg718-May-12 21:55
AlexZieg718-May-12 21:55 
Hi,

thanks for this approach! But I think there is a thread issue in the Aggregator:
private IEnumerable<IListenTo> All()
{
    lock (_locker)
    {
        return _listeners;
    }
}

will lock only the return. It cannot lock the operation you do outside the lock; the foreach loop is not thread safe! I guess we would have to lock the complete loop which will maybe slow down everything on extensive use. I am currently thinking of creating a lookup table for the IListenTo<T> to optimize access to specific types. It will be more complicated but solve the threading problem as well as the performance issue. What do ou think?
QuestionEA Request Analyser Pin
Comfizzy13-Mar-12 3:34
Comfizzy13-Mar-12 3:34 
GeneralNewbie issue Pin
Khaniya28-Jan-11 19:55
professionalKhaniya28-Jan-11 19:55 
GeneralJust a small remark Pin
philippe dykmans22-Nov-10 6:43
philippe dykmans22-Nov-10 6:43 
GeneralUsing IoC register Pin
dreamgarden10-Mar-10 1:59
dreamgarden10-Mar-10 1:59 
GeneralRe: Using IoC register Pin
Caio Kinzel Filho18-Mar-10 16:06
Caio Kinzel Filho18-Mar-10 16:06 
GeneralOpen Generics Registration Pin
cbone22-Jan-10 4:42
cbone22-Jan-10 4:42 
GeneralRe: Open Generics Registration [modified] Pin
Caio Kinzel Filho22-Jan-10 10:23
Caio Kinzel Filho22-Jan-10 10:23 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.