EventRouter

5 Oct 2010 · CPOL
An EventRouter implementation with cross process event distribution.

Introduction

A couple of weeks ago, I attended a seminar on PRISM at Microsoft's office. Although I have not worked on GUIs for more than two years now, I am always enthusiastic to hear about interesting architectural principles, and PRISM is indeed an interesting one. One of the key concepts of the PRISM architecture is the EventAggregator component, whose responsibility is to let different application modules communicate in a publish/subscribe paradigm without knowing about each other. It is not an entirely new concept: the earlier CAB (SCSF) had a similar implementation, and there are other open source implementations of the EventAggregator pattern, such as EventBroker and MemBus.

So, you must be asking yourself: why do we need another implementation? Well, the answer lies in the little details of the implementation and in one important feature: cross process event distribution.

The project I am involved in is a large command and control system which has a rich GUI smart client system running on four screens and divided into three processes. Therefore, in order to fully utilize the EventAggregator principle, I needed some kind of IPC (Inter Process Communication) enabled library for sending events between GUI instances on a single machine.

The other thing is that some of the existing implementations oblige you to inherit from a base class when you define your events, while others are limited to EventHandler<TEventArgs> event handlers or use an Observer pattern that requires implementing generic interfaces on subscriber objects.

What I wanted was a simple POCO event definition that I can easily define, serialize when needed for cross process communication, and publish directly on the EventAggregator object.

So the requirements are as follows:

  1. Subscription may be strong or weak (using the WeakReference class).
  2. Events are POCO objects.
  3. Event handlers may be static, public, or nonpublic.
  4. Every event handler takes exactly one parameter.
  5. Event handlers may be invoked on the publisher thread, a worker thread, or the UI thread (synchronously or asynchronously).
  6. IPC mechanism must be one to many without any configuration.
  7. Event handlers may be invoked with the derived types of the events (i.e., polymorphic invocation).
  8. Subscribers may use attributes to mark event handlers, or subscribe directly to some delegate.

Implementation

With all these in mind, I produced the following interface:

C#
/// <summary>
/// Interface for the EventRouter implementation.
/// </summary>
public interface IEventRouter
{
  /// <summary>
  /// Adds subscriber's event handlers annotated with
  /// the EventConsumer attribute to the invocation list.
  /// </summary>
  /// <param name="subscriber"></param>
  /// <exception cref="InvalidOperationException" />
  void Register(object subscriber);

  /// <summary>
  /// Adds single event handler with EventConsumer attribute or with default options.
  /// </summary>
  /// <typeparam name="T"></typeparam>
  /// <param name="action"></param>
  void Register<T>(Action<T> action);

  /// <summary>
  /// Adds single event handler with the specified configuration options.
  /// </summary>
  /// <typeparam name="T"></typeparam>
  /// <param name="action"></param>
  /// <param name="useStrongReference"></param>
  /// <param name="tOption"></param>
  /// <param name="id"></param>
  /// <exception cref="InvalidOperationException" />
  void Register<T>(Action<T> action, bool useStrongReference, 
                   ThreadingOption tOption, string id);

  /// <summary>
  /// Sends event notification to all EventRouter instances on current machine.
  /// </summary>
  /// <param name="notification"></param>
  void Publish(object notification);

  /// <summary>
  /// Sends event notification on the current EventRouter
  /// </summary>
  /// <param name="notification"></param>
  void PublishLocal(object notification);

  /// <summary>
  /// Sends event notification to specific subscriber or group
  /// of subscribers on the current instance of EventRouter
  /// </summary>
  /// <param name="notification"></param>
  /// <param name="subscriptionId"></param>
  void SendTo(object notification, string subscriptionId);

  /// <summary>
  /// Removes all event handlers associated with current subscriber.
  /// </summary>
  /// <param name="subscriber"></param>
  void Unregister(object subscriber);

  /// <summary>
  /// Removes event handler with specified id.
  /// </summary>
  /// <param name="subscriptionId"></param>
  void Unregister(string subscriptionId);

  /// <summary>
  /// Gets or sets the value indicating whether
  /// to trigger event handlers with derived event types.
  /// </summary>
  bool PolymorphicInvokationEnabled { get; set; }

  /// <summary>
  /// Logging used for exceptions thrown by event handlers
  /// </summary>
  ILogger Logger { get; set; }
}

The EventRouter should be a singleton, since you should use only one instance throughout your process space. However, I didn't implement it as a singleton because most EventRouter usage will go through some IoC container (I am using Castle Windsor), where it can easily be configured with a singleton lifetime:

C#
IWindsorContainer container = new WindsorContainer();

container.Register(Component.For<ILogger>().ImplementedBy<NLogger>());

container.Register(Component.For<IPCoordinator>().ImplementedBy<WindowsManager>());

container.Register(Component.For<IUISyncronizable>().
          ImplementedBy<WPFUISynchronizer>());

container.Register(Component.For<IEventRouter>().
          ImplementedBy<EventRouter>().ActAs(LifestyleType.Singleton));

IEventRouter eventRouter = container.Resolve<IEventRouter>();

IPC

The Windows platform offers a wide range of IPC technologies and APIs. Keeping in mind the requirements of one to many distribution and no configuration, I decided to go with the Windows Messages API, in particular WM_COPYDATA and SendMessage. While browsing the web, I found a great project called XDMessaging which does almost exactly what I needed: it uses the SendMessageTimeout API to send the WM_COPYDATA message, and uses EnumWindows to find all the top-level windows registered to receive this message. The only mismatch is that it passes binary serialized strings across processes.

So I copy/pasted the core functionality of this library and changed its payload from strings to binary serialized objects. The implementation itself is pretty simple: each instance creates a top-level zero size window and uses its window procedure to intercept WM_COPYDATA messages.

C#
public class WindowsManager : NativeWindow, IPCoordinator
{
  const string ChannelName = "MainChannel";
  List<IntPtr> m_windowHandles = new List<IntPtr>();
  object m_lock = new object();

  public WindowsManager()
  {
     CreateParams p = new CreateParams();
     p.Width = 0;
     p.Height = 0;
     p.X = 0;
     p.Y = 0;
     p.Caption = Guid.NewGuid().ToString();
     p.Parent = IntPtr.Zero;
     base.CreateHandle(p);

     Native.SetProp(base.Handle, ChannelName, (int)base.Handle);
  }

  protected override void WndProc(ref Message m)
  {
     base.WndProc(ref m);

     if (m.Msg == Native.WM_COPYDATA)
     {
        using (DataGram dataGram = DataGram.FromPointer(m.LParam))
        {
           if (NotifyEvent != null)
           {
              NotifyEvent(dataGram.Data);
           }
        }
     }
  }

  public void BroadcastMessage(object data)
  {
     IntPtr outPtr = IntPtr.Zero;

     lock (m_lock)
     {
        IEnumerable<IntPtr> windows = GetWindowHandles();

        using (DataGram dg = new DataGram(data))
        {
           Native.COPYDATASTRUCT dataStruct = dg.ToStruct();
           foreach (var handle in windows)
           {
              Native.SendMessageTimeout(handle, Native.WM_COPYDATA, 
                 IntPtr.Zero, ref dataStruct, 
                 Native.SendMessageTimeoutFlags.SMTO_ABORTIFHUNG, 
                 1000, out outPtr);
           }
        } 
     }
  }

  public IEnumerable<IntPtr> GetWindowHandles()
  {
     m_windowHandles.Clear();
     Native.EnumWindows(OnWindowEnum, IntPtr.Zero);
     return m_windowHandles;
  }
  
  private int OnWindowEnum(IntPtr hWnd, IntPtr lParam)
  {
     if (hWnd != base.Handle && Native.GetProp(hWnd, ChannelName) != 0)
     {
        m_windowHandles.Add(hWnd);
     }

     return 1;
  }

  #region IPCoordinator Members

  public void Broadcast(object notifictaion)
  {
      ThreadPool.QueueUserWorkItem(state => BroadcastMessage(notifictaion));
  }

  public event NotificationEventHandler NotifyEvent;

  #endregion
}

The message broadcasting is executed on a thread pool thread because the SendMessageTimeout API is called with a timeout of 1 second for each window, so in the (rare) worst case it may block for up to N seconds, where N is the number of other EventRouter instances.

You can apply any other IPC API by implementing the IPCoordinator interface and passing it to the EventRouter constructor:

C#
/// <summary>
/// Interface used for Inter Process Communication 
/// </summary>
public interface IPCoordinator
{
  /// <summary>
  /// Broadcasts event among other EventRouter instances
  /// </summary>
  /// <param name="notifictaion"></param>
  void Broadcast(object notifictaion);

  /// <summary>
  /// Fired when event arrives from another EventRouter instance
  /// </summary>
  event NotificationEventHandler NotifyEvent;
}
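
To illustrate how small a custom transport can be, here is a minimal sketch of an in-memory IPCoordinator that could stand in for WindowsManager in unit tests. LoopbackBus and LoopbackCoordinator are hypothetical names that are not part of the article's code, and the NotificationEventHandler signature (a single object parameter, void return) is an assumption inferred from the NotifyEvent(dataGram.Data) call above.

```csharp
using System;
using System.Collections.Generic;

// Assumed shape of the delegate; the article does not show its declaration.
public delegate void NotificationEventHandler(object notification);

// Copied from the article so the sketch compiles on its own.
public interface IPCoordinator
{
    void Broadcast(object notifictaion);
    event NotificationEventHandler NotifyEvent;
}

// Hypothetical in-memory "bus" standing in for the OS-level transport.
public sealed class LoopbackBus
{
    private readonly List<LoopbackCoordinator> m_peers = new List<LoopbackCoordinator>();
    private readonly object m_lock = new object();

    public LoopbackCoordinator CreateCoordinator()
    {
        var peer = new LoopbackCoordinator(this);
        lock (m_lock) { m_peers.Add(peer); }
        return peer;
    }

    internal void Deliver(LoopbackCoordinator sender, object notification)
    {
        LoopbackCoordinator[] snapshot;
        lock (m_lock) { snapshot = m_peers.ToArray(); }
        foreach (var peer in snapshot)
        {
            // Like OnWindowEnum in WindowsManager, skip the sender itself.
            if (!ReferenceEquals(peer, sender)) peer.Receive(notification);
        }
    }
}

public sealed class LoopbackCoordinator : IPCoordinator
{
    private readonly LoopbackBus m_bus;

    internal LoopbackCoordinator(LoopbackBus bus) { m_bus = bus; }

    public event NotificationEventHandler NotifyEvent;

    public void Broadcast(object notifictaion)
    {
        m_bus.Deliver(this, notifictaion);
    }

    internal void Receive(object notification)
    {
        var handler = NotifyEvent; // copy to avoid a race with unsubscription
        if (handler != null) handler(notification);
    }
}
```

Unlike WindowsManager, this sketch delivers synchronously on the caller's thread and performs no serialization, so it only approximates the one to many behavior within a single process.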

Thread synchronization

Since most event consumer handlers would be inside some UI resource, it is necessary to synchronize thread execution in order to avoid the nasty Cross-Thread exception. The current EventRouter implementation contains thread synchronization for both Windows Forms applications by using System.Threading.SynchronizationContext, and WPF applications by using System.Windows.Threading.Dispatcher.

I prefer using the Dispatcher object since it is always available, whereas the SynchronizationContext.Current property can return null. The only limitations are that it requires .NET 3.0 or higher and that it must be created on the UI thread.

You can implement your own UI sync by implementing the IUISyncronizable interface:

C#
/// <summary>
/// Interface used for UI thread synchronization.
/// </summary>
public interface IUISyncronizable
{
  /// <summary>
  /// Invokes method on the UI thread synchronously.
  /// </summary>
  /// <param name="method"></param>
  /// <param name="args"></param>
  void SyncInvoke(Delegate method, params object[] args);

  /// <summary>
  /// Invokes method on the UI thread asynchronously.
  /// </summary>
  /// <param name="method"></param>
  /// <param name="args"></param>
  void AsyncInvoke(Delegate method, params object[] args);
}
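
As a rough illustration of such a custom synchronizer, the sketch below maps the two interface methods onto SynchronizationContext.Send and SynchronizationContext.Post. ContextSynchronizer is a hypothetical class, not one of the synchronizers shipped with the article, and it assumes the caller passes in a context captured on the UI thread.

```csharp
using System;
using System.Threading;

// Copied from the article so the sketch compiles on its own.
public interface IUISyncronizable
{
    void SyncInvoke(Delegate method, params object[] args);
    void AsyncInvoke(Delegate method, params object[] args);
}

// Hypothetical synchronizer; not the class shipped with the article.
public sealed class ContextSynchronizer : IUISyncronizable
{
    private readonly SynchronizationContext m_context;

    // Pass the context captured on the UI thread,
    // for example SynchronizationContext.Current.
    public ContextSynchronizer(SynchronizationContext context)
    {
        if (context == null) throw new ArgumentNullException("context");
        m_context = context;
    }

    public void SyncInvoke(Delegate method, params object[] args)
    {
        // Send blocks the caller until the delegate has run on the target context.
        m_context.Send(state => method.DynamicInvoke(args), null);
    }

    public void AsyncInvoke(Delegate method, params object[] args)
    {
        // Post queues the delegate and returns immediately.
        m_context.Post(state => method.DynamicInvoke(args), null);
    }
}
```

Rejecting a null context in the constructor surfaces the "Current is null" problem at construction time rather than on the first published event.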

Exception handling

Exceptions thrown by event handlers do not stop event propagation. If the IEventRouter.Logger property is initialized, they are logged according to the application configuration.
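
The behavior described above can be sketched as a dispatch loop that wraps each handler call in its own try/catch. SafeDispatcher is a hypothetical helper, and the log callback merely stands in for the article's ILogger, whose members are not shown; the actual EventRouter code may be organized differently.

```csharp
using System;
using System.Collections.Generic;

public static class SafeDispatcher
{
    // Invokes every handler; a failing handler is reported and skipped.
    // Returns the number of handlers that completed without throwing.
    public static int InvokeAll<T>(
        IEnumerable<Action<T>> handlers, T notification, Action<Exception> log)
    {
        int succeeded = 0;
        foreach (var handler in handlers)
        {
            try
            {
                handler(notification);
                succeeded++;
            }
            catch (Exception ex)
            {
                // Logging is optional, mirroring an uninitialized Logger property.
                if (log != null) log(ex);
            }
        }
        return succeeded;
    }
}
```

Because the try/catch sits inside the loop, one faulty subscriber cannot prevent later subscribers from receiving the event.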

Using the code

Subscribing for event notifications

You can subscribe to events using the EventConsumer attribute:

C#
[EventConsumer(ThreadingOption = ThreadingOption.UIAsync)]
private void OnNewOrder(OrderAdded notif)
{
    // do something...
}

and then call the IEventRouter.Register(object subscriber) method.

If you want to explicitly subscribe to some notification, use one of the IEventRouter.Register(Action<T> action) overloads:

C#
router.Register<Notification>(delegate(Notification n)
{
  // do something...
}, true, ThreadingOption.DefaultThread, null);

Publishing events

C#
router.Publish(new OrderAdded());

Sending events to specific subscribers

C#
router.SendTo(new Notification(), "MySpecialOne");

Using polymorphic invocation

When the PolymorphicInvokationEnabled property is set to true (it's disabled by default), your event handlers will also be invoked for types derived from the event type they declare:

C#
public class CustomerRemoved
{
  public int CustomerID;
}

public class VipCustomerRemoved : CustomerRemoved
{
    ...
}

[EventConsumer]
private void OnCustomerChange(CustomerRemoved notif)
{
    // do something...
}

In this case, the OnCustomerChange method will be called when both CustomerRemoved and VipCustomerRemoved events are published.
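
One plausible way to decide which handlers fire is to compare the handler's parameter type with the published event's runtime type using Type.IsAssignableFrom. The sketch below is an assumption about the matching rule rather than the article's actual code; HandlerMatcher is a hypothetical helper, and whether the real EventRouter also matches on interfaces is not stated.

```csharp
using System;
using System.Collections.Generic;

public static class HandlerMatcher
{
    // Returns the subscribed types whose handlers should fire for eventType.
    public static List<Type> Match(
        IEnumerable<Type> subscribedTypes, Type eventType, bool polymorphic)
    {
        var result = new List<Type>();
        foreach (var subscribed in subscribedTypes)
        {
            bool hit = polymorphic
                ? subscribed.IsAssignableFrom(eventType) // base types match too
                : subscribed == eventType;               // exact type only
            if (hit) result.Add(subscribed);
        }
        return result;
    }
}

// Event types named as in the article; bodies are reduced for the sketch.
public class CustomerRemoved { public int CustomerID; }
public class VipCustomerRemoved : CustomerRemoved { }
```

With polymorphic set to false, a handler declared for CustomerRemoved is skipped when a VipCustomerRemoved event is published, which corresponds to the default setting described above.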

Removing subscriptions

By default, all subscriptions are held through weak references, so a subscriber object that is no longer referenced anywhere else can be reclaimed by the garbage collector. Nevertheless, you can explicitly unregister the subscriber object either by its reference or by its subscription ID, if one was specified. Please note that a subscription ID identifies a single subscription or a group of subscriptions; it is not a unique key.
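
To make the weak reference idea concrete, here is a minimal sketch of a subscription entry that holds its target through WeakReference. WeakSubscription is a hypothetical class; the internals of the article's SubscriptionInfo are not shown, and static handlers (which have no target) are deliberately left out.

```csharp
using System;
using System.Reflection;

// Hypothetical helper; the article's SubscriptionInfo internals are not shown.
public sealed class WeakSubscription
{
    private readonly WeakReference m_target; // does not keep the subscriber alive
    private readonly MethodInfo m_method;

    public WeakSubscription(object target, MethodInfo method)
    {
        if (target == null) throw new ArgumentNullException("target");
        if (method == null) throw new ArgumentNullException("method");
        m_target = new WeakReference(target);
        m_method = method;
    }

    public bool IsAlive { get { return m_target.IsAlive; } }

    // Returns false when the subscriber has been collected,
    // so the caller can prune the entry.
    public bool TryInvoke(object notification)
    {
        // Take a strong reference for the duration of the call.
        object target = m_target.Target;
        if (target == null) return false;
        m_method.Invoke(target, new[] { notification });
        return true;
    }
}
```

Reading Target into a local before invoking avoids the race in which the subscriber is collected between an IsAlive check and the call.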

Some implementation details

The registration of a subscriber object is based on reflecting over its methods and finding those marked with the EventConsumer attribute:

C#
public void Register(object subscriber)
{
    Type sType = subscriber.GetType();
    MethodInfo[] methods = sType.GetMethods(BindingFlags.Public | 
       BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.Static);

    foreach (var method in methods)
    {
        EventConsumer attr = 
          (EventConsumer)Attribute.GetCustomAttribute(method, typeof(EventConsumer));
        if (attr == null)
        {
            continue;
        }

        ParameterInfo[] pInfo = method.GetParameters();
        if (pInfo.Length != 1)
        {
           throw new InvalidOperationException("Method must have a single parameter");
        }

        Type notifyType = pInfo[0].ParameterType;
        SubscriptionInfo info = new SubscriptionInfo();
        info.ID = attr.SubscriptionID;
        info.Method = method;
        info.Target = subscriber;
        info.ThreadOption = attr.ThreadingOption;
        info.UseStrongReference = attr.UseStrongReference;

        AddSubscription(notifyType, info);
    }
}

The subscriptions are stored in a MultiMap object which is actually a dictionary with an event type as a key and a list of subscriptions as a value. MultiMap is a small extension to a HashList class from the excellent NGenerics project.

All the actions performed on the MultiMap instance (subscriptions, invocations, and un-subscriptions) are inside a lock statement, so the EventRouter implementation is thread safe.  
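
The storage described above can be approximated with a plain dictionary of lists guarded by a lock. SubscriptionMap is a hypothetical stand-in for the article's MultiMap and does not use NGenerics. As a labeled design difference, this sketch hands out a snapshot so handlers could be invoked outside the lock, whereas the article performs invocations inside it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the article's MultiMap; NGenerics is not used here.
public sealed class SubscriptionMap<TSubscription>
{
    private readonly Dictionary<Type, List<TSubscription>> m_map =
        new Dictionary<Type, List<TSubscription>>();
    private readonly object m_lock = new object();

    public void Add(Type eventType, TSubscription subscription)
    {
        lock (m_lock)
        {
            List<TSubscription> list;
            if (!m_map.TryGetValue(eventType, out list))
            {
                list = new List<TSubscription>();
                m_map[eventType] = list;
            }
            list.Add(subscription);
        }
    }

    // Removes matching subscriptions across all event types.
    public int RemoveAll(Predicate<TSubscription> match)
    {
        lock (m_lock)
        {
            int removed = 0;
            foreach (var list in m_map.Values) removed += list.RemoveAll(match);
            return removed;
        }
    }

    // Returns a copy so callers can iterate without holding the lock.
    public TSubscription[] Snapshot(Type eventType)
    {
        lock (m_lock)
        {
            List<TSubscription> list;
            return m_map.TryGetValue(eventType, out list)
                ? list.ToArray()
                : new TSubscription[0];
        }
    }
}
```

Invoking from a snapshot keeps a slow handler from blocking concurrent registrations, at the cost of possibly calling a handler that was unregistered a moment earlier.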

Notes

When using IEventRouter.Register(Action<T> action) with an anonymous method, the delegate may be reclaimed by the garbage collector, possibly right after the first invocation, since nothing holds a strong reference to it. So to be on the safe side, either use a strong reference for anonymous methods or don't use them at all :)

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
Israel