Introduction
Most systems today are built on a client-server architecture, and one of the
most common ways for a client and a server to communicate is event
subscription. Over the years, I have worked with various client-server event
subscription techniques, each implementation differing from the others
according to its task and business need. This article describes a few essential
techniques and their implementation in detail, along with their advantages and
disadvantages.
Client - Server
A server can be considered an entity which does some useful work and then sends
out a notification. So our servers have two methods:
- DoWork() – In our examples we simulate this by putting the thread to sleep
for a fixed period of time.
- PublishEvent() – Sends a notification to listeners that a piece of work has
been completed.
A client can be defined as an entity which performs some task upon receiving a
notification from the server. Our clients hence have two methods:
- Subscribe() – Method used to subscribe to the server.
- ProcessEvent() – Method where the client takes appropriate action. In these
examples we put the thread to sleep for illustration purposes.
Event Subscription Techniques in a Client – Server Model
Below are descriptions of a few techniques to achieve this framework
with corresponding advantages and disadvantages.
1. Using Delegates
One of the classical methods of subscribing to events in C# is using delegates.
Our server has an associated event called ServerEvent, which is raised using
the delegate ServerEventHandler.
public delegate void ServerEventHandler(object sender, EventArgs e);
public event ServerEventHandler ServerEvent;
The server's DoWork() method looks as shown below.
public void DoWork()
{
Thread.Sleep(5000);
PublishEvent();
}
The server publishes the
event after finishing its work to notify its clients.
private void PublishEvent()
{
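// Raise the event only if at least one client has subscribed;
// invoking a null event would throw a NullReferenceException.
ServerEvent?.Invoke(this, EventArgs.Empty);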
}
To receive the notification from the server, our clients must hook on to it
using the event exposed by the server. The client takes a reference to the
server in its constructor.
public class Client
{
public int ClientId { get; set; }
public Client(Server s)
{
s.ServerEvent += new Server.ServerEventHandler(Subscribe);
}
}
The Subscribe method is invoked when the server raises the event.
public void Subscribe(object sender, EventArgs e)
{
Console.WriteLine("Event received from server {0} @ {1} by Client {2}",
((Server)sender).ServerName, DateTime.Now.ToString("hh:mm:ss"),
this.ClientId);
ProcessEvent();
Console.WriteLine("Event requested processed {0} by Client {1} ",
DateTime.Now.ToString("hh:mm:ss"),this.ClientId);
}
public void ProcessEvent()
{
Thread.Sleep(2000);
}
The drawback with this approach is that multiple clients process the event
synchronously, one after another, so the total delay grows linearly with the
number of clients and the processing time of each. One way to overcome this
would be to process the event on a separate thread inside Subscribe, as shown
below.
new Thread(new ThreadStart(() =>
{
ProcessEvent();
Console.WriteLine("Event requested processed {0} by Client {1} ",
DateTime.Now.ToString("hh:mm:ss"), this.ClientId);
})).Start();
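The difference between the two dispatch styles can be checked by recording the thread on which each handler runs. The following is a minimal sketch rather than the article's own code: Publisher, DispatchDemo and the 50 ms sleep are hypothetical names and values chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical publisher, standing in for the article's Server.
public class Publisher
{
    public event EventHandler WorkDone;

    public void Raise()
    {
        // Handlers in the invocation list are called one after another.
        WorkDone?.Invoke(this, EventArgs.Empty);
    }
}

public static class DispatchDemo
{
    // Returns the managed thread IDs on which the handlers did their work.
    public static int[] Run(bool offload, int handlerCount = 3)
    {
        var publisher = new Publisher();
        var ids = new ConcurrentQueue<int>();

        using (var done = new CountdownEvent(handlerCount))
        {
            for (int i = 0; i < handlerCount; i++)
            {
                publisher.WorkDone += (sender, e) =>
                {
                    Action work = () =>
                    {
                        Thread.Sleep(50); // simulated ProcessEvent()
                        ids.Enqueue(Environment.CurrentManagedThreadId);
                        done.Signal();
                    };

                    if (offload)
                        new Thread(() => work()).Start(); // returns to the publisher at once
                    else
                        work();                           // blocks the publisher
                };
            }

            publisher.Raise();
            done.Wait(); // wait until every handler has finished
        }

        return ids.ToArray();
    }
}
```

Calling DispatchDemo.Run(false) yields the caller's own thread ID for every handler, while DispatchDemo.Run(true) yields IDs of other threads, which is why the publisher is no longer held up by slow clients.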
In the main method, we create the clients and the server as shown below. We
create a server, initialize a list of clients, each with a reference to the
server, and then run an infinite loop in which the server generates events at
random intervals.
Server s = new Server() { ServerName = "MyServer" };
List<Client> clientList = new List<Client>()
{
new Client(s){ClientId=1},
new Client(s){ClientId=2},
new Client(s){ClientId=3}
};
while (true)
{
Thread.Sleep(new Random().Next(10) * 1000);
Console.WriteLine("Server Generating event {0}", DateTime.Now.ToString("hh:mm:ss"));
s.DoWork();
}
The output is as shown below

The advantage of this model is its simplicity, but the story ends there.
Although the server holds no direct reference to any of its clients, each
client is tightly coupled to the server. Any modification in the server would
warrant changes in each of the clients. This is a painful design and would
result in a lot of unnecessary complexity in a larger project.
2. The Observer Pattern
There are volumes of information available on the internet about the Observer
Pattern, so I will not go into the details of explaining it. The Observer
pattern works on a publisher-subscriber mechanism, with a Subject publishing
information and Observers listening to their subject. The observers have to
register with the subject during initialization to receive notifications. The
class diagram looks as shown below (taken from Wikipedia).

Let’s create the Subject first. We start with creating an Interface ISubject
which sets up the following contracts
- Add
- Remove
- Notify
public interface ISubject
{
void Add(IObserver Observer);
void Remove(IObserver observer);
void Notify();
}
Our server implements the above interface and defines its contracts. Note that
the server has to hold a collection of its clients in order to send the
notification.
public class Server : ISubject
{
public List<IObserver> Clients { get; set; }
public Server()
{
Clients = new List<IObserver>();
}
public void Add(IObserver Observer)
{
if (Clients != null)
Clients.Add(Observer);
}
public void Remove(IObserver observer)
{
if (Clients != null)
Clients.Remove(observer);
}
}
The Notify() method iterates over the collection of clients registered with the
server. To notify the clients in parallel rather than one after another, we can
use Parallel.ForEach as shown below. Note that the call still blocks until
every client's Update() has returned.
public void Notify()
{
Parallel.ForEach<IObserver>(Clients, item =>
{
item.Update();
});
}
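The article does not show the IObserver interface or the observer-side Client. A minimal sketch of how the pieces might fit together follows. It assumes that Update() takes no arguments (as item.Update() above implies); the Received counter, the private gate lock and the snapshot copy are additions made for illustration only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public interface IObserver
{
    void Update();
}

public interface ISubject
{
    void Add(IObserver observer);
    void Remove(IObserver observer);
    void Notify();
}

// Hypothetical observer: counts the notifications it has received.
public class Client : IObserver
{
    private int received;

    public int ClientId { get; set; }
    public int Received { get { return received; } }

    public void Update()
    {
        // Interlocked in case several Notify() calls overlap.
        Interlocked.Increment(ref received);
        Console.WriteLine("Client {0} notified", ClientId);
    }
}

public class Server : ISubject
{
    private readonly List<IObserver> clients = new List<IObserver>();
    private readonly object gate = new object();

    public void Add(IObserver observer)
    {
        lock (gate) clients.Add(observer);
    }

    public void Remove(IObserver observer)
    {
        lock (gate) clients.Remove(observer);
    }

    public void Notify()
    {
        // Copy the list so observers can register or leave during notification.
        IObserver[] snapshot;
        lock (gate) snapshot = clients.ToArray();

        // Runs the updates in parallel, but returns only when all are done.
        Parallel.ForEach(snapshot, observer => observer.Update());
    }
}
```

Here clients are registered through Add() rather than by assigning the list directly, so the server keeps control of its own collection.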
Finally, to run
this we initialize the server and its clients, register the clients with the
server and run an infinite loop generating server events at random intervals
Server s = new Server()
{
Clients= new List<IObserver>()
{
new Client(){ClientId=1},
new Client(){ClientId=2},
new Client(){ClientId=3}
}
};
while (true)
{
Thread.Sleep(new Random().Next(10) * 1000);
Console.WriteLine("Server Generating event {0}", DateTime.Now.ToString("hh:mm:ss"));
s.Notify();
}
The output can be
seen as below

Although we have managed to decouple the client from the server, the server
still retains a collection of its clients in order to notify each of them. This
works well for a handful of clients, but notifying every client in parallel
becomes a serious problem as their number increases. The registration of
clients with the server also prevents the client and the server from being
completely decoupled from each other.
3. WaitHandle
The WaitHandle class in the System.Threading namespace is a great tool for
generating and subscribing to events. Typically a wait handle is used to
synchronize multiple threads running concurrently. Using this, an event can be
signaled which can then be processed by multiple clients waiting on the server
for that signal.
The WaitHandle abstract class has several concrete descendants, two of which
are ManualResetEvent and AutoResetEvent. They both work on the principle that
concurrent threads wait on an event using the method WaitOne(). The end of the
wait is signaled by calling the Set() method. The handle is returned to its
non-signaled state by calling the Reset() method. For an AutoResetEvent, the
reset happens automatically once a single waiting thread has been released.
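The difference between the two handles can be observed without any extra threads by polling with WaitOne(0), which returns immediately. The sketch below is illustrative only; ResetEventDemo and its method names are hypothetical.

```csharp
using System;
using System.Threading;

public static class ResetEventDemo
{
    // WaitOne(0) polls the handle without blocking: true means "signaled".
    public static bool[] ManualStates()
    {
        using (var manual = new ManualResetEvent(false))
        {
            manual.Set();
            bool first = manual.WaitOne(0);  // signaled
            bool second = manual.WaitOne(0); // still signaled until Reset()
            manual.Reset();
            bool third = manual.WaitOne(0);  // no longer signaled
            return new[] { first, second, third };
        }
    }

    public static bool[] AutoStates()
    {
        using (var auto = new AutoResetEvent(false))
        {
            auto.Set();
            bool first = auto.WaitOne(0);  // signaled; this wait consumes the signal
            bool second = auto.WaitOne(0); // already reset automatically
            return new[] { first, second };
        }
    }
}
```

ManualStates() returns true, true, false and AutoStates() returns true, false. A ManualResetEvent therefore suits the broadcast case described here, where every waiting client must see the same signal.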
The code below illustrates its implementation. We have a server which publishes
an event after doing some heavy work.
public class Server
{
public Server() { }
public void DoWork(int TaskId)
{
Thread.Sleep(2000);
Console.WriteLine("Publishing event with task ID {0}", TaskId);
PublishEvent();
}
private void PublishEvent()
{
WaitHandler.GetManualEventHandler().Set();
}
}
Our clients wait on concurrently running threads for the server to signal an
event. When the server signals an event, all waiting threads are released so
that each client can process the event. When all clients are done processing
the event, the wait handle is reset, allowing the server to signal the next
event. Since the clients listen in a continuously running loop, the challenge
is to stop a client from acting on the same event more than once. To solve
this, we associate each event with an event ID. Each client retains the ID of
the event it last processed. If the signaled event has the same ID as the one
already processed, the client does not act on it. It is also important for the
clients to know when it is safe to reset the wait handle so that the server can
signal its next event. This is achieved by adding an item to a shared task list
just after receiving the event and removing it after acting on the event. When
the task list is empty, it can be inferred that all clients have processed the
event and the wait handle can be reset.
The above can be
summarized as
- The server publishes an event with an event ID
- Clients receive the event and check whether it is the same as the event
already processed
- If not, clients add an item to the task list and process the event
- When finished, clients remove the item from the task list to signal completion
- When the task list is empty, the last client knows that it is time to reset
the handle so that the server can signal the next event
public class Client
{
public Client(int ClientId)
{
this.ClientId = ClientId;
this.TaskId = -1;
// Run the listener loop as a long-running background task.
Task.Factory.StartNew(SubscribeToAsyncEvents, TaskCreationOptions.LongRunning);
}
public int ClientId { get; set; }
private int TaskId { get; set; }
private void SubscribeToAsyncEvents()
{
while (true)
{
if (WaitHandler.GetManualEventHandler().WaitOne())
{
if (WaitHandler.TaskId != TaskId)
{
TaskId = WaitHandler.TaskId;
object taskObject = new object();
lock (WaitHandler.TaskList)
{
WaitHandler.TaskList.Add(taskObject);
}
Console.WriteLine("Event Received by client {0} for Task Id - {1} @ {2}",
ClientId, TaskId, DateTime.Now.ToString("hh:mm:ss"));
ProcessEvent();
Console.WriteLine("Event Processed by client {0} for Task Id - {1} @ {2}",
ClientId, TaskId, DateTime.Now.ToString("hh:mm:ss"));
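bool allDone;
lock (WaitHandler.TaskList)
{
WaitHandler.TaskList.Remove(taskObject);
// Read the count while still holding the lock so the check
// cannot race with other clients updating the list.
allDone = WaitHandler.TaskList.Count == 0;
}
if (allDone)
{
WaitHandler.GetManualEventHandler().Reset();
Console.WriteLine("All events have been processed @ {0}", DateTime.Now.ToString("hh:mm:ss"));
}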
}
}
}
}
void ProcessEvent()
{
System.Threading.Thread.Sleep(2000);
}
}
It is worth noting that a single instance of the ManualResetEvent handle should
be shared by both the client and the server.
public static class WaitHandler
{
private static System.Threading.ManualResetEvent manual = new System.Threading.ManualResetEvent(false);
private static System.Threading.AutoResetEvent auto = new System.Threading.AutoResetEvent(false);
public static System.Threading.AutoResetEvent GetAutoEventHandler()
{
return auto;
}
public static System.Threading.ManualResetEvent GetManualEventHandler()
{
return manual;
}
public static int TaskId { get; set; }
public static List<object> TaskList { get; set; }
}
The below code shows how
the server and the clients are initialized and events are generated and
notified. Note that in this implementation, the client and server are
completely agnostic of each other. The server fires an event and forgets. The
clients wait for signalling of the event and act on it before signalling to the
server that it is safe again to publish another event.
static void Main(string[] args)
{
Server s = new Server();
WaitHandler.TaskList = new List<object>();
List<Client> c = new List<Client>()
{
new Client(1),
new Client(2),
new Client(3)
};
while (true)
{
Thread.Sleep(new Random().Next(20) * 1000);
int eventID = new Random().Next(1000);
WaitHandler.TaskId = eventID;
Console.WriteLine("Server Generating event {0} @ {1}",eventID, DateTime.Now.ToString("hh:mm:ss"));
s.DoWork(eventID);
}
}
Below is a snapshot of the
output.

One problem with this design is: what happens when the server generates events
faster than the clients can process them? Remember that the wait handle is
reset only when all clients have finished processing the event. If an event is
generated in the meantime, it has to be held in a queue and released only when
the handle has been reset.
We define a ServerEvent class to identify each event.
public class ServerEvent
{
public int EventID { get; set; }
}
The server exposes a method RegisterEvent(ServerEvent e) to register events in
its queue.
public void RegisterEvent(ServerEvent e)
{
lock (syncLock)
{
EventQueue.Enqueue(e);
}
}
The server's ManageEvents() method runs concurrently, monitoring the queue for
newly added events. It also removes any event that has been processed by all
clients.
private void ManageEvents()
{
while (true)
{
if (this.EventQueue!=null && this.EventQueue.Count>0)
{
if (!WaitHandler.GetManualEventHandler().WaitOne(0))
{
int eventID = ((ServerEvent)this.EventQueue.Peek()).EventID;
if (this.EventQueue.Count > 1)
{
lock (syncLock)
{
this.EventQueue.Dequeue();
}
Console.WriteLine("Server Unregistering event {0} @ {1}", eventID, DateTime.Now.ToString("hh:mm:ss"));
}
eventID = ((ServerEvent)this.EventQueue.Peek()).EventID;
WaitHandler.TaskId = eventID;
Console.WriteLine("Server Generating event {0} @ {1}", eventID, DateTime.Now.ToString("hh:mm:ss"));
this.DoWork(eventID);
}
}
}
}
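ManageEvents() above polls the queue in a tight loop, which keeps a thread busy even when nothing is queued. As a sketch of one alternative, the queuing part alone could be built on BlockingCollection<T>, which blocks the consumer until an item arrives. This swaps in a different technique from the article's lock-and-poll queue and leaves out the wait-handle check; QueuedServer and Complete() are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class ServerEvent
{
    public int EventID { get; set; }
}

// Hypothetical server that covers only the queuing step.
public class QueuedServer
{
    // Backed by a ConcurrentQueue by default, so events come out in FIFO order.
    private readonly BlockingCollection<ServerEvent> queue =
        new BlockingCollection<ServerEvent>();

    public void RegisterEvent(ServerEvent e)
    {
        queue.Add(e);
    }

    // Signals that no further events will be registered.
    public void Complete()
    {
        queue.CompleteAdding();
    }

    // Blocks while the queue is empty; returns once Complete() has been
    // called and every queued event has been handled.
    public List<int> ManageEvents()
    {
        var handled = new List<int>();
        foreach (ServerEvent e in queue.GetConsumingEnumerable())
        {
            Console.WriteLine("Server generating event {0}", e.EventID);
            // The article's DoWork(e.EventID) call would go here.
            handled.Add(e.EventID);
        }
        return handled;
    }
}
```

A typical use would start ManageEvents() with Task.Run, register events from the main loop, and call Complete() on shutdown. The consumer thread sleeps inside GetConsumingEnumerable() instead of spinning.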
Finally, the event generation is modified so that when an event is generated,
we only need to add it to the server's event queue; the rest is taken care of
automatically.
while (true)
{
Thread.Sleep(1000);
int eventID = new Random().Next(500);
s.RegisterEvent(new ServerEvent() { EventID = eventID });
}
Although this works fine, there are too many threads and concurrent operations
running. As the number of clients increases, this can put a serious load on the
processor and memory.
4. Event Aggregator
The event
aggregator is a pattern that (in my opinion) has revolutionized the client –
server event subscription technique. Refer to the below diagram

The event aggregator can be considered an event hub. Servers publish an event
by sending it to the hub. Clients interested in an event register for it with
the hub. When an event is published, the hub (event aggregator) looks up the
clients interested in that event, and only those clients are notified.
The Event
aggregator has the following features
- The client and the server can be completely detached
bearing no knowledge of each other.
- Multiple servers can publish events concurrently
to the aggregator without caring about who receives them
- Multiple clients can listen to the events from
the aggregator without caring about the servers publishing those events.
- The clients can choose which events they want to listen to.
In a nutshell, the clients are hooked on to the messages that they are
interested in without caring about the server publishing the events. Similarly,
the server's job is to publish the message without caring about the clients. At
the heart of this, managing the entire process, is the event aggregator.
To listen to a
message, the client has to listen to a particular channel through which the
message is transmitted. We define a generic class Channel<TMessage>
which has a property MessageLoad
carrying the message.
public class Channel<TMessage>
{
public TMessage MessageLoad { get; set; }
}
The clients subscribe to the event aggregator by means of a subscription token.
We define a class which represents these subscription tokens. A token can be
considered a stamp giving the client authorization to listen to a particular
message (event) over a particular channel. As explained earlier, the client is
not concerned with who generates that event or message, or how. When the client
goes to the event aggregator to subscribe to a particular event, it has to
furnish this token and inform the aggregator of the address where it will be
listening. It does that by carrying an action delegate in the subscription
token as shown below.
public class EventSubscription<TChannel, TMessage>
where TChannel : Channel<TMessage>
{
private Action<TChannel> subscribeAction;
public EventSubscription(Action<TChannel> DelAction)
{
if (DelAction != null)
{
this.subscribeAction = DelAction;
}
else
throw new Exception("Argument to Subscription cannot be null");
}
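public void Execute(TChannel Channel)
{
subscribeAction(Channel);
}
// Two tokens are equal when they wrap the same delegate, so that
// Unsubscribe can locate a token created earlier by Subscribe.
public override bool Equals(object obj)
{
var other = obj as EventSubscription<TChannel, TMessage>;
return other != null && subscribeAction.Equals(other.subscribeAction);
}
public override int GetHashCode()
{
return subscribeAction.GetHashCode();
}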
}
The event
aggregator is implemented as shown in the following code. It has three methods
- Publish
- Subscribe
- Unsubscribe
It generates a subscription token for every client willing to subscribe and
keeps a record in a dictionary, messageList. This is thus a repository of all
clients willing to listen to the corresponding events and the addresses where
they are listening. The dictionary uses the type of the channel as its key, and
each value is a generic list of subscription tokens. When a message is
published, the aggregator looks up the subscription tokens registered for that
channel type and notifies each interested client. Note that the EventAggregator
class has to be a singleton.
public class EventAggregator
{
private static EventAggregator eventAgg = new EventAggregator();
private IDictionary<System.Type, IList> messageList;
private EventAggregator()
{
messageList = new Dictionary<System.Type, IList>();
}
public static EventAggregator CreateInstance()
{
return eventAgg;
}
public void Publish<TChannel,TMessage>(TChannel Channel, TMessage Message)
where TChannel : Channel<TMessage>
{
if (Channel == null)
throw new ArgumentNullException("Channel", "Unable to publish on a null channel");
// Attach the message only after the channel has been null-checked.
Channel.MessageLoad = Message;
System.Type messageType = Channel.GetType();
if (messageList.ContainsKey(messageType))
{
foreach (EventSubscription<TChannel, TMessage> s in (List<EventSubscription<TChannel, TMessage>>)messageList[messageType])
s.Execute(Channel);
}
}
public void Subscribe<TChannel, TMessage>(Action<TChannel> Delegate)
where TChannel : Channel<TMessage>
{
EventSubscription<TChannel, TMessage> subscriptionToken = new EventSubscription<TChannel, TMessage>(Delegate);
System.Type channelType = typeof(TChannel);
if (messageList.ContainsKey(channelType))
{
messageList[channelType].Add(subscriptionToken);
}
else
messageList.Add(channelType, new List<EventSubscription<TChannel, TMessage>>() { subscriptionToken });
}
public void Unsubscribe<TChannel, TMessage>(Action<TChannel> Delegate)
where TChannel:Channel<TMessage>
{
EventSubscription<TChannel, TMessage> subscriptionToken = new EventSubscription<TChannel, TMessage>(Delegate);
System.Type channelType = typeof(TChannel);
if (messageList.ContainsKey(channelType))
{
if (messageList[channelType].Contains(subscriptionToken))
messageList[channelType].Remove(subscriptionToken);
else
throw new Exception("Subscription not present");
}
}
}
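Since several servers may publish concurrently, access to the subscription dictionary arguably needs to be synchronized, because Dictionary<TKey, TValue> is not safe for concurrent reads and writes. The following is a minimal sketch of one way to do that, not the article's implementation: LockedEventAggregator and TextChannel are hypothetical, handlers are stored as plain delegates instead of subscription tokens, and the lookup key is typeof(TChannel) on both the publish and subscribe sides.

```csharp
using System;
using System.Collections.Generic;

public class Channel<TMessage>
{
    public TMessage MessageLoad { get; set; }
}

// Hypothetical channel used only to exercise the sketch.
public class TextChannel : Channel<string> { }

public class LockedEventAggregator
{
    private readonly object gate = new object();
    private readonly Dictionary<Type, List<Delegate>> handlers =
        new Dictionary<Type, List<Delegate>>();

    public void Subscribe<TChannel>(Action<TChannel> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        lock (gate)
        {
            List<Delegate> list;
            if (!handlers.TryGetValue(typeof(TChannel), out list))
            {
                list = new List<Delegate>();
                handlers.Add(typeof(TChannel), list);
            }
            list.Add(handler);
        }
    }

    // Returns false when the handler was not registered.
    public bool Unsubscribe<TChannel>(Action<TChannel> handler)
    {
        lock (gate)
        {
            List<Delegate> list;
            return handlers.TryGetValue(typeof(TChannel), out list)
                && list.Remove(handler);
        }
    }

    public void Publish<TChannel, TMessage>(TChannel channel, TMessage message)
        where TChannel : Channel<TMessage>
    {
        if (channel == null) throw new ArgumentNullException("channel");
        channel.MessageLoad = message;

        Delegate[] snapshot;
        lock (gate)
        {
            List<Delegate> list;
            if (!handlers.TryGetValue(typeof(TChannel), out list)) return;
            snapshot = list.ToArray();
        }

        // Invoke outside the lock so a slow handler does not block
        // other publishers or subscribers.
        foreach (Delegate d in snapshot)
            ((Action<TChannel>)d)(channel);
    }
}
```

To unsubscribe, the caller has to pass the same delegate instance it subscribed with, so a lambda should be kept in a variable rather than written inline twice.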
To see this in action, we first need to define our messages and channels. We
have two messages defined – MessageA and MessageB – with properties
MessageAProperty and MessageBProperty respectively.
public class ChannelA:Channel<MessageA>
{
public ChannelA() {}
}
public class ChannelB:Channel<MessageB>
{
public ChannelB() { }
}
public class MessageA
{
public MessageA(){}
public string MessageAProperty { get; set; }
public override string ToString()
{
return MessageAProperty;
}
}
public class MessageB
{
public MessageB() { }
public string MessageBProperty { get; set; }
public override string ToString()
{
return MessageBProperty;
}
}
Let’s define our Server
public class Server
{
public Server() { }
public string Name { get; set; }
public void PublishEvent<TChannel,TMessage>(TChannel Channel, TMessage Message)
where TChannel : Channel<TMessage>
{
Console.WriteLine("Message published by {0} ---- {1}", this.Name, Message.GetType().ToString());
EventAggregator.CreateInstance().Publish<TChannel,TMessage>(Channel, Message);
}
}
We then initialize our servers and have our clients subscribe with the event
aggregator to whichever messages they are interested in.
List<Server> sList = new List<Server>()
{
new Server(){Name="Server 1"},
new Server(){Name="Server 2"}
};
List<Server> cList = new List<Server>()
{
new Server(){Name="Client 1"},
new Server(){Name="Client 2"},
};
EventAggregator eventAggregator = EventAggregator.CreateInstance();
eventAggregator.Subscribe<ChannelA, MessageA>((chA) => { ProcessEventA(chA); });
eventAggregator.Subscribe<ChannelB, MessageB>((chB) => { ProcessEventB(chB); });
We then have to make our servers publish these events
sList[0].PublishEvent<ChannelA, MessageA>(new ChannelA(),new MessageA() { MessageAProperty = "This is Message A" });
sList[1].PublishEvent<ChannelB, MessageB>(new ChannelB(), new MessageB() { MessageBProperty = "This is Message B" });
Console.Read();
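The handlers ProcessEventA and ProcessEventB used in the subscriptions are not shown in the article. A minimal sketch of what they might look like follows. The Handlers class, the message text and the string return values are assumptions made for illustration; the channel and message types are repeated in condensed form so the block stands on its own.

```csharp
using System;

public class Channel<TMessage>
{
    public TMessage MessageLoad { get; set; }
}

public class MessageA
{
    public string MessageAProperty { get; set; }
    public override string ToString() { return MessageAProperty; }
}

public class MessageB
{
    public string MessageBProperty { get; set; }
    public override string ToString() { return MessageBProperty; }
}

public class ChannelA : Channel<MessageA> { }
public class ChannelB : Channel<MessageB> { }

// Hypothetical client-side handlers: each reads the message carried by
// its channel and reports it.
public static class Handlers
{
    public static string ProcessEventA(ChannelA channel)
    {
        string line = "Client received on ChannelA: " + channel.MessageLoad;
        Console.WriteLine(line);
        return line;
    }

    public static string ProcessEventB(ChannelB channel)
    {
        string line = "Client received on ChannelB: " + channel.MessageLoad;
        Console.WriteLine(line);
        return line;
    }
}
```

Each handler only sees the channel it subscribed to, so a client listening on ChannelA never has to inspect or filter MessageB.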
The event aggregator can serve as a powerful tool for event subscription in a
client-server model. It can be particularly useful when one has to implement
communication between View Models in WPF. Note that there is no clear boundary
between a client and a server. An entity can publish an event and also listen
for an event published by another entity.