
Introduction
Notifiers make possible anonymous communication between
objects in a system. Because they are anonymous, objects communicating have no
knowledge of one another and therefore are independent of the object they are
communicating with. They are also easy to understand providing a seamless
migration as new developers are introduced to a project. Other languages
(notably smalltalk) have this feature built in; C++ gives users the freedom to
create their own.
Design
Those interested in sending messages to subscribers do so
through the notifier class methods; those interested in receiving messages
implement the subscriber interface and register with the appropriate notifier.
The notifier is responsible for message routing; for each type of message,
there exists a different notifier. Notifier makes it possible to uncouple
senders and receivers (subscribers) of messages.

Figure 1. Notifier collaboration diagram
A thread-safe notifier
The notifier must fulfill three requirements:
- simple
- extensible
- thread-safe
Simplicity is a vague term. I choose to define it in
terms of complexity – the less complex a piece of code is, the easier it is to
maintain, explain, optimize and test. The less complex a piece of code is, the
simpler it is. Convenient? ; )
Extensibility is a difficult metric to gauge – although
there are many definitions, the common theme amongst them is the ability to add
and incorporate new functionality, or technical advances, into an existing code
base. My ideal notion of extensibility is code that allows me to maintain it
over time without major work to the original code. Templates make this
possible by parameterizing key types and allowing one to work with those
outside of the implementation.
Thread-safety relates to the fact that clients
using a notifier in a multithreaded environment should not have to be concerned
with thread synchronization issues – subscribers will need to, but the notifier
should not.
The notifier has the following responsibilities:
- registering subscribers
- unregistering subscribers
- notifying subscribers
To do so, a subscriber map is maintained. The subscriber
serves as the key, a flag indicating whether or not the subscriber is registered
serves as the data. A flag is used so that unregistration does not affect
(remove an entry from) the subscription map. This is important if subscribers
choose to unsubscribe during synchronous notification.
Upon registration the subscriber is inserted into the
subscriber map; if it already exists, a check is made to determine whether or
not it has been unregistered. If the subscriber has been flagged for
unregistration but has not been removed from the subscriber map, its state is
reset to 'registered'.
Notification involves obtaining a lock to the critical
section, iterating through the map and notifying the subscribers. Any
subscriber flagged as unregistered will not be notified. Once subscribers have
been notified, all subscribers flagged as unregistered from the subscriber map
are removed.
Unregistration involves flagging the subscriber for
unregistration. The next time notification occurs, the subscriber will be
removed from the map.
Asynchronous notification
Asynchronous notification is straightforward using the
thread pool created in
Windows Thread Pooling. A work unit is defined who, while processing, notifies
subscribers of the event.
void async_notifier<T_state>::process() {
notifier<T_state>::instance().notify(m_state);
}
That is all there is to it!
Demonstration
Our need for notifiers arose when we created a service
that starts a socket listening. While waiting for incoming connections the
calling thread sleeps as it awaits notification:
WSANETWORKEVENTS c_events = {0};
wait_for_event(e_event_accept, c_events, INFINITE);
SOCKADDR_IN sa_client = {0};
int n_sa = sizeof(sa_client);
smart_socket_handle sp_conn(accept((SOCKADDR*)&c_sa_client, n_sa));
If the service is to shutdown cleanly, we need a way of
signaling the waiting socket to shutdown. Moreover, there were several objects
that required notification of the service shutdown. Notifiers turned out to be
the best solution as we could quickly bind them to the shutdown event.
namespace events
{
enum service
{
service_start,
service_pause,
service_resume,
service_stop
};
typedef notifier<service> service_notifier;
events::service is the notification data; publishers simply need to
inform our subscriber:
template<class T_connection>
struct listener :
socket,
core::work_unit,
subscriber<events::service>
of the event:
void on_shutdown()
{
events::service_notifier::instance()->notify(events::service_stop);
The handler in listener takes care of the dirty work:
void on_notify(const state& event)
{
switch (event)
{
case events::service_start:
case events::service_resume:
start_listening();
break;
case events::service_stop:
case events::service_pause:
stop_listening();
break;
default: throw std::invalid_argument("listener::on_notify");
}
}
Using the code
To create a subscriber, derive your class from subscriber. You must implement the
on_notify method.
struct work : core::work_unit, subscriber<events>
{
work();
void process();
void on_notify(const events& e);
bool m_work;
double m_data;
};
To receive notification of events your subscriber must subscribe using the
subscribe method.
void work::process()
{
subscribe();
To discontinue notification of events your subscriber must
unsubscribe using the unsubscribe method.
void work::on_notify(const events& e)
{
switch (e)
{
case die:
unsubscribe();
std::cout << "dying: " << this << std::endl;
m_work = false;
break;
To synchronously notify subscribers, use notifier::notify.
void input::process()
{
std::cout << "(S)pawn/(D)ie: ";
switch (::tolower(::getch()))
{
case 'd':
notifier<events>::instance().notify(die);
To asynchronously notify subscribers, create and queue an instance of async_notifier.
global::thread_pool::instance().queue_request(new async_notifier<events>(die));
About the demo project
The demo project models the scenario above. Workers are
created who perform work until notified of shutdown. Upon notification, the
workers die and the system gracefully shuts down. More workers may be spawned
to simulate a heavy load on the code.
- The thread pool is initialized.
- Work is queued and is killed asynchronously using the notifier.
- A spawn work unit and an input work unit are queued.
- The main thread waits on an event signaled when the input work unit dies.
There are some important considerations to keep in mind
when going through the testbed. Our main thread waits on an event that is
signaled when input is dying. Depending on the amount of work pending, exiting
the application immediately could be disastrous.
To understand, consider the following scenario:
the user has instructed input to die; input synchronously
notifies all subscribers to die and signals our main thread to die.
For the sake of argument, assume there are 100 work units
queued and pending. Threads in the thread pool will not die until all work is
extinguished and they have received the shutdown flag. Meanwhile, main is unwinding
and destroys the process heap. Our work units are finally released as the last bit of
work trickles out. Without a heap, what happens?
To keep things simple, I pause the main thread during
which the machinery gently shuts down. More elaborate synchronization
mechanisms may be employed. However, during shutdown I am not encumbered by
performance requirements and prefer the simple implementation. Your mileage
may vary.
Conclusions
Notifiers are a great way of coupling parts of a system
that are interested in state changes but don't want to be bound to one
another. Templates provide a natural mechanism for reuse of the notification
subsystem with different state data. Synchronous and asynchronous notification
provides the flexibility required for all but the most esoteric situations.
Whether you choose to use a freely available
implementation or to roll your own, notifiers make building complex solutions
in any language more manageable.
Happy Coding!
History
- 09/05/2004 Links updated; thanks to Phillipe Lhoste for pointing out broken links.
- 27/04/2004 Article posted.