|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Announcements
Chapters
Services
Feature Zones
|
IntroductionAs usual, this essay was prompted by seeing the same question several times in one week on the newsgroup. I've been using semaphores since 1969, shortly after Edsgar Dijkstra first proposed them in his classic paper [E. W. Dijkstra. Cooperating Sequential Processes. Programming Languages (F. Genuys, ed.), Academic Press, New York, 1968]. I used them a lot in the C.mmp/Hydra system we did at CMU in the mid-1970s, and have used them extensively in Windows. They are actually easy to use. I have discovered that far too many people use an Event instead of a Semaphore to do synchronization. They are intended to solve different problems, and one does not address the problem domain of the other. You will see in the example below that I use both, for different purposes. You may also want to read my essay on worker threads. The solution here is fully general; it can work with multiple producers and multiple consumers. In the example code that accompanies it, I show one producer and two consumers, but the solution will generalize to multiple producers. What I did was create a Queue class in C++. Rather than do a specific class or do a template class (template classes have some problems in MFC), I just created one that takes You may ask, why am I using the API calls for synchronization and not the MFC synchronization classes like The code and a demo program are all available in a source download. The following section describes the user interface to the class. BOOL Queue::AddTail(LPVOID p) This adds an item to the head of the queue and notifies the waiting thread(s) that the queue is nonempty. Returns Note: The test program ignores the Boolean value in terms of recovery and simply discards the item it was enqueuing. You may wish to do otherwise. LPVOID Queue::RemoveHead() This removes an item from the queue. If the queue is empty, this call blocks. A call that is blocked on a void Queue::shutdown()
Causes all threads that are blocked on Note: I do not stop threads from putting things into the queue during a shutdown. This would involve a Boolean flag that would be set to indicate subsequent The code for Queue.h contains the entire queue class. There is no .cpp file. Here it is in its entirety: class Queue { public: //---------------- // Queue //---------------- Queue(UINT limit) { handles[SemaphoreIndex] = ::CreateSemaphore(NULL, // no security attributes 0, // initial count limit, // max count NULL); // anonymous handles[StopperIndex] = ::CreateEvent(NULL, // no security attributes TRUE, // manual reset FALSE, // initially non-signaled NULL); // anonymous ::InitializeCriticalSection(&lock); } // Queue //---------------- // ~Queue //---------------- ~Queue() { ::CloseHandle(handles[SemaphoreIndex]); ::CloseHandle(handles[StopperIndex]); ::DeleteCriticalSection(&lock); } // ~Queue //---------------- // AddTail //---------------- BOOL AddTail(LPVOID p) { BOOL result; ::EnterCriticalSection(&lock); queue.AddTail(p); result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL); if(!result) { /* failed */ // caller can use ::GetLastError to determine what went wrong queue.RemoveTail(); } /* failed */ ::LeaveCriticalSection(&lock); return result; } // AddTail //---------------- // RemoveHead //---------------- LPVOID RemoveHead() { LPVOID result; switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE)) { /* decode */ case StopperIndex: // shut down thread ExitThread(0); // kill thread return NULL; // return keeps C compiler happy case SemaphoreIndex: // semaphore ::EnterCriticalSection(&lock); result = queue.RemoveHead(); ::LeaveCriticalSection(&lock); return result; case WAIT_TIMEOUT: // not implemented default: ASSERT(FALSE); // impossible condition return NULL; } /* decode */ } // RemoveHead //---------------- // shutdown //---------------- void shutdown() { ::SetEvent(handles[StopperIndex]); } // shutdown protected: enum {StopperIndex, SemaphoreIndex}; HANDLE handles[2]; CRITICAL_SECTION lock; CList<LPVOID, LPVOID> queue; }; The basic idea is that the Now let's look at the code in more detail: Constructor Queue(UINT limit)
{
handles[SemaphoreIndex] = ::CreateSemaphore(NULL, // no security attributes
0, // initial count
limit, // max count
NULL); // anonymous
handles[StopperIndex] = ::CreateEvent(NULL, // no security attributes
TRUE, // manual reset
FALSE, // initially non-signaled
NULL); // anonymous
::InitializeCriticalSection(&lock);
} // Queue
We will need an array of two handles for a later enum {StopperIndex, SemaphoreIndex};
Note that an While the semaphore gives us protection against trying to execute on an empty queue, and blocks the waiting thread, it does not provide for thread safety in manipulating the queue. In this case, I also have to provide a mutual-exclusion primitive to keep multiple threads from concurrently trying to enqueue or dequeue data. To do this, I create a The Event is a manual-reset event, initially non-signaled. When it comes time to shut the threads down (all the threads), all that is necessary is that I call DestructorLike any good destructor, this one deallocates resources allocated by the constructor: ~Queue()
{
::CloseHandle(handles[SemaphoreIndex]);
::CloseHandle(handles[StopperIndex]);
::DeleteCriticalSection(&lock);
} // ~Queue
AddTailThe only trick in adding an element is that a semaphore always has an upper bound; attempting to add an item above that will always fail. We deal with this by returning Otherwise, we must make sure that no more than one thread at a time is manipulating the queue. We do this by doing an What we do is add the item to the queue and then attempt to release the semaphore. If we are within the limits of the semaphore, the This technique of doing a BOOL AddTail(LPVOID p)
{
BOOL result;
::EnterCriticalSection(&lock);
queue.AddTail(p);
result = ::ReleaseSemaphore(handles[SemaphoreIndex], 1, NULL);
if(!result)
{ /* failed */
// caller can use ::GetLastError to determine what went wrong
queue.RemoveTail();
} /* failed */
::LeaveCriticalSection(&lock);
return result;
} // AddTail
RemoveHeadThis function is perhaps overkill; I chose to actually shut down the thread rather than return What makes this work is the Note that even if I call Note the only manipulation of the queue variable is done in the scope of a I do not support timeout but added the case; it falls through to the LPVOID RemoveHead()
{
LPVOID result;
switch(::WaitForMultipleObjects(2, handles, FALSE, INFINITE))
{ /* decode */
case StopperIndex: // shut down thread
::ExitThread(0); // kill thread
return NULL; // return keeps C compiler happy
case SemaphoreIndex: // semaphore
::EnterCriticalSection(&lock);
result = queue.RemoveHead();
::LeaveCriticalSection(&lock);
return result;
case WAIT_TIMEOUT: // not implemented
default:
ASSERT(FALSE); // impossible condition
return NULL;
} /* decode */
} // RemoveHead
shutdownThis is actually a trivial routine. All it does is a This does not handle the case where there might be a long compute cycle you wish to abort. In that case, you should also set a Boolean variable as I have described in my essay on worker threads. void shutdown() That's it! You now have a thread-safe queue that supports multiple threads. To test this, I wrote a little test program that you get when you download the source (or you can simply copy-and-paste from this page, but you don't get the test program). To make this more realistic, I added in an artificial delay of 200-700ms, randomly selected, so as you push the button to queue up entries you actually have a chance of getting ahead of the dequeuing. Here's the thread routine from the test program: void CQueuetestDlg::run() { CString * s = new CString; s->Format(_T("Thread %08x started"), ::GetCurrentThreadId()); PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)s); while(TRUE) { /* thread loop */ LPVOID p = q.RemoveHead(); long n = InterlockedDecrement(&count); showCount(n); PostMessage(UWM_MESSAGE, (WPARAM)::GetCurrentThreadId(), (LPARAM)p); ::Sleep(200 + rand() % 500); // make simulated delay } /* thread loop */ } // CQueuetestDlg::run I post a message to the main GUI thread (this is executing in the context of the main Each time I add something to the queue, I call The main loop retrieves an item from the queue and posts it back to the main thread. The items in the queue are all I then introduce the random Here's an example of the program running:
Note that the items do not strictly alternate; otherwise we would see all of the even-numbered messages in the thread 1 window and all the odd-numbered messages in the thread 2 window. What do those phrases mean? Well, I found this table of buzzwords and wrote a little buzzword generator. I carry it around from project to project when I need a way of generating sentences for various input contexts. Note that after you are done with this example you can use them in your business plan. Comparison to other mechanismsI have seen a number of really bad implementations of synchronization that do not use semaphores. I will discuss them here and show why they are not sufficient. PollingI've seen a number of attempts to work with polling. Sometimes it is something of the form while(counter <= 0) /* do nothing */ ; Then someplace else the programmer does counter++; and to remove something, the programmer does counter--; Now, the first thing to understand is that Aha, you say. You should have used So if you believe that you shouldn't use a Semaphore because it is "inefficient", I can guarantee that no matter what, using a Semaphore will always be more efficient than polling. In fact, the polling usually happens at the worst possible time in your program, the time when it should be working hard creating the next object to be queued up. Yes, you can poll. But do not ever believe that it is "more efficient". It almost certainly is not. And, I should point out, you still need a CRITICAL_SECTION blockingI've seen at least one implementation that did something of the following: if(count == 0) ::EnterCriticalSection(&wait); This will not work. For one thing, between the time the test is made ( Event blockingThis has exactly the same problem as Semaphores are not mutual exclusion!I've seen code of the form ::WaitForSingleObject(semaphore, INFINITE); queue.RemoveHead(p) where the argument is that "the semaphore protects the queue manipulation". No, semaphores do not protect the queue unless the maximum semaphore value is 1. In that case you have a special case of the semaphore, the binary semaphore, which is also known as a "mutex". Note that a true Mutex object (as in There's no substitute for a SemaphoreIf you think you have invented a clever, faster, more efficient, easier, or whatever way of doing a semaphore without actually using a Semaphore, the chances approach unity that you have simply fooled yourself. Read Dijkstra's earlier papers where he was developing the notion of synchronization primitives that were preemptive-threading safe, and there was no The views expressed in these essays are those of the author, and in no way represent, nor are they endorsed by, Microsoft. Send mail to newcomer@flounder.com with questions or comments about this article.
| ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| You must Sign In to use this message board. | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|