Click here to Skip to main content
Click here to Skip to main content

Tagged as

Go to top

Client Server Event Subscription Techniques in C#

, 18 Sep 2012
Rate this:
Please Sign up or sign in to vote.
This article notes down few essential techniques, their detailed implementation with advantages and disadvantages.

Introduction

Most of the systems today are based on Client – Server architecture and the most commonly used mode of communication between a client and a server is event subscription. Over the years, I have had to work on various techniques of client server event subscriptions, each implementation different from the other based on its task and business need. This article notes down few essential techniques, their detailed implementation with advantages and disadvantages.

Client - Server

A server can be considered an entity which does some useful work and sends out a notification. So our servers have two methods –

  1. DoWork() – In our examples we will simulate this by putting the thread to sleep for a definite time frame
  2. PublishEvent() – A notification sent to listeners that a work has been completed.

A client can be defined as an entity which performs some task upon receiving some notification from the server. Our clients hence will have two events.

  1. Subscribe() – Method used to subscribe to server.
  2. ProcessEvent() – Method where the client takes appropriate action. In these examples we will put the thread to sleep for illustration purposes.

Event Subscription Techniques in a Client – Server Model

Below are descriptions of a few techniques to achieve this framework with corresponding advantages and disadvantages.

1. Using Delegates

One of the classical methods of subscribing to events in C# is using delegates. Our server has an event associated called ServerEvent which is raised using the delegate ServerEventHandler

public delegate void ServerEventHandler(object sender, EventArgs e);
public event ServerEventHandler ServerEvent;

The method looks as shown below.

public void DoWork()
{
    Thread.Sleep(5000);
    PublishEvent();
    
}

The server publishes the event after finishing its work to notify its clients.

private void PublishEvent()
{
    this.ServerEvent(this, new EventArgs());
}

To receive the notification from the server, our clients must be hooked on to it using the event exposed by the server. The client takes a reference of the server in its constructor

public class Client
{
    public int ClientId { get; set; }
	
    public Client(Server s)
    {
        s.ServerEvent += new Server.ServerEventHandler(Subscribe);
    }
}

The subscribe method is notified when an event is generated.

public void Subscribe(object sender, EventArgs e)
{
    Console.WriteLine("Event received from server {0} @ {1} by Client {2}", 
        ((Server)sender).ServerName, DateTime.Now.ToString("hh:mm:ss"),
        this.ClientId);
        
        ProcessEvent();
        Console.WriteLine("Event requested processed {0} by Client {1} ", 
            DateTime.Now.ToString("hh:mm:ss"),this.ClientId);            
}
        
public void ProcessEvent()
{
    // Do Work
    Thread.Sleep(2000);
}

The drawback with this approach is that multiple clients process the event synchronously resulting in a linear delay proportional to the processing time of the event for each client in the queue. One way to overcome this would be to multithread the subscription process as shown below.

new Thread(new ThreadStart(() =>
{
    ProcessEvent();
    Console.WriteLine("Event requested processed {0} by Client {1} ", 
        DateTime.Now.ToString("hh:mm:ss"), this.ClientId);
})).Start();

In the main method, we implement the clients and the server as shown below. We create a server and put it in an infinite loop to generate events at random interval. A list of clients, each with a reference to the server, is then initialized.

Server s = new Server() { ServerName = "MyServer" };

    List<Client> clientList = new List<Client>()
    {
        new Client(s){ClientId=1},
        new Client(s){ClientId=2},
        new Client(s){ClientId=3}
    };

while (true)
{
    Thread.Sleep(new Random().Next(10) * 1000);
    Console.WriteLine("Server Generating event {0}", DateTime.Now.ToString("hh:mm:ss"));
    s.DoWork();
}

The output is as shown below

The advantage of this model is simplicity but the story ends there. Although the server is not attached to any of its clients, each client however is very tightly coupled to the server. Any modification in the server would warrant changes in each of the client. This is a painful design and would result in a lot of unnecessary complexity in a larger project.

2. The Observer Pattern

There are volumes of information available on the internet about the Observer Pattern so I will not go into details of explaining it. The Observer pattern works on a publisher subscriber mechanism with Subject publishing information and Observers listening to their subject. The observers have to register with the subject while initialization to get the notification. The class diagram looks as shown below (taken from Wikipedia)

Let’s create the Subject first. We start with creating an Interface ISubject which sets up the following contracts

  1. Add
  2. Remove
  3. Notify
