A Synchronous Observer of Asynchronous Events





0/5 (0 vote)
In this article, we will write a simple multi-threaded example implementation of the Observer pattern, and show how to re-synchronise a past event to look current. Then we’ll demonstrate a technique to treat future events like they’re current, too.
Introduction
In the Observer design pattern, a subject holds a list of interested parties – the observers – which it will notify about changes in status. Simply put, it’s a form of subscription, and this design comes up in all sorts of places (which is one of the definitions of the term ‘design pattern‘). It’s well suited for handling asynchronous events, like user interaction in a GUI, sensor information, and so on.
There is, however, often a need to re-synchronise asynchronous events. For instance, you might keep the latest status update until it’s actually needed for display, storage or some calculation. By doing this, you disregard the asynchronous nature of its source, and treat it as just another variable, as if it had been read from the subject right then. In other words, you synchronise a status from the past with the present. Sometimes, though, you don’t want the last value, but the next, which is a bit more complex, as it requires you to wait for the future to happen before we can say it’s the present.
In this article, we will write a simple multi-threaded example implementation of the Observer pattern, and show how to re-synchronise a past event to look current. Then, we’ll demonstrate a technique to treat future events like they’re current, too.
Thread Safety
If, as is often the case, the observed subject and the part of the system that manages the observers run in different threads, we need to make sure that they can co-exist in a friendly manner. Specifically, we must make sure that adding or removing an observer – activities that take place outside the observed thread – does not interfere with the reporting of events.
In other words, accessing the list of observers is a critical section of the code, which may not be interrupted. Since this is a quite common situation, operating systems that support multi-threading also provide tools to handle it. On Windows, this is done with a CRITICAL_SECTION
object.
(If you use MFC, there is an eponymous wrapper for it. However, the implementations of the MFC synchronisation objects have been badly flawed in the past, and I believe they still are.)
Checking the documentation, we see that there are functions available to create and destroy CRITICAL_SECTION
, and to enter and leave a locked state. We also see that it can be entered and left recursively, as long as it’s the same thread. Knowing all this, we can write a C++ class to manage it.
Why not use the Windows API directly? The same reason for almost all C++ wrappers of OS objects – lifetime management. Instead of having to remember to call DeleteCriticalSection
everywhere, it might be needed, we can do that in the destructor, as per the Resource Acquisition Is Initialization idiom.
// Need CRITICAL_SECTION declaration
#include <Windows.h>
// Class wrapping Win32 CRITICAL_SECTION
class CriticalSection
{
public:
// Constructor
CriticalSection()
{
::InitializeCriticalSection(&cs_);
}
// Destructor
~CriticalSection()
{
::DeleteCriticalSection(&cs_);
}
// Enter critical section
void Enter()
{
::EnterCriticalSection(&cs_);
}
// Leave critical section
void Leave()
{
::LeaveCriticalSection(&cs_);
}
private:
// Hide copy operations
CriticalSection(const CriticalSection&);
CriticalSection& operator=(const CriticalSection&);
// Data member
CRITICAL_SECTION cs_;
};
Quite simple, really, with little overhead. And because every Enter
must be matched by a Leave
, the sensible thing to do is to write a RAII wrapper for that, too. If we don’t, odds are that at some point, we’ll alter the code using the CriticalSection
and introduce a new exit point, via a function return or exception, which won’t get the Leave
function called. A RAII wrapper helps code robustness.
// RAII Critical section lock
class CSLock
{
public:
// Constructor
CSLock(CriticalSection& section)
: section_(section)
{
section_.Enter();
}
// Destructor
~CSLock()
{
section_.Leave();
}
private:
// Hide copy operations
CSLock(const CSLock&);
CSLock& operator=(const CSLock&);
// Data member
CriticalSection& section_;
};
This automates the locking, so that we only need declare a CSLock
at the beginning of the scope we wish to lock for interruption, and will automatically unlock as we leave the scope and the destructor is called. We’ll see examples of how these are used below.
Something to See
Now that we have the tools to support a multi-threaded application, let’s write a simple Observer system. This requires something to observe, and something to do the observing. For this example, we’ll make a Subject
class, which declares an internal, abstract Subject::Observer
class, from which we’ll derive our observers. The Subject
notification in this example will send an integer to the observers.
#include <set>
#include "CSLock.h"
// Subject to be observed
class Subject
{
public:
// Abstract observer
class Observer
{
// The subject we're observing
Subject* subject_;
// The subject is a friend, so it can help manage our relationship
friend class Subject;
// Assign a subject we've been registered with
void SetSubject(Subject* subject)
{
// Is it the one we have already?
if (subject != subject_)
{
// Unregister from current subject, if any
if (0 != subject_)
{
subject_->Unregister(*this);
}
// Remember new subject
subject_ = subject;
}
}
// Kicked out by the subject
void ReleaseFromSubject(Subject* subject)
{
if (subject == subject_)
{
subject_ = 0;
}
}
protected:
// Constructor only available to derived classes
Observer()
: subject_(0)
{}
// Derived classes decide what to do with notifications
// (Still available to Subject class, as it's a friend)
virtual void Notify(int) = 0;
public:
// Base classes need virtual destructor
virtual ~Observer()
{
SetSubject(0);
}
};
// End of internal class definition
// Destructor
~Subject()
{
// Lock, as we're using the set of observers
CSLock lock(criticalsection_);
// Release all observers
for (std::set<Observer*>::iterator i = observers_.begin();
i != observers_.end(); ++i)
{
(*i)->ReleaseFromSubject(this);
}
}
// Add observer
void Register(Observer& observer)
{
// Lock, as we're manipulating the set of observers
CSLock lock(criticalsection_);
// Add to the set of observers
observers_.insert(&observer);
// Let it know we've accepted the registration
observer.SetSubject(this);
}
// Remove observer
void Unregister(Observer& observer)
{
// Lock, as we're manipulating the set of observers
CSLock lock(criticalsection_);
// Remove from the set of observers
observers_.erase(&observer);
// Let it know we've accepted the unregistration
observer.ReleaseFromSubject(this);
}
// Notify observers about new data
void Notify(int val) const
{
// Lock, as we're using the set of observers
CSLock lock(criticalsection_);
// Notify all
for (std::set<Observer*>::const_iterator i = observers_.begin();
i != observers_.end(); ++i)
{
(*i)->Notify(val);
}
}
// Check how many observers we have
size_t ObserverCount() const
{
return observers_.size();
}
private:
// The registered observers
std::set<Observer*> observers_;
// A critical section to guard the observers
mutable CriticalSection criticalsection_;
};
The first thing to note here is that the Subject
and Observer
are tightly coupled, which somewhat paradoxically is to help de-couple the derived observers from the Subject
. The logic and responsibility of maintaining the relationship is kept private, thanks to the friendship between the Subject
and Observer
, so that derived classes can’t affect it. This tight coupling is also the reason for making the Observer
an internal class, to emphasise this is not any old observer, but one for this particular Subject
.
Another thing worth noting is that just as the Subject::Observer
class leaves the actual handling of a Notify
call to a derived class, the Subject
class here isn’t concerned with the generation of values to notify observers with. That’s for someone else, this Subject
only handles its observers and getting notifications out to them. (Indeed, it would be a relatively trivial task to make the notification type (int
in this example) a template type, and make this a generic and re-usable Observer pattern implementation. To do so is left as an exercise to the reader. Just mind whether you notify by value, reference, or pointer.)
A final point worth making is that the CriticalSection
is declared to be mutable
. The reason for this is that it’s only altered during a function call, by the CSLock
, but at the end of the function call, it will have been restored to its previous state. By indicating it’s mutable, we can make the Notify
function const
.
So, let’s put it all together, with a custom observer that saves the latest value, a function to produce values, a thread, and a complete program.
#include <iostream>
class PastObserver : public Subject::Observer
{
// Last observed value
int value_;
// A critical section to guard the value
mutable CriticalSection criticalsection_;
public:
// Constructor,
PastObserver()
: value_(0)
{}
// Access last value
int GetLastValue() const
{
// Lock to prevent the value being modified
CSLock lock(criticalsection_);
return value_;
}
// Function called by observed Subject
virtual void Notify(int value)
{
{
// Lock to prevent the value being read while we're assigning
CSLock lock(criticalsection_);
// Store the value
value_ = value;
}
// Print it out
std::cout << "PastObserver notified: " << value_ << std::endl;
}
};
// A thread function to generate values
// Takes a Subject* as thread parameter
DWORD WINAPI ValueFunction(void* pParam)
{
Subject* subject = (Subject*)pParam;
std::cout << "Thread started with " <<
subject->ObserverCount() << " observers" << std::endl;
// Put a seed value in for the random number generator
int val = (int)time(0);
// Run until there are no more observers
while (0 < subject->ObserverCount())
{
// Get a random value to report
srand(val);
val = rand();
// Report it
subject->Notify(val);
// Take a little break
Sleep(100 * (val & 0x7));
}
std::cout << "Thread ended" << std::endl;
return 0;
}
void main()
{
// Create subject and observer
Subject subject;
PastObserver observer;
subject.Register(observer);
// Start the thread
CreateThread(NULL, NULL, &ValueFunction, &subject, NULL, NULL);
// Let it work for a bit
Sleep(1000);
// Close down and report
subject.Unregister(observer);
std::cout << "Last value: " << observer.GetLastValue() << std::endl;
// Wait for thread to terminate
::WaitForSingleObject(thread, INFINITE);
}
And that’s it. A reasonably small and clear illustration of how the Observer pattern works. In this example, we synchronise with the past, by reading the last value. Now, let’s synchronise with the future!
Reading the Future
So how do you read the future? Well, obviously, our observer has to wait for it to happen, so we’ll need another synchronisation object: the Event. This is a boolean object which can be set or reset, and waited for with WaitForSingleObject
. As it turns out, we’ll need two of those – one to indicate we’re waiting for data, and one to indicate we’ve received it.
class FutureObserver : public Subject::Observer
{
// Observed value
int value_;
// Events
HANDLE waiting_;
HANDLE newValue_;
public:
// Constructor
FutureObserver()
: value_(0),
waiting_(0),
newValue_(0)
{
// No security attributes, automatic reset, initially reset, no name
waiting_ = ::CreateEvent(0, 0, 0, 0);
newValue_ = ::CreateEvent(0, 0, 0, 0);
}
// Destructor
~FutureObserver()
{
CloseHandle(waiting_);
CloseHandle(newValue_);
}
// Wait for next value
int GetNextValue() const
{
// Indicate we're waiting
SetEvent(waiting_);
// Wait for a new value
if (WAIT_OBJECT_0 == ::WaitForSingleObject(newValue_, INFINITE))
{
// Success
return value_;
}
else
{
// Failures, could be WAIT_FAILED or WAIT_TIMEOUT
throw std::exception("Failed waiting for next value");
}
}
// Function called by observed Subject
virtual void Notify(int value)
{
// Print it out
std::cout << "FutureObserver notified: " << value << std::endl;
// Check to see if we're waiting
if (WAIT_OBJECT_0 == ::WaitForSingleObject(waiting_, 0))
{
// We were waiting, so keep the value...
value_ = value;
// ... and flag we have it
::SetEvent(newValue_);
}
}
};
This observer is a bit more complex, as it has to manage the two Event
objects, but the principle is simple enough. In the GetNextValue()
function, we set one event, and wait for the other. The next time Notify()
is called, it will see the waiting_
flag is set, so it will store the value and signal that newValue_
is ready. The events are created to reset automatically, as soon as a WaitForSingleObject
call is successful (e.g., when the event it is waiting for has been set).
The GetNextValue()
function waits infinitely here – it will not continue until it’s found a value – and so the exception should never happen, unless the FutureObserver
has been deleted in another thread. If you’d prefer a timeout, just overload the GetNextValue()
function:
// Access next value, with timeout and success indicator
int GetNextValue(DWORD millisecondTimeout, bool& timedOut) const
{
// Indicate we're waiting
SetEvent(waiting_);
// Wait for a new value
switch (::WaitForSingleObject(newValue_, millisecondTimeout))
{
case WAIT_OBJECT_0:
// Success
timedOut = false;
return value_;
case WAIT_TIMEOUT:
timedOut = true;
return 0;
default:
// WAIT_FAILED
throw std::exception("Failed waiting for next value");
}
}
The Notify()
function, in contrast, doesn’t wait at all. When WaitForSingleObject
is called with a timeout of zero milliseconds, it returns immediately, so we have to check the return value to see if we were successful. This means we’re not holding up the Subject::Nofify()
more than necessary.
Finally, let’s put it all together:
void main()
{
// Create subject and observers
Subject subject;
PastObserver past;
subject.Register(past);
FutureObserver future;
subject.Register(future);
// Start the thread
HANDLE thread = CreateThread(NULL, NULL, &ValueFunction, &subject, NULL, NULL);
// Let it work for a bit
Sleep(1000);
// Get the last and next value, twice
std::cout << "Last value: " << past.GetLastValue() << std::endl;
std::cout << "Next value: " << future.GetNextValue() << std::endl;
std::cout << "Last value: " << past.GetLastValue() << std::endl;
std::cout << "Next value: " << future.GetNextValue() << std::endl;
// Close down
subject.Unregister(past);
subject.Unregister(future);
// Wait for thread time to terminate
::WaitForSingleObject(thread, INFINITE);
}
As always, if you found this interesting or useful, or have suggestions for improvements, please let me know.
Filed under: Code, CodeProject
Tagged: C++, Multithreading, Win32