Click here to Skip to main content
Click here to Skip to main content

A synchronous observer of asynchronous events

, 13 Nov 2012
Rate this:
Please Sign up or sign in to vote.
In this article, we will write a simple multi-threaded example implementation of the Observer pattern, and show how to re-synchronise a past event to look current. Then we’ll demonstrate a technique to treat future events like they’re current, too.

Introduction

In the Observer design pattern, a subject holds a list of interested parties – the observers – which it will notify about changes in status. Simply put, it’s a form of subscription, and this design comes up in all sorts of places (which is one of the definitions of the term ‘design pattern‘). It’s well suited for handling asynchronous events, like user interaction in a GUI, sensor information, and so on.

There is, however, often a need to re-synchronise asynchronous events. For instance, you might keep the latest status update until it’s actually needed for display, storage or some calculation. By doing this, you disregard the asynchronous nature of its source, and treat it as just another variable, as if it had been read from the subject right then. In other words, you synchronise a status from the past with the present. Sometimes, though, you don’t want the last value, but the next, which is a bit more complex, as it requires you to wait for the future to happen before we can say it’s the present.

In this article, we will write a simple multi-threaded example implementation of the Observer pattern, and show how to re-synchronise a past event to look current. Then we’ll demonstrate a technique to treat future events like they’re current, too.

Thread safety

If, as is often the case, the observed subject and the part of the system that manages the observers run in different threads, we need to make sure that they can co-exist in a friendly manner. Specifically, we must make sure that adding or removing an observer – activities that takes place outside the observed thread – does not interfere with the reporting of events.

In other words, accessing the list of observers is a critical section of the code, which may not be interrupted. Since this is a quite common situation, operating systems that support multi-threading also provide tools to handle it. On Windows, this is done with a CRITICAL_SECTION object.

(If you use MFC, there is an eponymous wrapper for it. However, the implementations of the MFC synchronisation objects have been badly flawed in the past, and I believe they still are.)

Checking the documentation, we see that there are functions available to create and destroy CRITICAL_SECTION, and to enter and leave a locked state. We also see that it can be entered and left recursively, as long as it’s the same thread. Knowing all this, we can write a C++ class to manage it.

Why not use the Windows API directly? The same reason for almost all C++ wrappers of OS objects – lifetime management. Instead of having to remember to call DeleteCriticalSection everywhere it might be needed, we can do that in the destructor, as per the Resource Acquisition Is Initialization idiom.

// Need CRITICAL_SECTION declaration
#include <Windows.h>

// Class wrapping Win32 CRITICAL_SECTION
class CriticalSection
{
public:
  // Constructor
  CriticalSection()
  { 
    ::InitializeCriticalSection(&cs_); 
  }

  // Destructor
  ~CriticalSection()
  { 
    ::DeleteCriticalSection(&cs_); 
  }
  
  // Enter critical section
  void Enter()
  { 
    ::EnterCriticalSection(&cs_); 
  }

  // Leave critical section
  void Leave()
  { 
    ::LeaveCriticalSection(&cs_);
  }

private:
  // Hide copy operations
  CriticalSection(const CriticalSection&);
  CriticalSection& operator=(const CriticalSection&);
  
  // Data member
  CRITICAL_SECTION cs_;
};

Quite simple, really, with little overhead. And because every Enter must be matched by a Leave, the sensible thing to do is to write a RAII wrapper for that, too. If we don’t, odds are that at some point, we’ll alter the code using the CriticalSection and introduce a new exit point, via a function return or exception, which won’t get the Leave function called. A RAII wrapper helps code robustness.

// RAII Critical section lock
class CSLock
{
public:
  // Constructor
  CSLock(CriticalSection& section)
  : section_(section) 
  { 
    section_.Enter(); 
  }
  
  // Destructor
  ~CSLock()
  { 
    section_.Leave(); 
  }
    
private:
  // Hide copy operations
  CSLock(const CSLock&);
  CSLock& operator=(const CSLock&);
    
  // Data member
  CriticalSection& section_;
};

This automates the locking, so that we only need declare a CSLock at the beginning of the scope we wish to lock for interruption, and will automatically unlock as we leave the scope and the destructor is called. We’ll see examples of how these are used below.

Something to see

Now that we have the tools to support a multi-threaded application, let’s write a simple Observer system. This requires something to observe, and something to do the observing. For this example, we’ll make a Subject class, which declares an internal, abstract Subject::Observer class, from which we’ll derive our observers. The Subject notification in this example will send an integer to the observers.

#include <set>
#include "CSLock.h"

// Subject to be observed
class Subject
{
public:
  // Abstract observer
  class Observer
  {
    // The subject we're observing
    Subject* subject_;
    // The subject is a friend, so it can help manage our relationship
    friend class Subject;

    // Assign a subject we've been registered with
    void SetSubject(Subject* subject)
    {
      // Is it the one we have already?
      if (subject != subject_)
      {
        // Unregister from current subject, if any
        if (0 != subject_)
        {
          subject_->Unregister(*this);
        }
        // Remember new subject
        subject_ = subject;
      }
    }

