
EventRouter

Roman Ginzburg, 5 Oct 2010, CPOL
An EventRouter implementation with cross process event distribution.

Introduction

A couple of weeks ago, I attended a seminar on PRISM at Microsoft's office. Although I have not been working with GUI for more than two years now, I am always enthusiastic to hear about interesting architectural principles, and PRISM is indeed an interesting one. One of the key concepts of the PRISM architecture is the EventAggregator component whose responsibility is to enable different application modules to communicate without knowing each other in a publish/subscribe paradigm. It's not entirely a new concept as the earlier CAB (SCSF) also had a similar implementation, and there are also some other Open Source implementations of the EventAggregator pattern like EventBroker and MemBus.

So, you must be asking yourself: why do we need another implementation? The answer lies in the small details of the implementation and in one important feature: cross process event distribution.

The project I am involved in is a large command and control system which has a rich GUI smart client system running on four screens and divided into three processes. Therefore, in order to fully utilize the EventAggregator principle, I needed some kind of IPC (Inter Process Communication) enabled library for sending events between GUI instances on a single machine.

The other issue is that some of the existing implementations oblige you to inherit from a base class when you define your events, while others are limited to EventHandler<TEventArgs> event handlers or use an Observer pattern that requires subscriber objects to implement generic interfaces.

What I wanted was a simple POCO event definition which I can easily define, serialize when needed for cross process communication, and publish directly on the EventAggregator object.

So the requirements are as follows:

  1. Subscriptions may be strong or weak (using the WeakReference class).
  2. Events are POCO objects.
  3. Event handlers may be static or instance methods, public or non-public.
  4. Every event handler takes exactly one parameter.
  5. Event handlers may be invoked on the publisher thread, a worker thread, or the UI thread (synchronously or asynchronously).
  6. IPC mechanism must be one to many without any configuration.
  7. Event handlers may be invoked with the derived types of the events (i.e., polymorphic invocation).
  8. Subscribers may use attributes to mark event handlers, or subscribe directly to some delegate.

Implementation

With all these in mind, I produced the following interface:

/// <summary>
/// Interface for the EventRouter implementation.
/// </summary>
public interface IEventRouter
{
  /// <summary>
  /// Adds the subscriber's event handlers annotated with
  /// the EventConsumer attribute to the invocation list.
  /// </summary>
  /// <param name="subscriber"></param>
  /// <exception cref="InvalidOperationException" />
  void Register(object subscriber);

  /// <summary>
  /// Adds a single event handler with the EventConsumer attribute or with default options.
  /// </summary>
  /// <typeparam name="T"></typeparam>
  /// <param name="action"></param>
  void Register<T>(Action<T> action);

  /// <summary>
  /// Adds a single event handler with the specified configuration options.
  /// </summary>
  /// <typeparam name="T"></typeparam>
  /// <param name="action"></param>
  /// <param name="useStrongReference"></param>
  /// <param name="tOption"></param>
  /// <param name="id"></param>
  /// <exception cref="InvalidOperationException" />
  void Register<T>(Action<T> action, bool useStrongReference, 
                   ThreadingOption tOption, string id);

  /// <summary>
  /// Sends an event notification to all EventRouter instances on the current machine.
  /// </summary>
  /// <param name="notification"></param>
  void Publish(object notification);

  /// <summary>
  /// Sends an event notification on the current EventRouter only.
  /// </summary>
  /// <param name="notification"></param>
  void PublishLocal(object notification);

  /// <summary>
  /// Sends an event notification to a specific subscriber or group
  /// of subscribers on the current instance of EventRouter.
  /// </summary>
  /// <param name="notification"></param>
  /// <param name="subscriptionId"></param>
  void SendTo(object notification, string subscriptionId);

  /// <summary>
  /// Removes all event handlers associated with the given subscriber.
  /// </summary>
  /// <param name="subscriber"></param>
  void Unregister(object subscriber);

  /// <summary>
  /// Removes the event handlers with the specified id.
  /// </summary>
  /// <param name="subscriptionId"></param>
  void Unregister(string subscriptionId);

  /// <summary>
  /// Gets or sets the value indicating whether
  /// to trigger event handlers with derived event types.
  /// </summary>
  bool PolymorphicInvokationEnabled { get; set; }

  /// <summary>
  /// Logger used for exceptions thrown by event handlers.
  /// </summary>
  ILogger Logger { get; set; }
}

The EventRouter should be a singleton, since only one instance should exist per process. However, I did not implement it as one, because EventRouter will mostly be resolved through an IoC container (I am using Castle Windsor), where it can easily be configured with a singleton lifetime:

IWindsorContainer container = new WindsorContainer();

container.Register(Component.For<ILogger>().ImplementedBy<NLogger>());

container.Register(Component.For<IPCoordinator>().ImplementedBy<WindowsManager>());

container.Register(Component.For<IUISyncronizable>().
          ImplementedBy<WPFUISynchronizer>());

container.Register(Component.For<IEventRouter>().
          ImplementedBy<EventRouter>().ActAs(LifestyleType.Singleton));

IEventRouter eventRouter = container.Resolve<IEventRouter>();

IPC

The Windows platform offers a wide range of IPC technologies and APIs. Given the requirements of one to many distribution and no configuration, I decided to go with the Windows Messages API, in particular WM_COPYDATA and SendMessage. While browsing the web, I found a great project called XDMessaging which does almost exactly what I needed: it uses the SendMessageTimeout API to send the WM_COPYDATA message, and uses EnumWindows to find all the top-level windows registered to receive this message. The only mismatch is that it passes binary serialized strings across processes, whereas I needed to pass objects.

So I copy/pasted the core of this library's functionality and changed its payload to binary serialized objects. The implementation itself is pretty simple: each instance creates a top-level zero size window, and uses its window procedure to intercept WM_COPYDATA messages.

public class WindowsManager : NativeWindow, IPCoordinator
{
  const string ChannelName = "MainChannel";
  List<IntPtr> m_windowHandles = new List<IntPtr>();
  object m_lock = new object();

  public WindowsManager()
  {
     CreateParams p = new CreateParams();
     p.Width = 0;
     p.Height = 0;
     p.X = 0;
     p.Y = 0;
     p.Caption = Guid.NewGuid().ToString();
     p.Parent = IntPtr.Zero;
     base.CreateHandle(p);

     Native.SetProp(base.Handle, ChannelName, (int)base.Handle);
  }

  protected override void WndProc(ref Message m)
  {
     base.WndProc(ref m);

     if (m.Msg == Native.WM_COPYDATA)
     {
        using (DataGram dataGram = DataGram.FromPointer(m.LParam))
        {
           if (NotifyEvent != null)
           {
              NotifyEvent(dataGram.Data);
           }
        }
     }
  }

  public void BroadcastMessage(object data)
  {
     IntPtr outPtr = IntPtr.Zero;

     lock (m_lock)
     {
        IEnumerable<IntPtr> windows = GetWindowHandles();

        using (DataGram dg = new DataGram(data))
        {
           Native.COPYDATASTRUCT dataStruct = dg.ToStruct();
           foreach (var handle in windows)
           {
              Native.SendMessageTimeout(handle, Native.WM_COPYDATA, 
                 IntPtr.Zero, ref dataStruct, 
                 Native.SendMessageTimeoutFlags.SMTO_ABORTIFHUNG, 
                 1000, out outPtr);
           }
        } 
     }
  }

  public IEnumerable<IntPtr> GetWindowHandles()
  {
     m_windowHandles.Clear();
     Native.EnumWindows(OnWindowEnum, IntPtr.Zero);
     return m_windowHandles;
  }
  
  private int OnWindowEnum(IntPtr hWnd, IntPtr lParam)
  {
     if (hWnd != base.Handle && Native.GetProp(hWnd, ChannelName) != 0)
     {
        m_windowHandles.Add(hWnd);
     }

     return 1;
  }

  #region IPCoordinator Members

  public void Broadcast(object notification)
  {
      ThreadPool.QueueUserWorkItem(state => BroadcastMessage(notification));
  }

  public event NotificationEventHandler NotifyEvent;

  #endregion
}

The message broadcasting is executed on a thread pool thread because the SendMessageTimeout call uses a timeout of 1 second for each window, so in the (rare) worst case a broadcast may block for N seconds, where N is the number of other EventRouter instances.

You can apply any other IPC API by implementing the IPCoordinator interface and passing it to the EventRouter constructor:

/// <summary>
/// Interface used for Inter Process Communication.
/// </summary>
public interface IPCoordinator
{
  /// <summary>
  /// Broadcasts an event to the other EventRouter instances.
  /// </summary>
  /// <param name="notification"></param>
  void Broadcast(object notification);

  /// <summary>
  /// Fired when an event arrives from another EventRouter instance.
  /// </summary>
  event NotificationEventHandler NotifyEvent;
}
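
As a rough illustration of plugging in a different transport, here is a minimal sketch of an in-process "loopback" coordinator that could stand in for WindowsManager in unit tests. LoopbackHub and LoopbackCoordinator are hypothetical names, and the NotificationEventHandler signature is an assumption inferred from the NotifyEvent(dataGram.Data) call above; the article does not show its declaration. Like WindowsManager, it does not deliver a broadcast back to the sender, but unlike it, delivery here is synchronous.

```csharp
using System;
using System.Collections.Generic;

// Assumed delegate shape; not shown in the article.
public delegate void NotificationEventHandler(object data);

public interface IPCoordinator
{
    void Broadcast(object notification);
    event NotificationEventHandler NotifyEvent;
}

// Hypothetical hub that connects coordinators living in the same process.
public sealed class LoopbackHub
{
    private readonly List<LoopbackCoordinator> _peers = new List<LoopbackCoordinator>();
    private readonly object _lock = new object();

    public LoopbackCoordinator CreateCoordinator()
    {
        var coordinator = new LoopbackCoordinator(this);
        lock (_lock) { _peers.Add(coordinator); }
        return coordinator;
    }

    internal void Deliver(LoopbackCoordinator sender, object notification)
    {
        LoopbackCoordinator[] targets;
        lock (_lock) { targets = _peers.ToArray(); }

        foreach (LoopbackCoordinator peer in targets)
        {
            // Skip the sender, mirroring the hWnd != base.Handle check.
            if (!ReferenceEquals(peer, sender))
            {
                peer.Receive(notification);
            }
        }
    }
}

public sealed class LoopbackCoordinator : IPCoordinator
{
    private readonly LoopbackHub _hub;

    internal LoopbackCoordinator(LoopbackHub hub) { _hub = hub; }

    public event NotificationEventHandler NotifyEvent;

    public void Broadcast(object notification)
    {
        _hub.Deliver(this, notification);
    }

    internal void Receive(object notification)
    {
        // Copy to a local so a concurrent unsubscribe cannot null the delegate.
        NotificationEventHandler handler = NotifyEvent;
        if (handler != null) { handler(notification); }
    }
}
```

Two coordinators created from the same hub behave like two EventRouter processes: a Broadcast on one raises NotifyEvent on the other.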

Thread synchronization

Since most event consumer handlers would be inside some UI resource, it is necessary to synchronize thread execution in order to avoid the nasty Cross-Thread exception. The current EventRouter implementation contains thread synchronization for both Windows Forms applications by using System.Threading.SynchronizationContext, and WPF applications by using System.Windows.Threading.Dispatcher.

I prefer using the Dispatcher object since one is always available, whereas the SynchronizationContext.Current property can return null. The only limitations are that it requires .NET 3.0 or higher and that the synchronizer must be constructed on the UI thread.

You can implement your own UI sync by implementing the IUISyncronizable interface:

/// <summary>
/// Interface used for UI thread synchronization.
/// </summary>
public interface IUISyncronizable
{
  /// <summary>
  /// Invokes the method on the UI thread synchronously.
  /// </summary>
  /// <param name="method"></param>
  /// <param name="args"></param>
  void SyncInvoke(Delegate method, params object[] args);

  /// <summary>
  /// Invokes the method on the UI thread asynchronously.
  /// </summary>
  /// <param name="method"></param>
  /// <param name="args"></param>
  void AsyncInvoke(Delegate method, params object[] args);
}
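
For illustration, a custom synchronizer built on SynchronizationContext might look roughly like the sketch below. SyncContextSynchronizer is a hypothetical name and this is not the implementation shipped with the article; it only shows how Send and Post map onto the two interface methods, and how the null case of SynchronizationContext.Current can be handled with a fallback.

```csharp
using System;
using System.Threading;

public interface IUISyncronizable
{
    void SyncInvoke(Delegate method, params object[] args);
    void AsyncInvoke(Delegate method, params object[] args);
}

// Hypothetical synchronizer; construct it on the UI thread so that the
// UI thread's context is the one that gets captured.
public sealed class SyncContextSynchronizer : IUISyncronizable
{
    private readonly SynchronizationContext _context;

    public SyncContextSynchronizer()
    {
        // Current is null on threads that have no context installed.
        // The default context runs Send inline and Post on the thread pool.
        _context = SynchronizationContext.Current ?? new SynchronizationContext();
    }

    public void SyncInvoke(Delegate method, params object[] args)
    {
        // Send blocks the caller until the delegate has run.
        _context.Send(state => method.DynamicInvoke(args), null);
    }

    public void AsyncInvoke(Delegate method, params object[] args)
    {
        // Post queues the delegate and returns immediately.
        _context.Post(state => method.DynamicInvoke(args), null);
    }
}
```

DynamicInvoke is slow compared to a typed delegate call, which is acceptable for a sketch but worth measuring if handlers fire at a high rate.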

Exception handling

Exceptions thrown by event handlers do not stop the event propagation. If the IEventRouter.Logger property is set, they are logged according to the application configuration.
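
The behaviour described here can be sketched as a loop that isolates each handler in its own try/catch. SafeInvoker is a hypothetical helper, and an Action<Exception> callback stands in for the ILogger interface, whose members are not shown in this article.

```csharp
using System;
using System.Collections.Generic;

public static class SafeInvoker
{
    // Invokes every handler in order. A handler that throws is reported
    // through the optional log callback and does not stop the remaining ones.
    // Returns the number of handlers that failed.
    public static int InvokeAll(
        IEnumerable<Action<object>> handlers,
        object notification,
        Action<Exception> log)
    {
        int failures = 0;

        foreach (Action<object> handler in handlers)
        {
            try
            {
                handler(notification);
            }
            catch (Exception ex)
            {
                failures++;
                if (log != null) { log(ex); }
            }
        }

        return failures;
    }
}
```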

Using the code

Subscribing for event notifications

You can subscribe to events using the EventConsumer attribute:

[EventConsumer(ThreadingOption = ThreadingOption.UIAsync)]
private void OnNewOrder(OrderAdded notif)
{
    // do something...
}

and then call the IEventRouter.Register(object subscriber) method.

If you want to explicitly subscribe to some notification, use one of the IEventRouter.Register(Action<T> action) overloads:

router.Register<Notification>(delegate(Notification n)
{
  // do something...
}
, true, ThreadingOption.DefaultThread, null);

Publishing events

router.Publish(new OrderAdded());

Sending events to specific subscriber/s

router.SendTo(new Notification(), "MySpecialOne");

Using polymorphic invocation

When the PolymorphicInvokationEnabled property is enabled (it's disabled by default), your event handlers will also be invoked for types derived from the event type they declare:

public class CustomerRemoved
{
  public int CustomerID;
}

public class VipCustomerRemoved : CustomerRemoved
{
    ...
}

[EventConsumer]
private void OnCustomerChange(CustomerRemoved notif)
{
    // do something...
}

In this case, the OnCustomerChange method will be called when both CustomerRemoved and VipCustomerRemoved events are published.
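
One plausible way to decide whether a handler should receive a published event is a type check along the lines below. HandlerMatcher is a hypothetical helper, not code from the EventRouter source; it only illustrates the rule using Type.IsAssignableFrom.

```csharp
using System;

public class CustomerRemoved
{
    public int CustomerID;
}

public class VipCustomerRemoved : CustomerRemoved
{
}

public static class HandlerMatcher
{
    // handlerParameterType: the type of the handler's single parameter.
    // publishedType: the runtime type of the event being published.
    public static bool Matches(Type handlerParameterType, Type publishedType, bool polymorphic)
    {
        if (handlerParameterType == publishedType)
        {
            return true; // exact match always qualifies
        }

        // With polymorphic invocation, a handler declared for a base type
        // also receives events of derived types, but not the other way round.
        return polymorphic && handlerParameterType.IsAssignableFrom(publishedType);
    }
}
```

With polymorphic set to true, a handler declared for CustomerRemoved matches a published VipCustomerRemoved; a handler declared for VipCustomerRemoved never matches a plain CustomerRemoved.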

Removing subscriptions

By default, all subscriptions are held through weak references, so once the subscriber object is no longer referenced elsewhere, it can be garbage collected. Nevertheless, you can explicitly unregister the subscriber object either by its reference or by its subscription ID, if one was given. Please note that a subscription ID may identify a single subscription or a group of subscriptions; it is not a unique key.
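
To make the weak subscription idea concrete, here is a minimal sketch that holds the subscriber through a WeakReference and its handler as a MethodInfo. WeakSubscription and OrderLog are hypothetical names; the actual SubscriptionInfo class in the source may be organized differently, and static handlers (which have no target) are left out for brevity.

```csharp
using System;
using System.Reflection;

public sealed class WeakSubscription
{
    private readonly WeakReference _target;
    private readonly MethodInfo _method;

    public WeakSubscription(object target, MethodInfo method)
    {
        _target = new WeakReference(target);
        _method = method;
    }

    public bool IsAlive
    {
        get { return _target.IsAlive; }
    }

    // Returns false when the subscriber has been collected, which tells
    // the caller that this subscription can be pruned.
    public bool TryInvoke(object notification)
    {
        // Take a strong reference for the duration of the call.
        object target = _target.Target;
        if (target == null)
        {
            return false;
        }

        _method.Invoke(target, new[] { notification });
        return true;
    }
}

// Sample subscriber with a non-public handler, used only for illustration.
public class OrderLog
{
    public int Count;

    private void OnOrder(string text)
    {
        Count++;
    }
}
```

Holding the MethodInfo strongly is harmless, because it does not keep the subscriber instance alive; only the target needs to be weak.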

Some implementation details

The registration of a subscriber object is based on reflecting over its methods and finding those marked with the EventConsumer attribute:

public void Register(object subscriber)
{
    Type sType = subscriber.GetType();
    MethodInfo[] methods = sType.GetMethods(BindingFlags.Public | 
       BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Static);

    foreach (var method in methods)
    {
        EventConsumer attr = 
          (EventConsumer)Attribute.GetCustomAttribute(method, typeof(EventConsumer));
        if (attr == null)
        {
            continue;
        }

        ParameterInfo[] pInfo = method.GetParameters();
        if (pInfo.Length != 1)
        {
           throw new InvalidOperationException("Method must have a single parameter");
        }

        Type notifyType = pInfo[0].ParameterType;
        SubscriptionInfo info = new SubscriptionInfo();
        info.ID = attr.SubscriptionID;
        info.Method = method;
        info.Target = subscriber;
        info.ThreadOption = attr.ThreadingOption;
        info.UseStrongReference = attr.UseStrongReference;

        AddSubscription(notifyType, info);
    }
}

The subscriptions are stored in a MultiMap object which is actually a dictionary with an event type as a key and a list of subscriptions as a value. MultiMap is a small extension to a HashList class from the excellent NGenerics project.

All the actions performed on the MultiMap instance (subscriptions, invocations, and un-subscriptions) are inside a lock statement, so the EventRouter implementation is thread safe.  
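
The storage described above can be approximated with a plain Dictionary guarded by a lock. SubscriptionMap is a hypothetical stand-in for the NGenerics-based MultiMap, written only to show the shape of the data structure. Note one deliberate difference: this sketch hands out a snapshot so that handlers can run outside the lock, whereas the article performs invocations inside it.

```csharp
using System;
using System.Collections.Generic;

public sealed class SubscriptionMap<TSubscription>
{
    private readonly Dictionary<Type, List<TSubscription>> _map =
        new Dictionary<Type, List<TSubscription>>();
    private readonly object _lock = new object();

    public void Add(Type eventType, TSubscription subscription)
    {
        lock (_lock)
        {
            List<TSubscription> list;
            if (!_map.TryGetValue(eventType, out list))
            {
                list = new List<TSubscription>();
                _map[eventType] = list;
            }
            list.Add(subscription);
        }
    }

    public bool Remove(Type eventType, TSubscription subscription)
    {
        lock (_lock)
        {
            List<TSubscription> list;
            return _map.TryGetValue(eventType, out list) && list.Remove(subscription);
        }
    }

    // Returns a copy, so callers can iterate without holding the lock.
    public TSubscription[] Snapshot(Type eventType)
    {
        lock (_lock)
        {
            List<TSubscription> list;
            return _map.TryGetValue(eventType, out list)
                ? list.ToArray()
                : new TSubscription[0];
        }
    }
}
```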

Notes

When using IEventRouter.Register(Action<T> action) with an anonymous method, the handler may be reclaimed right after the first method invocation since there are no strong references to it. So to be on the safe side, either use a strong reference for anonymous methods or don't use them at all.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Roman Ginzburg
Software Developer (Senior)
Israel

Article Copyright 2010 by Roman Ginzburg