public interface ISubject
{        
    void Add(IObserver Observer);
    void Remove(IObserver observer);
    void Notify();
}

Our server implements the above interface and defines its contracts. Note the server has to have an aggregation of its clients for sending the notification

public class Server : ISubject
{
    public List<IObserver> Clients { get; set; }
 
    public Server()
    {
        Clients = new List<IObserver>();
    }
 
    public void Add(IObserver Observer)
    {
    if (Clients != null)
        Clients.Add(Observer);
    }
 
    public void Remove(IObserver observer)
    {
    if (Clients != null)
        Clients.Remove(observer);
    }
 
}

The Notify() method iterates over the collection of clients registered with the server. To make the notification async, we can use the Parallel looping feature as shown below

public void Notify()
{

    //foreach (IObserver item in Clients)
    //{
    //    item.Update();
    //};
    Parallel.ForEach<IObserver>(Clients, item =>
        {
            item.Update();

        });           
}

Finally, to run this we initialize the server and its clients, register the clients with the server and run an infinite loop generating server events at random intervals

Server s = new Server()
{
    Clients= new List<IObserver>()
    {
        new Client(){ClientId=1},
        new Client(){ClientId=2},
        new Client(){ClientId=3}
    }
};

while (true)
{
    Thread.Sleep(new Random().Next(10) * 1000);
    Console.WriteLine("Server Generating event {0}", DateTime.Now.ToString("hh:mm:ss"));
    s.Notify();
}

The output can be seen as below

Although we have managed to decouple the client from the server, the server still retails an aggregation of the client to notify each of them. Although good for a few handfuls of clients, the asynchronous update to clients will be a serious problem if their number increases. The registration process of the clients to the server also does not help in creating a client – server model completely decoupled from each other. 

3. WaitHandle

The WaitHandle class in System.Threading namespace is a great tool for achieving event generations and subscriptions. Ideally a wait handle is used to wait on tasks to synchronize multiple threads running concurrently. Using this, an event can be signaled which can then be processed by multiple clients waiting on the server for that signal.

The WaitHandle abstract class is inherited by two concrete classes – ManualResetEvent and AutoResetEvent. They both work on the principle that concurrent threads wait on an event using the method WaitOne(). The end of wait is signaled by calling the Set() method. The handles are reset to their original state by calling the Reset() method. For an AutoResetEvent, the reset happens automatically. 

The below code explains its implementation. We have a server which publishes an event after doing some heavy work.

public class Server
{
    public Server() { }

    public void DoWork(int TaskId)
    {
        Thread.Sleep(2000);
        Console.WriteLine("Publishing event with task ID {0}", TaskId);
        PublishEvent();
    }

    private void PublishEvent()
    {
        WaitHandler.GetManualEventHandler().Set();
    }
}

Our clients are waiting for the server to signal an event on concurrent running threads. When the server signals an event, the wait handles on all waiting threads are released so that each client can then process the event. When all clients are done processing the event, the event door is reset, allowing the server to generate the next event. Since the clients are listening on a continuously running loop, the challenge is to stop the clients from acting on same event more than once. To solve, this we associate each event with an event ID. Each client retains a token of the event it has processed through this event ID. If the event generated has a different event ID the one already processed, the client won’t act on it. Also, it is important for the client to know when it is safe to reset the wait handle so that the server can signal its next event. This can be achieved by updating a unique task list just after receiving the event and then removing the item from the task list after acting on the event. When the task count of the task list is 0, it can be inferred that all clients have processed the event and the wait handle can now be reset.

The above can be summarized as

  1. Server publishes event with an event id
  2. Clients receive the event and check if it is same as the event already processed
  3. If not, clients update a task list and process the event
  4. When finished, clients remove item from the task list to signal completion
  5. When the task list is empty, the last client will know that it is time to reset the handle so that server can signal next event