    // Kicked out by the subject
    void ReleaseFromSubject(Subject* subject)
    {
      if (subject == subject_)
      {
        subject_ = 0;
      }
    }
  protected:
    // Constructor only available to derived classes
    Observer()
    : subject_(0)
    {}

    // Derived classes decide what to do with notifications
    // (Still available to Subject class, as it's a friend)
    virtual void Notify(int) = 0;

  public:
    // Base classes need virtual destructor
    virtual ~Observer()
    {
      SetSubject(0);
    }
  };
  // End of internal class definition

  // Destructor
  ~Subject()
  {
    // Lock, as we're using the set of observers
    CSLock lock(criticalsection_);
    // Release all observers
    for (std::set<Observer*>::iterator i = observers_.begin(); 
         i != observers_.end(); ++i)
    {
      (*i)->ReleaseFromSubject(this);
    }
  }

  // Add observer
  void Register(Observer& observer)
  {
    // Lock, as we're manipulating the set of observers
    CSLock lock(criticalsection_);
    // Add to the set of observers
    observers_.insert(&observer);
    // Let it know we've accepted the registration
    observer.SetSubject(this);
  }
  
  // Remove observer
  void Unregister(Observer& observer)
  {
    // Lock, as we're manipulating the set of observers
    CSLock lock(criticalsection_);
    // Remove from the set of observers
    observers_.erase(&observer);
    // Let it know we've accepted the unregistration
    observer.ReleaseFromSubject(this);
  }

  // Notify observers about new data
  void Notify(int val) const
  {
    // Lock, as we're using the set of observers
    CSLock lock(criticalsection_);
    // Notify all
    for (std::set<Observer*>::const_iterator i = observers_.begin(); 
         i != observers_.end(); ++i)
    {
      (*i)->Notify(val);
    }
  }

  // Check how many observers we have
  size_t ObserverCount() const
  {
    return observers_.size();
  }

private:
  // The registered observers
  std::set<Observer*> observers_;
  // A critical section to guard the observers
  mutable CriticalSection criticalsection_;
};

The first thing to note here is that the Subject and Observer are tightly coupled, which somewhat paradoxically is to help de-couple the derived observers from the Subject. The logic and responsibility of maintaining the relationship is kept private, thanks to the friendship between the Subject and Observer, so that derived classes can’t affect it. This tight coupling is also the reason for making the Observer an internal class, to emphasise this is not any old observer, but one for this particular Subject.

Another thing worth noting is that just as the Subject::Observer class leaves the actual handling of a Notify call to a derived class, the Subject class here isn’t concerned with the generation of values to notify observers with. That’s for someone else, this Subject only handles its observers and getting notifications out to them. (Indeed, it would be a relatively trivial task to make the notification type (int in this example) a template type, and make this a generic and re-usable Observer pattern implementation. To do so is left as an exercise to the reader. Just mind whether you notify by value, reference, or pointer.)

A final point worth making is that the CriticalSection is declared to be mutable. The reason for this is that it’s only altered during a function call, by the CSLock, but at the end of the function call it will have been restored to its previous state. By indicating it’s mutable, we can make the Notify function const.

So, let’s put it all together, with a custom observer that saves the latest value, a function to produce values, a thread, and a complete program.

#include <iostream>

class PastObserver : public Subject::Observer
{
  // Last observed value
  int value_;
  // A critical section to guard the value
  mutable CriticalSection criticalsection_;

public:
  // Constructor, 
  PastObserver()
  : value_(0)
  {}

  // Access last value
  int GetLastValue() const
  {
    // Lock to prevent the value being modified
    CSLock lock(criticalsection_);
    return value_;
  }

  // Function called by observed Subject
  virtual void Notify(int value)
  {
    {
      // Lock to prevent the value being read while we're assigning
      CSLock lock(criticalsection_);
      // Store the value
      value_ = value;
    }
    // Print it out
    std::cout << "PastObserver notified: " << value_ << std::endl;
  }
};

// A thread function to generate values
// Takes a Subject* as thread parameter
DWORD WINAPI ValueFunction(void* pParam)
{
  Subject* subject = (Subject*)pParam;

  std::cout << "Thread started with " << 
    subject->ObserverCount() << " observers" << std::endl;

  // Put a seed value in for the random number generator
  int val = (int)time(0);
  // Run until there are no more observers
  while (0 < subject->ObserverCount())
  {
    // Get a random value to report
    srand(val);
    val = rand();
    // Report it
    subject->Notify(val);
    // Take a little break
    Sleep(100 * (val & 0x7));
  }
  std::cout << "Thread ended" << std::endl;
  return 0;
}

void main()
{
  // Create subject and observer
  Subject subject;
  PastObserver observer;
  subject.Register(observer);
  
  // Start the thread
  CreateThread(NULL, NULL, &ValueFunction, &subject, NULL, NULL);

  // Let it work for a bit
  Sleep(1000);

  // Close down and report
  subject.Unregister(observer);
  std::cout << "Last value: " << observer.GetLastValue() << std::endl;

  // Wait for thread to terminate
  ::WaitForSingleObject(thread, INFINITE);
} 

