Asynchronous worker queue(s) process(es) requests independent of the calling thread. These requests are pushed to a queue whose access is protected by a mutex.
Independent worker thread(s) pop items from these queues to handle them. During processing in the worker thread the mutex is unlocked so that the queue is accessible
by calling threads. A condition variable is used to signal that items need to be processed. As an optimization, it only signals a condition variable when all worker threads are idle.
In typical producer-consumer scenarios, the queue can get flooded when the consumer cannot handle requests as fast as the producer
produces them. The queue then overflows. This class offers a virtual function '
Overflow' which gets called when the number of items reaches the maximum queue size.
The default implementation drops the item, but an overridden function could use another strategy (e.g., prioritize items in the queue and drop other items).
The class presented here is named
KBaseWorkQueue. The following section lists the most important parts.
T: template parameter to specify the item. The item gets copied to and from the queue. Use shared_ptr's if the copying of the item is too heavy (e.g., video images).
nThreads: specifies the number of worker threads.
nMaxSize: max queue size before an overflow is signaled.
Add: function to add an item to the work queue
Process: pure virtual function which must be overridden to process items. Note that the mutex is unlocked when this function is called (in the context of the worker thread).
Overflow: called in the context of the calling thread in overflow cases.
This article heavily uses the example as described in the excellent book 'Programming with POSIX Threads', Butenhof 7.2. Butenhof uses pthreads, while this
one makes use of Boost.Threads as the library and therefore should be platform independent.
Boost.Threads is inspired by many concepts of pthreads.
There are already multiple articles on www.codeproject.com describing worker queues:
Using the Code
class TestQueue : public KBaseWorkQueue<int>
typedef KBaseWorkQueue<int> base;
TestQueue(size_t nThreads, size_t nMaxSize)
: base(nThreads, nMaxSize)
virtual void Process(const int n)
if (m_nThreads > 1)
m_n += n;
bool Overflow(const int n)
TestQueue queue(1, 10);
for (size_t n = 0; n != 50; ++n)
_ASSERT(queue.m_n <= 100);
Room for Improvement
Multi-threaded code is always extra complex. I have stress-tested this class on a quad core without noticing artifacts. Tests can't of course prove the absence of bugs.
Also, it would be better to wrap the unlock/lock call sequence around the '
Process' call in a 'scope' object.
- 13 November 2011 - Original version posted.