public class Client
    {
        public Client(int ClientId) 
        {
            this.ClientId = ClientId;
            this.TaskId = -1;
            new Task((o) => SubscribeToAsyncEvents(), 
                new System.Threading.CancellationToken()).Start();            
        }

        public int ClientId { get; set; }
        private int TaskId { get; set; }

        private void SubscribeToAsyncEvents()
        {
            while (true)
            {
                if (WaitHandler.GetManualEventHandler().WaitOne())
                {
                    if (WaitHandler.TaskId != TaskId)
                    {                        
                        TaskId = WaitHandler.TaskId;

                        object taskObject = new object();

                        lock (WaitHandler.TaskList)
                        {
                            WaitHandler.TaskList.Add(taskObject);
                        }

                        Console.WriteLine("Event Received by client {0} for Task Id - {1} @ {2}",
                            ClientId, TaskId, DateTime.Now.ToString("hh:mm:ss"));

                        ProcessEvent();

                        Console.WriteLine("Event Processed by client {0} for Task Id - {1} @ {2}",
                            ClientId, TaskId, DateTime.Now.ToString("hh:mm:ss"));

                        lock (WaitHandler.TaskList)
                        {
                            WaitHandler.TaskList.Remove(taskObject);
                        }

                        if (WaitHandler.TaskList.Count == 0)
                        {
                            // condition that all threads have executed
                            WaitHandler.GetManualEventHandler().Reset();
                            Console.WriteLine("All events have been processed @ {0}", DateTime.Now.ToString("hh:mm:ss"));
                        }
                    }
                }

            }

        }

        void ProcessEvent()
        {
            System.Threading.Thread.Sleep(2000);
        }
    }

It is worth noting that a single instance of the ManualResetEvent Handler should be used by both the client and the server.

public static class WaitHandler
    {
        private static System.Threading.ManualResetEvent manual = new System.Threading.ManualResetEvent(false);
        private static System.Threading.AutoResetEvent auto = new System.Threading.AutoResetEvent(false);

        public static System.Threading.AutoResetEvent GetAutoEventHandler()
        {
            return auto;
        }

        public static System.Threading.ManualResetEvent GetManualEventHandler()
        {
            return manual;
        }

        public static int TaskId { get; set; }

        public static List<object> TaskList { get; set; }
    }

The below code shows how the server and the clients are initialized and events are generated and notified. Note that in this implementation, the client and server are completely agnostic of each other. The server fires an event and forgets. The clients wait for signalling of the event and act on it before signalling to the server that it is safe again to publish another event.

static void Main(string[] args)
{
    Server s = new Server();
    WaitHandler.TaskList = new List<object>();

    List<Client> c = new List<Client>()
    {
        new Client(1),
        new Client(2),
        new Client(3)
    };

    while (true)
    {
        Thread.Sleep(new Random().Next(20) * 1000);
        int eventID = new Random().Next(1000);                
        WaitHandler.TaskId = eventID;
        Console.WriteLine("Server Generating event {0} @ {1}",eventID, DateTime.Now.ToString("hh:mm:ss"));
        s.DoWork(eventID);
    }
  }
}

Below is a snapshot of the output.

The only problem with this design is - what happens when the server is generating events faster than the clients can process? Remember that the client will reset the wait handle only when all clients have finished processing the event. If an event is generated in the meantime, we have to come up with some technique of putting it in a queue and release it only when the handle is reset.

We have an Event class to identify each event.

public class ServerEvent
{
    public int EventID { get; set; }
}

The server exposes a method RegisterEvent(ServerEvent e) to register events in its queue

public void RegisterEvent(ServerEvent e)
{
    lock (syncLock)
    {
        EventQueue.Enqueue(e);
    }
            
}

The Sever’s method ManageEvents() is run concurrently monitoring for a new event to be added to the queue. It also deletes any event has been processed by all clients

private void ManageEvents()
{
            
            while (true)
            {
                if (this.EventQueue!=null && this.EventQueue.Count>0)
                {
                    if (!WaitHandler.GetManualEventHandler().WaitOne(0))
                    {
                        // Handle is reset - No events are currently being processed

                        int eventID = ((ServerEvent)this.EventQueue.Peek()).EventID;
                        if (this.EventQueue.Count > 1)
                        {
                            // remove the event that hass been processed
                            lock (syncLock)
                            {
                                this.EventQueue.Dequeue();
                            }
                            
                            Console.WriteLine("Server Unregistering event {0} @ {1}", eventID, DateTime.Now.ToString("hh:mm:ss"));
                        }

                        eventID = ((ServerEvent)this.EventQueue.Peek()).EventID;

                        // process any new event in queue
                        WaitHandler.TaskId = eventID;
                        Console.WriteLine("Server Generating event {0} @ {1}", eventID, DateTime.Now.ToString("hh:mm:ss"));
                        this.DoWork(eventID);
                    }
                        
                }
            }
        }

Finally the event generation is modified so that now when an event is generated, we need to add it to the server’s event queue and the rest is taken care on its own.

	while (true)
            {                
                Thread.Sleep(1000);
                int eventID = new Random().Next(500);
                
                s.RegisterEvent(new ServerEvent() { EventID = eventID });
            }