And that’s it. A reasonably small and clear illustration of how the Observer pattern works. In this example, we synchronise with the past, by reading the last value. Now, let’s synchronise with the future!

Reading the future

So how do you read the future? Well, obviously, our observer has to wait for it to happen, so we’ll need another synchronisation object: the Event. This is a boolean object which can be set or reset, and waited for with WaitForSingleObject. As it turns out, we’ll need two of those – one to indicate we’re waiting for data, and one to indicate we’ve received it.

class FutureObserver : public Subject::Observer
{
  // Observed value
  int value_;

  // Events 
  HANDLE waiting_;
  HANDLE newValue_;

public:
  // Constructor
  FutureObserver()
  : value_(0),
    waiting_(0),
    newValue_(0)
  {
    // No security attributes, automatic reset, initially reset, no name
    waiting_ = ::CreateEvent(0, 0, 0, 0);
    newValue_ = ::CreateEvent(0, 0, 0, 0);
  }

  // Destructor
  ~FutureObserver()
  {
    CloseHandle(waiting_);
    CloseHandle(newValue_);
  }

  // Wait for next value
  int GetNextValue() const
  {
    // Indicate we're waiting
    SetEvent(waiting_);

    // Wait for a new value
    if (WAIT_OBJECT_0 == ::WaitForSingleObject(newValue_, INFINITE))
    {
      // Success
      return value_;
    }
    else
    {
      // Failures, could be WAIT_FAILED or WAIT_TIMEOUT
      throw std::exception("Failed waiting for next value");
    }
  }

  // Function called by observed Subject
  virtual void Notify(int value)
  {
    // Print it out
    std::cout << "FutureObserver notified: " << value << std::endl;
    // Check to see if we're waiting
    if (WAIT_OBJECT_0 == ::WaitForSingleObject(waiting_, 0))
    {
      // We were waiting, so keep the value...
      value_ = value;
      // ... and flag we have it
      ::SetEvent(newValue_);
    }
  }
};

This observer is a bit more complex, as it has to manage the two Event objects, but the principle is simple enough. In the GetNextValue() function, we set one event, and wait for the other. The next time Notify() is called, it will see the waiting_ flag is set, so it will store the value and signal that newValue_ is ready. The events are created to reset automatically, as soon as a WaitForSingleObject call is successful (eg when the event it is waiting for has been set).

The GetNextValue() function waits infinitely here – it will not continue until it’s found a value – and so the exception should never happen, unless the FutureObserver has been deleted in another thread. If you’d prefer a timeout, just overload the GetNextValue() function:

  // Access next value, with timeout and success indicator
  int GetNextValue(DWORD millisecondTimeout, bool& timedOut) const
  {
    // Indicate we're waiting
    SetEvent(waiting_);

    // Wait for a new value
    switch (::WaitForSingleObject(newValue_, millisecondTimeout))
    {
    case WAIT_OBJECT_0:
      // Success
      timedOut = false;
      return value_;
    case WAIT_TIMEOUT:
      timedOut = true;
      return 0;
    default:
      // WAIT_FAILED
      throw std::exception("Failed waiting for next value");
    }
  }

The Notify() function, in contrast, doesn’t wait at all. When WaitForSingleObject is called with a timeout of zero milliseconds, it returns immediately, so we have to check the return value to see if we were successful. This means we’re not holding up the Subject::Nofify() more than necessary.

Finally, let’s put it all together:

void main()
{
  // Create subject and observers
  Subject subject;
  PastObserver past;
  subject.Register(past);
  FutureObserver future;
  subject.Register(future);
  
  // Start the thread
  HANDLE thread = CreateThread(NULL, NULL, &ValueFunction, &subject, NULL, NULL);

  // Let it work for a bit
  Sleep(1000);

  // Get the last and next value, twice
  std::cout << "Last value: " << past.GetLastValue() << std::endl;
  std::cout << "Next value: " << future.GetNextValue() << std::endl;
  std::cout << "Last value: " << past.GetLastValue() << std::endl;
  std::cout << "Next value: " << future.GetNextValue() << std::endl;

  // Close down
  subject.Unregister(past);
  subject.Unregister(future);

  // Wait for thread time to terminate
  ::WaitForSingleObject(thread, INFINITE);
} 

As always, if you found this interesting or useful, or have suggestions for improvements, please let me know.


Filed under: Code, CodeProject Tagged: C++, Multithreading, Win32

License

This article, along with any associated source code and files, is licensed under The BSD License

Share

About the Author

Orjan Westin

United Kingdom United Kingdom
Orjan has worked as a professional developer - in Sweden and England - since 1993, using a wide range of languages (C++, Pascal, Delphi, C, C#, Visual Basic, PHP, Python and x86 assembler), but tends to return to C++.

Comments and Discussions

 
-- There are no messages in this forum --
| Advertise | Privacy | Mobile
Web02 | 2.8.140827.1 | Last Updated 13 Nov 2012
Article Copyright 2012 by Orjan Westin
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid