Click here to Skip to main content
Licence CPOL
First Posted 13 Nov 2011
Views 6,485
Downloads 193
Bookmarked 10 times

Work queue with Boost.Threads

By | 15 Nov 2011 | Article
Worker thread example with Boost.Threads and C++.

Introduction

Asynchronous worker queue(s) process(es) requests independent of the calling thread. These requests are pushed to a queue whose access is protected by a mutex. Independent worker thread(s) pop items from these queues to handle them. During processing in the worker thread the mutex is unlocked so that the queue is accessible by calling threads. A condition variable is used to signal that items need to be processed. As an optimization, it only signals a condition variable when all worker threads are idle.

queue1.PNG

In typical producer-consumer scenarios, the queue can get flooded when the consumer cannot handle requests as fast as the producer produces them. The queue then overflows. This class offers a virtual function 'Overflow' which gets called when the number of items reaches the maximum queue size. The default implementation drops the item, but an overridden function could use another strategy (e.g., prioritize items in the queue and drop other items).

The class presented here is named KBaseWorkQueue. The following section lists the most important parts.

Template parameter

  • T: template parameter to specify the item. The item gets copied to and from the queue. Use shared_ptr's if the copying of the item is too heavy (e.g., video images).

Data members

  • nThreads: specifies the number of worker threads.
  • nMaxSize: max queue size before an overflow is signaled.

Member functions

  • Add: function to add an item to the work queue
  • Process: pure virtual function which must be overridden to process items. Note that the mutex is unlocked when this function is called (in the context of the worker thread).
  • Overflow: called in the context of the calling thread in overflow cases.

Background

This article heavily uses the example as described in the excellent book 'Programming with POSIX Threads', Butenhof 7.2. Butenhof uses pthreads, while this one makes use of Boost.Threads as the library and therefore should be platform independent. Boost.Threads is inspired by many concepts of pthreads.

There are already multiple articles on www.codeproject.com describing worker queues:

Using the Code

Example:

//
// Test queue
//
#include "kbaseworkqueue.hpp"

class TestQueue : public KBaseWorkQueue<int>
{
   typedef KBaseWorkQueue<int> base;

public:
   TestQueue(size_t nThreads, size_t nMaxSize)
      : base(nThreads, nMaxSize)
      , m_n(0)
   {}

   virtual void Process(const int n)
   {
      if (m_nThreads > 1)
      {
         //variable is updated form multiple threads
         BOOST_INTERLOCKED_EXCHANGE_ADD(reinterpret_cast<long*>(&m_n), n);
      }
      else
      {
         m_n += n;
      }
   }

   bool Overflow(const int n)
   {
      //drop
      return base::Overflow(n);
   }
    
   int   m_n;
};


int main()
{
   TestQueue queue(1, 10);

   for (size_t n = 0; n != 50; ++n)
   {
      queue.Add(2);
   }

   queue.Stop();

   _ASSERT(queue.m_n <= 100);
   
   return 0;
}

Room for Improvement

Multi-threaded code is always extra complex. I have stress-tested this class on a quad core without noticing artifacts. Tests can't of course prove the absence of bugs. Also, it would be better to wrap the unlock/lock call sequence around the 'Process' call in a 'scope' object.

History

  • 13 November 2011 - Original version posted.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Gast128

Software Developer (Senior)

Netherlands Netherlands

Member

Born in the Netherlands in 1971. Spent since 1998 years as a software engineer programming Windows application(s) using, UML, C++, MFC, STL, DirectShow and Boost.

Sign Up to vote   Poor Excellent
Add a reason or comment to your vote: x
Votes of 3 or less require a comment

Comments and Discussions

 
You must Sign In to use this message board. (secure sign-in)
 
Search this forum  
 FAQ
    Noise  Layout  Per page   
  Refresh
AnswerRe: Supreme Test for Health Law PinmemberMukit, Ataul0:03 15 Nov '11  

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Permalink | Advertise | Privacy | Mobile
Web03 | 2.5.120517.1 | Last Updated 15 Nov 2011
Article Copyright 2011 by Gast128
Everything else Copyright © CodeProject, 1999-2012
Terms of Use
Layout: fixed | fluid