Although this works fine, there are too many threads and concurrent operations running. In case of the number of clients increasing, there can be serious load on process and memory.

4. Event Aggragator 

The event aggregator is a pattern that (in my opinion) has revolutionized the client – server event subscription technique. Refer to the below diagram

The event aggregator can be considered as an event hub. The servers publishing an event do so by publishing it to the hub. The clients, interested in an event, do so by registering with the hub. When an event is published, the hub (event aggregator) looks up to find out the clients interested in listening to the event. Only those clients are then notified.

The Event aggregator has the following features

  1. The client and the server can be completely detached bearing no knowledge of each other.
  2. Multiple servers can publish events concurrently to the aggregator without caring about who receives them
  3. Multiple clients can listen to the events from the aggregator without caring about the servers publishing those events.
  4. The clients can choose on whatever events they want to listen.

In a nutshell, the clients are hooked on to the messages that they are interested in without caring about the server publishing the events. Similarly, the server’s job is to publish the message without caring about the clients. In the heart of this, managing the entire process is the event aggregator.

To listen to a message, the client has to listen to a particular channel through which the message is transmitted. We define a generic class Channel<TMessage> which has a property MessageLoad carrying the message.

public class Channel<TMessage>
    {
        public TMessage MessageLoad { get; set; }
    }

The clients subscribe to the event aggregator by generating a subscription token.

We define a class which will generate the subscription tokens. This can be considered a stamp giving the client authorization to listen to a particular message (event) over a particular channel. As explained earlier, the client is not bothered about who or how that event or message is generated. When the client goes to the event subscriber to subscribe a particular event, it has to furnish this token and has to inform the aggregator about the address where it would be listening. It does that by carrying an action delegate in the subscription token as shown below.

public class EventSubscription<TChannel, TMessage>
        where TChannel : Channel<TMessage>        
    {
        private Action<TChannel> subscribeAction;

        public EventSubscription(Action<TChannel> DelAction)
        {
            if (DelAction != null)
            {                
                this.subscribeAction = DelAction;
            }
            else
                throw new Exception("Argument to Subscription cannot be null");
        }
                
        public void Execute(TChannel Channel)
        {
            subscribeAction(Channel);
        }
    }

The event aggregator is implemented as shown in the following code. It has three methods

  1. Publish
  2. Subscribe
  3. Unsubscribe

It generates a subscription token for every client willing to subscribe and keeps a record in a dictionary messageList. This is thus a repository of all clients willing to listen to corresponding events and the addresses of where they are listening. The dictionary is maintained by taking the type of the message as a key and the lookup is a generic list of subscription tokens. When a message is generated, it looks up for the subscription tokens registered with the message type and notifies each interested client. Note that the EventAggregator class has to be singleton.

public class EventAggregator
    {
        // ensure a singleton instance
        private static EventAggregator eventAgg = new EventAggregator();

        private IDictionary<System.Type, IList> messageList;

        private EventAggregator()
        {
            messageList = new Dictionary<System.Type, IList>();
        }

        public static EventAggregator CreateInstance()
        {
            return eventAgg;
        }
                
        public void Publish<TChannel,TMessage>(TChannel Channel, TMessage Message)
            where TChannel : Channel<TMessage>            
        {
            Channel.MessageLoad = Message;
            if (Channel != null)
            {
                System.Type messageType = Channel.GetType();
                if (messageList.ContainsKey(messageType))
                {
                    foreach (EventSubscription<TChannel, TMessage> s in (List<EventSubscription<TChannel, TMessage>>)messageList[messageType])
                        s.Execute(Channel);
                }
            }
            else
                throw new Exception("Unable to publish a null message");

        }
                
        public void Subscribe<TChannel, TMessage>(Action<TChannel> Delegate)
            where TChannel : Channel<TMessage>            
        {
            EventSubscription<TChannel, TMessage> subscriptionToken = new EventSubscription<TChannel, TMessage>(Delegate);
                        
            System.Type channelType = typeof(TChannel);
            if (messageList.ContainsKey(channelType))
            {
                messageList[channelType].Add(subscriptionToken);
            }                
            else
                messageList.Add(channelType, new List<EventSubscription<TChannel, TMessage>>() { subscriptionToken });
        }

        public void Unsubscribe<TChannel, TMessage>(Action<TChannel> Delegate)
             where TChannel:Channel<TMessage>        
        {
            EventSubscription<TChannel, TMessage> subscriptionToken = new EventSubscription<TChannel, TMessage>(Delegate);

            System.Type channelType = typeof(TChannel);
            if (messageList.ContainsKey(channelType))
            {
                if (messageList[channelType].Contains(subscriptionToken))
                    messageList[channelType].Remove(subscriptionToken);
                else
                    throw new Exception("Subscription not present");
            }
        }

    }

To see this in action, we first need to define our messages and channels. We have two messages defined – MessageA and MessageB with properties MessageAProperty, MessageBProperty respectively.

public class ChannelA:Channel<MessageA>
    {
        public ChannelA() {}            
    }

    public class ChannelB:Channel<MessageB>
    {
        public ChannelB() { }
    }
    
    public class MessageA 
    {
        public MessageA(){}
        
        public string MessageAProperty { get; set; }

        public override string ToString()
        {
            return MessageAProperty;
        }
    }

    public class MessageB 
    {
        public MessageB() { }

        public string MessageBProperty { get; set; }

        public override string ToString()
        {
            return MessageBProperty;
        }
    }

Let’s define our Server

	public class Server
    {
        public Server() { }

        public string Name { get; set; }       

        public void PublishEvent<TChannel,TMessage>(TChannel Channel, TMessage Message)
            where TChannel : Channel<TMessage>            
        {
            Console.WriteLine("Message published by {0} ---- {1}", this.Name, Message.GetType().ToString()); 
            EventAggregator.CreateInstance().Publish<TChannel,TMessage>(Channel, Message);
        }
        
    }

Let’s define our Server

We then initialize our server and get our clients subscribe to the event aggregator to whatever messages they are interested in listening.

// Server 1 publishes Message A via Channel A
            sList[0].PublishEvent<ChannelA, MessageA>(new ChannelA(),new MessageA() { MessageAProperty = "This is Message A" });
            // Server 2 publiches Message B via Channel B
            sList[1].PublishEvent<ChannelB, MessageB>(new ChannelB(), new MessageB() { MessageBProperty = "This is Message B" });
            
            Console.Read();

We then have to make our servers publish these events

List<Server> sList = new List<Server>()
                {
                    new Server(){Name="Server 1"},
                    new Server(){Name="Server 2"}                    
                };

            List<Server> cList = new List<Server>()
                {
                    new Server(){Name="Client 1"},
                    new Server(){Name="Client 2"},                    
                };

            EventAggregator eventAggregator = EventAggregator.CreateInstance();
             
            // Client 1 subscribes to Message A via Channel A
            eventAggregator.Subscribe<ChannelA, MessageA>((chA) => { ProcessEventA(chA); });

            //Client 2 subscribes to Message B via channel B
            eventAggregator.Subscribe<ChannelB, MessageB>((chB) => { ProcessEventB(chB); });

We then have to make our servers publish these events

// Server 1 publishes Message A via Channel A
            sList[0].PublishEvent<ChannelA, MessageA>(new ChannelA(),new MessageA() { MessageAProperty = "This is Message A" });
            // Server 2 publiches Message B via Channel B
            sList[1].PublishEvent<ChannelB, MessageB>(new ChannelB(), new MessageB() { MessageBProperty = "This is Message B" });
            
            Console.Read();

The event aggregator can serve as a powerful tool for event subscription techniques in a client – server model. It can be particularly useful when one has to implement communication between View Models in WPF. Note that, there is no clear boundary between a client and a server. An entity can publish an event and also seek to listen to another event from another entity.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

anshudutta
Software Developer (Senior)
Australia Australia
I am a Senior Software Developer / Technical Consultant in a leading software company.

Comments and Discussions

 
QuestionThreadSafe PinmemberFatCatProgrammer19-May-14 3:21 
GeneralExcellent Article PinmemberAmirLevel227-Jul-13 15:14 
QuestionSame pattern in Prism? PinmemberHarold Chattaway3-Jan-13 3:24 
AnswerRe: Same pattern in Prism? Pinmemberanshudutta13-Jan-13 15:08 
GeneralMy vote of 5 PinmembervSoares19-Sep-12 15:03 
GeneralMy vote of 5 PinmvpKanasz Robert19-Sep-12 4:42 
GeneralMy vote of 5 PinmemberZeroDotNet19-Sep-12 3:00 
GeneralMy vote of 5 PinmemberBurak Özdiken18-Sep-12 22:39 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web02 | 2.8.140916.1 | Last Updated 18 Sep 2012
Article Copyright 2012 by anshudutta
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid