A class to synchronise thread completions






Synchronising thread completion the easy way
Introduction
It can be very useful to break a complex task into a series of small sub-tasks; as software developers we do this all the time. Some sub-tasks must be performed in sequence whilst other sub-tasks can be processed in parallel. And sometimes there are tasks that can be performed in parallel in the lead up to some other task that cannot start until all the sub-tasks have been completed.
If this sounds like multithreading you'd be right. It can be tempting, especially if you have a multi-processor machine, to launch two or more background threads to perform particular tasks. But then comes the crunch! Because each thread runs independently of the main application thread it becomes necessary to synchronise the threads to ensure that all required sub-tasks have been completed before moving on to the next. This usually involves a bunch of code to track thread handles and a call to WaitForMultipleObjects() to wait until all threads have completed. The code itself is simple enough but it can become tedious writing it every time you need it. Hence these classes.
Basic threading APIs
Before we look at the classes let's briefly review the basic threading API, _beginthreadex(), used by the classes. This API lets us launch a thread specifying the address of the thread procedure, a void pointer which can be used to pass data to the thread (typically a pointer to a struct containing the data the thread needs, as defined by you the programmer), the stack size for the thread, and whether the thread should run immediately or be suspended until some time later. If suspended, it won't run at all until we give it the go-ahead.
There are two other parameters I'm not particularly interested in: one which lets us specify whether the thread handle can be inherited by a child process, and another that optionally points to a 32 bit location that will receive the thread identifier. See MSDN for the details of those parameters.
What we get back from _beginthreadex() is a HANDLE we can later use to detect if and when the thread has terminated. It's an open handle so at some time in the future the handle needs to be closed. (Parenthetically, a very common error I've seen is programs that don't close thread handles when the thread they represent has terminated. You can detect this by opening Task Manager and adding the threads and handles columns to the processes display. As threads are created and terminate you see the handle count rise and keep on rising. Handles are a finite resource even on Windows NT and derived operating systems and if your application runs long enough without cleaning up its thread handles it will eventually bring the system to its knees).
We can detect if and when the thread has terminated by calling WaitForSingleObject() passing the handle returned by _beginthreadex(). If the thread has terminated WaitForSingleObject() will return immediately with a return value of WAIT_OBJECT_0. If the thread is still running then WaitForSingleObject() waits until the thread terminates or until a timeout we get to specify in the call to WaitForSingleObject() has elapsed. If the timeout elapses WaitForSingleObject() returns WAIT_TIMEOUT. Simple stuff! It looks like this in code.
// _beginthreadex() returns a uintptr_t, so cast it to a HANDLE
HANDLE hHandle = (HANDLE) _beginthreadex(NULL, 0, MyThreadProc, NULL, 0, NULL);

switch (WaitForSingleObject(hHandle, 50000))
{
case WAIT_OBJECT_0:
    // The thread has terminated - do something
    break;

case WAIT_TIMEOUT:
    // The timeout has elapsed but the thread is still running
    // do something appropriate for a timeout
    break;
}

CloseHandle(hHandle);
all of which is pretty straightforward. The 50000 is the timeout expressed in milliseconds. On the other hand, remember that you have to track the thread handle, make sure you close it and write a switch statement to handle the two cases.
It gets more complex if you have more than one thread to track. In this case you need to use WaitForMultipleObjects() passing it an array of thread handles. The return value from WaitForMultipleObjects() will be either WAIT_TIMEOUT, WAIT_FAILED (in which case GetLastError() gives the error code) or a value between WAIT_OBJECT_0 and WAIT_OBJECT_0 + number of handles being waited upon - 1. The only error I've ever encountered in this situation is an invalid handle error code which indicates that one of the handles I was waiting on had been closed. Of course the error code doesn't indicate which handle has been closed!
How could the handle have been closed? Either by the programmer (me) explicitly closing the handle prematurely, or by calling _beginthread(), which is an initially attractive thread API inasmuch as it involves rather less typing. You call _beginthread() specifying the thread procedure address, the stack size and the aforementioned void pointer to data the thread needs to know about. After a call to _beginthread() the thread is running (no suspension is possible). The gotcha is that the runtime closes the thread handle automatically when the thread exits, which may happen before _beginthread() has even returned to your code, so you cannot safely wait on the handle. If you try, either of the wait functions (WaitForSingleObject() or WaitForMultipleObjects()) may fail immediately with the invalid handle error code!
Finally, why use _beginthreadex() instead of CreateThread()? Quite simply, because _beginthreadex() manages the C runtime library's per-thread data (held in thread local storage) for us, both on creating the thread and later on, when ending the thread.
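As a rough illustration of the return value ranges described above for WaitForMultipleObjects(), here is a small sketch that classifies a wait-any result. The names DecodeWaitResult, WaitOutcome and WaitDecoded are hypothetical, and the three constants are redefined locally (with the values the Win32 headers document) only so that the sketch compiles without windows.h.

```cpp
#include <cassert>
#include <cstdint>

// Local copies of the documented Win32 values, under hypothetical names,
// so the sketch builds without <windows.h>.
const std::uint32_t kWaitObject0 = 0x00000000u;  // WAIT_OBJECT_0
const std::uint32_t kWaitTimeout = 0x00000102u;  // WAIT_TIMEOUT
const std::uint32_t kMaxHandles  = 64u;          // MAXIMUM_WAIT_OBJECTS

enum class WaitOutcome { Signalled, TimedOut, Failed };

struct WaitDecoded
{
    WaitOutcome outcome;
    int         index;   // which handle was signalled, or -1
};

// Classify the value returned by a wait on `count` handles
// (wait-any semantics, i.e. bWaitAll == FALSE).
inline WaitDecoded DecodeWaitResult(std::uint32_t result, std::uint32_t count)
{
    assert(count >= 1 && count <= kMaxHandles);

    if (result == kWaitTimeout)
        return { WaitOutcome::TimedOut, -1 };

    // WAIT_OBJECT_0 .. WAIT_OBJECT_0 + count - 1 identifies a handle.
    if (result - kWaitObject0 < count)
        return { WaitOutcome::Signalled, static_cast<int>(result - kWaitObject0) };

    // Anything else (WAIT_FAILED, abandoned-mutex codes) is treated
    // as a failure in this sketch.
    return { WaitOutcome::Failed, -1 };
}
```

With two handles, a result of 1 decodes to index 1, whereas a result of 2 falls outside the valid range and is reported as a failure.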
CBaseThread
This class simply encapsulates a single thread. Later classes use CBaseThread to synchronously or asynchronously wait until a given set of threads have terminated. The class is a thin wrapper around the _beginthreadex() function. It takes care of the details of tracking the thread handle returned by _beginthreadex(), ensuring the handle is eventually closed, and provides some member functions to access the thread handle and wait on it. The class looks like this:
class CBaseThread : public CObject
{
    DECLARE_DYNAMIC(CBaseThread);
public:
    CBaseThread(HANDLE hStopEvent, volatile bool *bStop,
                unsigned(__stdcall *thread)(void *),
                bool bWait = false, LPVOID data = NULL);
    ~CBaseThread();

    bool          IsWaiting() const    { return m_bWaiting; }
    volatile bool Stop() const         { return *m_bStop; }
    HANDLE        StopEvent() const    { return m_hStopEvent; }
    HANDLE        ThreadHandle() const { return m_hThreadHandle; }
    LPVOID        UserData() const     { return m_pvUserData; }

    virtual bool  Wait(DWORD dwTimeout = INFINITE) const;
    bool          Run() const
                  { return ResumeThread(m_hThreadHandle) == 1; }
    UINT          ThreadID() const     { return m_uiThreadID; }

private:
    LPVOID        m_pvUserData;
    HANDLE        m_hStopEvent,
                  m_hThreadHandle;
    volatile bool *m_bStop,
                  m_bWaiting;
    UINT          m_uiThreadID;
};
In the constructor the hStopEvent handle and the bStop variable are used to provide a way for the outside world to signal the thread that it ought to stop running. I'll discuss these a bit later in the article. The third parameter is the address of the thread procedure. The fourth parameter, the bWait parameter, defaults to false, indicating that the thread should run as soon as it's created. If bWait is true the thread is created but suspended and it won't run until the Run() or the Wait() methods are called. The data parameter is the aforementioned void pointer to data the thread needs. The class places no interpretation on this parameter.
Run() lets the thread run (if it had been created suspended). Wait() runs the thread if it had been created suspended and then waits until either the thread has terminated or until the timeout expires.
The destructor closes the thread handle.
There is a new class (added October 16th 2004) called CUserThread which is described at the end of the article.
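The idea of letting a destructor close the handle can be sketched in portable C++ as follows. This is not the CBaseThread implementation; ScopedHandle is a hypothetical helper written only to show the ownership pattern, with the handle type and the closing function supplied by the caller.

```cpp
#include <cassert>
#include <utility>

// Hypothetical RAII owner: calls the closer exactly once, in the destructor.
template <typename Handle, typename Closer>
class ScopedHandle
{
public:
    ScopedHandle(Handle handle, Closer closer)
        : m_handle(handle), m_closer(closer), m_owns(true) {}

    ~ScopedHandle() { if (m_owns) m_closer(m_handle); }

    // Not copyable, so two owners can never close the same handle.
    ScopedHandle(const ScopedHandle &) = delete;
    ScopedHandle &operator=(const ScopedHandle &) = delete;

    // Movable: ownership transfers and the source stops owning.
    ScopedHandle(ScopedHandle &&other)
        : m_handle(other.m_handle),
          m_closer(std::move(other.m_closer)),
          m_owns(other.m_owns)
    {
        other.m_owns = false;
    }

    Handle get() const { return m_handle; }

private:
    Handle m_handle;
    Closer m_closer;
    bool   m_owns;
};
```

On Windows one would presumably instantiate this with HANDLE and a pointer to CloseHandle, which removes the risk of the leaked thread handles mentioned earlier.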
CSyncRendevouz
This class encapsulates the work of creating multiple threads, tracking their thread handles and ensuring all threads have completed before allowing the next step in processing to commence. This class performs synchronously. In other words, once you've used the class to launch a set of threads and called the Wait() method, execution in the thread that called Wait() stops until all the threads launched via the class instance have terminated.
The class looks like this:
class CSyncRendevouz : public CObject
{
    DECLARE_DYNAMIC(CSyncRendevouz);
public:
    CSyncRendevouz(void);
    ~CSyncRendevouz(void);

    void         Stop()
                 { m_bStop = true; SetEvent(m_hStopEvent); }
    virtual bool Wait(DWORD dwTimeout = INFINITE);

    // Returns false once the limit on waitable handles has been reached
    bool         AddThread(unsigned(__stdcall *thread)(void *),
                           bool bWait = false,
                           LPVOID data = NULL);
    bool         AddHandle(HANDLE hHandle);

protected:
    CArray<HANDLE, HANDLE> m_handleArray;
    CList<CBaseThread*, CBaseThread *> m_threads;
    HANDLE        m_hStopEvent;
    volatile bool m_bStop;
};
and usage might look like this:
CSyncRendevouz rendevouz;

rendevouz.AddThread(Thread1);
rendevouz.AddThread(Thread2);
rendevouz.Wait(50000);
which creates a CSyncRendevouz object, adds Thread1 and Thread2 to the object and then calls Wait(). In the example it waits up to 50 seconds for the threads to execute. If both threads terminate before the timeout has elapsed the Wait() call returns true, otherwise it returns false.
I said above that the code snippet adds Thread1 and Thread2 to the object. A close examination of the function prototype should reveal that what's actually passed to the AddThread() method is the address of the thread procedure, implying that the AddThread() call is where the thread is created. This is exactly what happens, via the creation of one CBaseThread object per thread. Let's look at the code for AddThread().
bool CSyncRendevouz::AddThread(unsigned(__stdcall *thread)(void *),
                               bool bWait,
                               LPVOID data)
{
    // Refuse to go past the number of handles Windows can wait on
    if (m_handleArray.GetCount() > MAXIMUM_WAIT_OBJECTS - 1)
        return false;

    ASSERT(thread);

    CBaseThread *pThread = new CBaseThread(m_hStopEvent, &m_bStop, thread,
                                           bWait,
                                           data);

    ASSERT(pThread);
    ASSERT_KINDOF(CBaseThread, pThread);

    m_threads.AddTail(pThread);
    m_handleArray.Add(pThread->ThreadHandle());
    return true;
}
This first checks how many handles are being monitored by this instance of CSyncRendevouz, returning false if we've reached the limit which is currently 64. (The limit is set by Windows). If we're not yet at the limit the function creates a new CBaseThread object passing it a combination of user data, user parameters and a couple of objects that exist as part of the CSyncRendevouz object. The newly created CBaseThread object saves the handles and pointers and launches a new thread, passing its own address as the thread data. We then add the new object to a list of objects (for later deletion) and add the thread handle to an array.
When all the threads have been created it's time to monitor them. Remember from our previous discussion that the threads may be created suspended, or may be already running by the time we're ready to call the Wait() method. The Wait() method must, therefore, traverse the list of thread objects and allow all suspended threads to run first. This is accomplished by this code.
bool CSyncRendevouz::Wait(DWORD dwTimeout)
{
    CBaseThread *pThread;
    POSITION    pos = m_threads.GetHeadPosition();

    // Release any threads that were created suspended
    while (pos != POSITION(NULL))
    {
        pThread = m_threads.GetNext(pos);

        ASSERT(pThread);
        ASSERT_KINDOF(CBaseThread, pThread);

        if (pThread->IsWaiting())
            pThread->Run();
    }

    // TRUE means wait until every handle in the array is signalled
    return WaitForMultipleObjects(m_handleArray.GetCount(),
                                  m_handleArray.GetData(),
                                  TRUE, dwTimeout) != WAIT_TIMEOUT;
}
Notice that there's also an AddHandle() member which lets you add any waitable handle to the wait list.
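For readers without MFC to hand, the launch-then-wait-for-all pattern can be approximated in portable C++. The sketch below is an analogue, not the CSyncRendevouz code: PortableRendezvous is a hypothetical name, and a counter guarded by a condition variable stands in for WaitForMultipleObjects() with bWaitAll set to TRUE.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

class PortableRendezvous
{
public:
    // Join whatever is still running so no thread outlives the object.
    ~PortableRendezvous()
    {
        for (auto &t : m_threads)
            if (t.joinable())
                t.join();
    }

    // Launch `work` on a new thread and count it as running.
    void AddThread(std::function<void()> work)
    {
        assert(work);
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            ++m_running;
        }
        m_threads.emplace_back([this, work] {
            work();
            std::lock_guard<std::mutex> lock(m_mutex);
            --m_running;
            m_allDone.notify_all();
        });
    }

    // Returns true if every added thread finished before the timeout.
    bool Wait(std::chrono::milliseconds timeout)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_allDone.wait_for(lock, timeout,
                                  [this] { return m_running == 0; });
    }

private:
    std::mutex               m_mutex;
    std::condition_variable  m_allDone;
    int                      m_running = 0;
    std::vector<std::thread> m_threads;
};
```

Usage mirrors the earlier snippet: add two tasks with AddThread() and call Wait(std::chrono::seconds(50)), which returns false if the timeout elapses first.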
CAsyncRendevouz
This class is derived from CSyncRendevouz. When you call the Wait() method on this class it creates another thread which performs the Wait() function and returns immediately. When the wait has terminated (either by all threads terminating or by the timeout elapsing), the worker thread posts a message to a window. The message and the target window handle are both specified when creating the CAsyncRendevouz object. This class looks like this:
class CAsyncRendevouz : public CSyncRendevouz
{
    DECLARE_DYNAMIC(CAsyncRendevouz);
public:
    CAsyncRendevouz(HWND wndTarget, UINT uiMsg,
                    LPVOID pvUserData = NULL);
    ~CAsyncRendevouz();

    virtual bool Wait(DWORD dwTimeout = INFINITE);

private:
    static unsigned __stdcall WaitProc(LPVOID data);

    HWND        m_wndTarget;
    UINT        m_uiMsg;
    DWORD       m_dwTimeout;
    LPVOID      m_pvUserData;
    CBaseThread *m_pThread;
};
This looks pretty straightforward except, perhaps, for that LPVOID parameter in the constructor. This is not to be confused with the parameter of the same type in the CSyncRendevouz::AddThread() method. The parameter in the CSyncRendevouz::AddThread() call is user data that's passed to the thread procedure. The parameter in the CAsyncRendevouz constructor is data that's passed as the LPARAM data in the Windows message posted to the specified window.
CAsyncRendevouz::Wait() looks like this.
bool CAsyncRendevouz::Wait(DWORD dwTimeout)
{
    m_dwTimeout = dwTimeout;

    // Start the waiting thread immediately (bWait == false)
    m_pThread = new CBaseThread(m_hStopEvent, &m_bStop, WaitProc, false,
                                LPVOID(this));
    return true;
}
which creates a CBaseThread object passing the CAsyncRendevouz object as the user data. The thread procedure looks like this.
unsigned __stdcall CAsyncRendevouz::WaitProc(LPVOID data)
{
    {
        DEREF(data);

        CAsyncRendevouz *pThis = (CAsyncRendevouz *) pThread->UserData();

        ASSERT(pThis);
        ASSERT_KINDOF(CAsyncRendevouz, pThis);

        bool bResult = pThis->CSyncRendevouz::Wait(pThis->m_dwTimeout);

        if (IsWindow(pThis->m_wndTarget))
            ::PostMessage(pThis->m_wndTarget, pThis->m_uiMsg,
                          WPARAM(bResult), LPARAM(pThis->m_pvUserData));
    }

    _endthreadex(0);

    // Not reached
    return 0;
}
which has a couple of things that are not immediately obvious. The first is the extra pair of braces. Strictly speaking they are not necessary in this procedure but I've learned to always use them. Why? Because I'm terminating the thread with a call to _endthreadex(). Go look at the MSDN documentation on the function. No wiser? I'm not surprised. What they don't tell you in that documentation is that calling _endthreadex() instantly terminates the thread and doesn't run destructors for any objects created at the same scope as the call to _endthreadex(). I found this out the hard way. So I always enclose the working code of a thread with a pair of braces and ensure the _endthreadex() call is outside the scope of the working code.
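The effect of the extra braces can be shown with a small portable sketch. Nothing here terminates a thread: HypotheticalEndThread is a made-up stand-in for a call such as _endthreadex() and merely records that it was reached, so that the order of events can be inspected.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Records the moment it is destroyed.
struct Tracer
{
    std::vector<std::string> &log;
    explicit Tracer(std::vector<std::string> &l) : log(l) {}
    ~Tracer() { log.push_back("destructor"); }
};

// Stand-in for a call that would never return to its caller.
inline void HypotheticalEndThread(std::vector<std::string> &log)
{
    log.push_back("end-thread");
}

// Same shape as the thread procedures in the article.
inline std::vector<std::string> ThreadBodyShape()
{
    std::vector<std::string> log;

    {
        Tracer guard(log);
        log.push_back("work");
    }   // guard is destroyed here, at the closing brace

    // The destructor has already run before the terminating call.
    assert(log.size() == 2);
    HypotheticalEndThread(log);
    return log;
}
```

Had guard been declared in the outer scope, its destructor would only run after the terminating call returned, which with a real _endthreadex() means never.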
CAsyncRendevouz::WaitProc() calls CSyncRendevouz::Wait() and posts the user defined message value to the designated window once the wait call exits. The exit status (timeout or all threads terminated) is passed as the WPARAM value and the user data is passed as the LPARAM value.
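The same wait-elsewhere-then-notify idea can be sketched without a window or a message queue. In this portable analogue a callback plays the part of the posted message and its bool argument plays the part of the WPARAM result; WaitAsync is a hypothetical helper and is not part of the library described here.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <future>
#include <thread>
#include <vector>

// Waits on a helper thread until every task is ready or the timeout
// elapses, then reports the outcome through `notify`.
// The caller must join the returned thread.
inline std::thread WaitAsync(std::vector<std::shared_future<void>> tasks,
                             std::chrono::milliseconds timeout,
                             std::function<void(bool)> notify)
{
    assert(notify);

    return std::thread([tasks, timeout, notify] {
        const auto deadline = std::chrono::steady_clock::now() + timeout;
        bool allDone = true;

        for (const auto &task : tasks)
        {
            if (task.wait_until(deadline) != std::future_status::ready)
            {
                allDone = false;   // timed out before this task finished
                break;
            }
        }

        notify(allDone);           // analogous to posting the message
    });
}
```

The calling thread is free to carry on working after WaitAsync() returns, which is the point of the asynchronous variant.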
The other non-obvious thing is the DEREF macro. It's defined thusly.
#define DEREF(data) \
    rendevouz::CBaseThread *pThread = (rendevouz::CBaseThread *) data; \
    ASSERT(pThread); \
    ASSERT_KINDOF(CBaseThread, pThread);
All the macro does is define a variable called pThread which is a pointer to a CBaseThread object. Remember that I said earlier that CBaseThread creates the thread and passes its own address as the thread data? You're about to find out why.
Controlling a thread
There comes a time when it's necessary to terminate a thread before it's finished its normal execution. Perhaps it's a thread processing image data that takes minutes to process. Or perhaps it's a thread monitoring a remote connection that sends data once per hour. Either way, you've followed good practice and thrown up a dialog box showing progress and a cancel button. The user clicks the cancel button. How to terminate the thread?
A very bad way is to use the TerminateThread() API. This is what the MSDN documentation says about TerminateThread.
TerminateThread is used to cause a thread to exit. When this occurs, the target thread has no chance to execute any user-mode code and its initial stack is not deallocated. DLLs attached to the thread are not notified that the thread is terminating.
TerminateThread is a dangerous function that should only be used in the most extreme cases. You should call TerminateThread only if you know exactly what the target thread is doing, and you control all of the code that the target thread could possibly be running at the time of the termination. For example, TerminateThread can result in the following problems:
- If the target thread owns a critical section, the critical section will not be released.
- If the target thread is allocating memory from the heap, the heap lock will not be released.
- If the target thread is executing certain kernel32 calls when it is terminated, the kernel32 state for the thread's process could be inconsistent.
- If the target thread is manipulating the global state of a shared DLL, the state of the DLL could be destroyed, affecting other users of the DLL.
Sounds pretty bad. In fact it's so bad that I never use TerminateThread(). Frankly, I'd rather throw up a panic error messagebox and terminate the entire process. Just as one example, if you happen to terminate a thread when it's acquired a lock on the program's global heap all subsequent memory allocation attempts will block waiting for a thread that's no longer running to release the lock. It ain't running so it's never going to release that lock!
The only way to make a thread safely terminable is to make it aware of the outside world and to make it either poll an external variable or include an external event object in the list of objects it's waiting on. Many threads don't need to know about the outside world. They do some work, wait a short time if necessary, timeout and exit. But other threads may be performing work that might take minutes or hours. Even if you don't give the user the option to directly terminate work in progress you still have to allow for the possibility that the user may want to terminate the entire program. The program, however, won't terminate if one of its threads fails to terminate. Even if the user interface is torn down the program is still running and still shows in Task Manager.
In "One use for Overlapped IO" I explained how to use an event handle to signal a thread from the outside world that it was time to terminate. That's the purpose of the event handle that's passed from CSyncRendevouz into CBaseThread! Your thread should execute a WaitForMultipleObjects() waiting on handles it knows about as part of the task it's performing plus the handle returned from a call to CBaseThread::StopEvent(). When the handle from CBaseThread::StopEvent() is signalled it's time to clean up and exit. Example code in your thread might look like this:
unsigned __stdcall MyThreadProc(LPVOID data)
{
    {
        DEREF(data);

        // Create some handle we're going to wait on as part
        // of normal processing
        HANDLE hFile = CreateFile(PARAMETERS);
        HANDLE hWaitArray[2];
        DWORD  dwTimeout = 1000;    // example timeout in milliseconds
        bool   bExit = false;

        hWaitArray[0] = hFile;
        hWaitArray[1] = pThread->StopEvent();

        while (bExit == false)
        {
            // Wait on two handles. The first handle is our file handle,
            // the second handle is the stop handle which is part of
            // the CBaseThread object controlling us.
            switch (WaitForMultipleObjects(2, hWaitArray, FALSE, dwTimeout))
            {
            case WAIT_TIMEOUT:
                // Do something relevant
                break;

            case WAIT_OBJECT_0:
                // Something happened on the file so do something relevant
                break;

            case WAIT_OBJECT_0 + 1:
                // The stop event was signalled, it's time to exit.
                CloseHandle(hFile);
                bExit = true;
                break;
            }
        }
    }

    _endthreadex(0);

    // Not reached
    return 0;
}
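A portable analogue of waiting on either real work or a stop request can be built from a condition variable whose predicate covers both conditions. This is only a sketch under that substitution: EventDrivenWorker is a hypothetical class, a queue of integers stands in for the file handle, and a flag stands in for the stop event. Unlike the Win32 example above, this version checks the stop request first.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

class EventDrivenWorker
{
public:
    EventDrivenWorker()
        : m_stop(false), m_processed(0), m_thread([this] { Loop(); }) {}

    ~EventDrivenWorker() { Stop(); }

    // Hand the worker something to do (the "file handle" event).
    void Post(int item)
    {
        { std::lock_guard<std::mutex> lock(m_mutex); m_items.push(item); }
        m_wake.notify_one();
    }

    // Signal the "stop event" and wait for the worker to exit.
    void Stop()
    {
        { std::lock_guard<std::mutex> lock(m_mutex); m_stop = true; }
        m_wake.notify_one();
        if (m_thread.joinable())
            m_thread.join();
    }

    int Processed()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_processed;
    }

private:
    void Loop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        for (;;)
        {
            // Sleep until either condition holds, like a wait-any call.
            m_wake.wait(lock, [this] { return m_stop || !m_items.empty(); });
            if (m_stop)
                return;            // time to clean up and exit
            m_items.pop();         // "do something relevant"
            ++m_processed;
        }
    }

    std::mutex              m_mutex;
    std::condition_variable m_wake;
    std::queue<int>         m_items;
    bool                    m_stop;
    int                     m_processed;
    std::thread             m_thread;   // declared last: started after the rest
};
```

The worker consumes no CPU while idle, yet reacts promptly to either a posted item or a stop request.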
On the other hand, it's entirely possible that your thread never waits on some external event but, instead, is executing a loop. In that case it should periodically check if it should terminate by calling CBaseThread::Stop(). If a call to CBaseThread::Stop() returns true it's time to clean up and exit! Example code might look like this:
unsigned __stdcall MyThreadProc(LPVOID data)
{
    {
        DEREF(data);

        while (pThread->Stop() == false)
        {
            // Do some work..
        }
    }

    _endthreadex(0);

    // Not reached
    return 0;
}
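The polling approach translates directly to portable C++. One deliberate substitution in the sketch below: it uses std::atomic<bool> for the stop flag where the article's classes use a volatile bool, because in standard C++ an atomic is what guarantees that a write in one thread becomes visible to another. PollingWorker is a hypothetical name.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

class PollingWorker
{
public:
    PollingWorker()
        : m_stop(false), m_iterations(0), m_thread([this] { Loop(); }) {}

    ~PollingWorker()
    {
        Stop();
        assert(!m_thread.joinable());
    }

    // Ask the loop to finish and wait until it has.
    void Stop()
    {
        m_stop.store(true);
        if (m_thread.joinable())
            m_thread.join();
    }

    long Iterations() const { return m_iterations.load(); }

private:
    void Loop()
    {
        // Check the stop flag once per pass through the loop.
        while (!m_stop.load())
        {
            ++m_iterations;              // "do some work"
            std::this_thread::yield();
        }
    }

    std::atomic<bool> m_stop;
    std::atomic<long> m_iterations;
    std::thread       m_thread;   // declared last: started after the flags
};
```

How quickly the thread reacts depends on how long one pass through the loop takes, so long-running work should be split into reasonably small steps.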
Of course, all of this works only if you write your threads to actually use the stop mechanisms. It also only works if code elsewhere in your program has a chance to call the CSyncRendevouz::Stop() method. It would be a mistake to create the CSyncRendevouz object in your main thread and then call the Wait() method from that thread with the expectation that you could call the Stop() method. Remember that once a thread calls Wait() that thread stops running until all the threads controlled by the CSyncRendevouz object have terminated! The demo project doesn't have this problem (the demo project uses threads that sleep for a maximum of 5 seconds) but real world use of the classes will. Thus most usage of the classes will be via CAsyncRendevouz where an extra thread is created within the object to perform the wait. The main thread continues to execute and can call the Stop() method at will. The main thread would then create a bunch of threads (via the CSyncRendevouz::AddThread() method) to perform work and wait for the message that all threads have terminated before it schedules the next piece of work.
CUserThread
This is a class I've added for version 2 of the rendevouz library. It doesn't actually have anything to do with thread synchronisation; I added the class because I found myself using CBaseThread in other projects (and other articles) and the original design of the class was somewhat awkward when one didn't have related threads. The awkwardness arises from the need to pass a stop handle and a pointer to a bool variable to the constructor. If you're controlling multiple threads it makes sense to share the stop handle et al but if you're using CBaseThread to control just one thread it becomes a bit of a pain to have to declare those variables within every consumer class. On the other hand, I've found the class makes it so much easier to control threads that I don't want to stop using it. So after about the fifteenth new project using CBaseThread where I found myself adding the stop handle and bool I found myself thinking there must be an easier way. The class is extremely simple and looks like this:
class CUserThread : public CBaseThread
{
    DECLARE_DYNAMIC(CUserThread);
public:
    CUserThread(unsigned(__stdcall *thread)(void *),
                bool bWait = false, LPVOID data = NULL);
    virtual ~CUserThread();

    void TerminateThread();

private:
    volatile bool m_bStopVar;
};
The constructor omits the stop handle and the bool variable in the parameter list but makes sure they're set up correctly for CBaseThread thusly;
CUserThread::CUserThread(unsigned(__stdcall *thread)(void *),
                         bool bWait, LPVOID data)
    : CBaseThread(NULL, &m_bStopVar, thread, bWait, data)
{
    m_bStopVar = false;
    m_hStopEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
}
The destructor ensures the class cleans up after itself and the TerminateThread() method sets both the event handle and the bool variable to ensure the thread controlled by the object is signalled to stop.
History
- July 16 2004 - Initial version.
- July 17 2004 - Changed the wait thread in the CAsyncRendevouz class to pass the result of the wait (timeout or all threads terminated) as the WPARAM member of the Windows message.
- October 16 2004 - Added the CUserThread class.