
Queue Manager: Policy Based Design

Rohit Joshi, 21 Feb 2005
A Queue Manager class that automatically adjusts the number of threads based on the pending tasks in its queue. Policy-based design is used to provide flexibility and extensibility.

Introduction

Building highly concurrent systems, such as Internet application servers and telephony servers, requires managing many information flows at once and maintaining peak throughput when demand exceeds the available resources. Many approaches to building concurrent systems have been proposed; they generally fall into two categories: threaded and event-driven programming.

In threaded programming, there is overhead associated with creating threads and with context switching among them. Moreover, not every task can be parallelized, because the input of one task might depend on the output of another.

In event-driven programming, if multiple CPUs are available in the system, you do not fully utilize the system resources or gain the advantage of a multi-CPU architecture.
There is an interesting article about concurrency in software by Herb Sutter: "The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software".

Queue Based Approach:

A better approach is to combine both categories. Tasks that can be done in parallel should use the thread-based approach, and those that cannot should use the event-driven approach. For example, in an application server, reading a request from the socket, processing it, and sending the response are sequential tasks for a given client request and cannot be parallelized. But multiple client requests can be processed in parallel. We can divide these tasks into three stages. Each stage can be designed using a queue and a number of threads that process the tasks from that queue.

The main advantage of this approach is that it reduces the number of threads and the context switching among them. For example, if an application supports 1000 sessions and each session is handled by a separate thread, there will be 1000 threads, with all the overhead that implies. In the stage-based approach, we have a small number of stages (three in the example above), and each stage has one or more threads depending on the latency involved in that stage. For more information, see the SEDA architecture in the reference section.
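As a rough illustration of the staged approach, the sketch below chains three stages (read, process, respond), each with its own queue and a single thread. It is not taken from the QueueMgr library: it uses C++11 std::thread and std::condition_variable instead of pthreads, and the names BlockingQueue, RunStage and RunPipeline, as well as the empty-string shutdown marker, are assumptions made only for this example.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper: a minimal thread-safe FIFO queue.
template<class T>
class BlockingQueue
{
    std::queue<T> m_oItems;
    std::mutex m_oMutex;
    std::condition_variable m_oReady;
public:
    void Push(T item)
    {
        {
            std::lock_guard<std::mutex> oGuard(m_oMutex);
            m_oItems.push(std::move(item));
        }
        m_oReady.notify_one();
    }
    // Blocks without spinning until an item is available.
    T Pop()
    {
        std::unique_lock<std::mutex> oLock(m_oMutex);
        m_oReady.wait(oLock, [this] { return !m_oItems.empty(); });
        assert(!m_oItems.empty());
        T item = std::move(m_oItems.front());
        m_oItems.pop();
        return item;
    }
};

// One stage: take a task from oIn, do this stage's work, hand it to oOut.
// An empty string is the shutdown marker; it is forwarded, then the stage stops.
void RunStage(BlockingQueue<std::string> &oIn,
              BlockingQueue<std::string> &oOut,
              const std::string &sLabel)
{
    for (;;) {
        std::string sTask = oIn.Pop();
        if (sTask.empty()) { oOut.Push(sTask); return; }
        oOut.Push(sLabel + "(" + sTask + ")");
    }
}

// Sends nRequests through read -> process -> respond, one thread per stage.
std::vector<std::string> RunPipeline(unsigned nRequests)
{
    BlockingQueue<std::string> oReadQ, oProcessQ, oRespondQ, oDoneQ;
    std::thread oReader([&] { RunStage(oReadQ, oProcessQ, "read"); });
    std::thread oWorker([&] { RunStage(oProcessQ, oRespondQ, "process"); });
    std::thread oWriter([&] { RunStage(oRespondQ, oDoneQ, "respond"); });

    for (unsigned i = 0; i < nRequests; ++i)
        oReadQ.Push("req" + std::to_string(i));
    oReadQ.Push("");   // shutdown marker

    oReader.join();
    oWorker.join();
    oWriter.join();

    std::vector<std::string> oResults;
    for (;;) {
        std::string sDone = oDoneQ.Pop();
        if (sDone.empty()) break;
        oResults.push_back(sDone);
    }
    return oResults;
}
```

However many requests are in flight, only three threads exist here. A stage with higher latency could be given more threads, at the cost of no longer preserving request order within that stage.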

Many queue and thread pool implementations are available, but I wanted a class that has a queue and its own thread pool and is easy to customize to the user's requirements without modifying the class library. The class should know how many tasks are pending in the queue and, based on that, automatically adjust the number of threads according to the user's load balancing criteria.

Policy Based Design:

Since the requirement is that behavior must be easy to customize with user-defined criteria, policy-based design best suits our needs. Users can write their own behavior and override the default one available in the library. You can read my other article, Generic Pool: Policy Based Design. I will not go into the details of policy-based design here. I have provided some links in the reference section, and there are plenty of other resources available on the Internet.

NOTE: Readers are assumed to have a good understanding of C++, templates, traits, the STL, and synchronization devices such as mutexes and semaphores.

Requirements:

Let us identify the requirements for the Queue Manager class and see how we can fulfill each one.

  1. I have used std::list to store the tasks in the queue. But we might want to use std::vector or a custom container, depending on the application requirements.
  2. For the thread pool, I have provided a function to create threads. There is also a function to destroy threads when too many have been created for the number of pending tasks. We might want to create and destroy threads in our own way, e.g. create non-cancelable threads or use other thread attributes.
  3. I have provided a dummy function to process a task from the queue, which just sleeps for a few microseconds. The processing function will differ from application to application, so we should be able to override this dummy function.
  4. I have defined the load balancing criteria as follows: if the number of pending tasks in the queue divided by the number of threads is more than 3, add a thread to the thread pool. If pending tasks / number of threads is less than 2, then remove a thread from the thread pool. We will definitely need to change these criteria based on application requirements.
  5. When the queue is empty, all threads should be in sleep mode, where they do not consume CPU. When a task is added to the queue, a thread should be signaled to process it.
  6. Make sure all resources are synchronized, as multiple threads will be adding tasks to the queue.
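The load balancing rule from item 4 can be written down in a few lines. The class below is only a sketch of that rule; the name LoadBalanceSketch is hypothetical, and the library's own default may differ in detail. It assumes the pool always holds at least one thread, so the division is safe.

```cpp
#include <cassert>

// Sketch of the default rule from item 4 (hypothetical class name).
class LoadBalanceSketch
{
public:
    // Returns 1 to add a thread, -1 to remove one, 0 to leave the pool alone.
    static int LoadBalance(const unsigned nPendingTask,
                           const unsigned nCurrentNumThreads)
    {
        // Assumption: the pool is never empty when this is called
        assert(nCurrentNumThreads > 0);

        // Integer division, so 7 pending tasks on 2 threads gives 3
        const unsigned nTasksPerThread = nPendingTask / nCurrentNumThreads;

        if (nTasksPerThread > 3) {
            return 1;    // backlog is building up
        }
        if (nTasksPerThread < 2) {
            return -1;   // threads are mostly idle
        }
        return 0;        // 2 or 3 tasks per thread: no change
    }
};
```

Because the ratio is truncated, 7 pending tasks on 2 threads count as 3 tasks per thread and leave the pool unchanged. A caller would also have to decide whether a result of -1 may remove the last remaining thread; this sketch leaves that decision to the caller.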
     

Class Design:

Based on our requirements, we need to design a Queue Manager class that manages its own queue and a thread pool working on that queue. Let's look at each requirement and see how we can implement it.

  • 1st Requirement (Custom Queue container):
    Here I have used std::list as the container that stores the tasks. But depending on application requirements, we might want to use a different container type or a custom-developed one. Allowing a custom container without modifying the library code is a little difficult, but policies and traits together can fulfill this requirement. Defining a ContainerType policy and a QueueTraits class enables us to plug in a custom container for the queue. I have provided ListContainer as the default implementation of the container policy.
    For more information about traits, see the reference section.
template<class T, typename QueueContainerPolicy>
class QueueContainerTraits;
  • 2nd Requirement (Thread Management): To create and destroy threads according to user requirements, we need to provide a way for the user to override the default behavior of the class. This can be implemented using a ThreadCreationPolicy that has two methods, Create() and Destroy(), as below. I have provided a default implementation of this policy, ThreadCreationDefault.
class ThreadCreationDefault
{
    public:
    // Create the thread
    static pthread_t Create(void* (*FuncPtr) (void*), void* obj) 
                                                  throw (QueueMgrException);
    static void Destroy();
 };
  • 3rd Requirement (Processing a Task): This requirement can be fulfilled by implementing an ExecutionPolicy that has a Process() method, as below.
template<class T>
class Execution
{
    public:
        static void Process(T &obj) throw(QueueMgrException) ;
};
  • 4th Requirement (Load Balancing): This can be implemented by defining a LoadBalancePolicy with a LoadBalance() method that takes the number of pending tasks in the queue and the number of threads in the thread pool as parameters. It returns 1 if a new thread should be created, -1 if a thread should be removed from the pool, and 0 otherwise.
class LoadBalanceDefault
{
  public:
    static int LoadBalance(const unsigned nPendingTask, 
                                    const unsigned nCurrentNumThreads) ;
};
  
  • 5th Requirement (Thread Synchronization): Idle threads must sleep until a task is added to the queue. This can be implemented using a synchronization device such as the semaphore given below.
class Semaphore
{
     sem_t m_oSem;
public:
    // initialize
    Semaphore(unsigned count = 0)
    { ... }
    // destroy
    ~Semaphore()
   { ... }
   // signal
   int Signal()
   { ... }
   // wait
   int Wait()
   { ... }

};
  • 6th Requirement (Resource Synchronization): In multithreaded applications, multiple threads will be adding tasks to our queue. We want to make sure that access to the container is synchronized; otherwise, it will cause memory corruption. This can be implemented using a mutex, together with a lock guard class that releases the mutex automatically when it goes out of scope.
class Mutex
{
  pthread_mutex_t m_mMutex;
public:
  // initialize
  Mutex()
  { ... }
  // destroy
  ~Mutex()
  { ... }
  // lock
  void Lock()
  { ... }
  // Unlock
  void Unlock()
  { ... }

};
// Lock Guard class 
template<class T>
class Lock
{
  T *m_pQueueMgr;
public:
  Lock(T & obj):m_pQueueMgr(&obj)
  {
    m_pQueueMgr->Lock();
  }
  ~Lock()
  {
    m_pQueueMgr->Unlock();
  }
};
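Returning to the 1st requirement, the way a container policy and a traits class fit together can be sketched as follows. Only the names ListContainer and QueueContainerTraits come from this article; the members Container, Push() and Pop(), the DequeContainer policy and the SimpleQueue class are assumptions made for illustration, and the real library may define its traits differently.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <list>

// Policy tags: empty types that only select a traits specialization.
struct ListContainer {};
struct DequeContainer {};   // hypothetical user-defined policy

// The primary template is only declared; each policy supplies a specialization.
template<class T, typename QueueContainerPolicy>
class QueueContainerTraits;

// Traits for the default policy: store tasks in a std::list.
template<class T>
class QueueContainerTraits<T, ListContainer>
{
public:
    typedef std::list<T> Container;
    static void Push(Container &c, const T &obj) { c.push_back(obj); }
    static T Pop(Container &c)
    {
        assert(!c.empty());
        T obj = c.front();
        c.pop_front();
        return obj;
    }
};

// Traits for the custom policy: same interface, different container.
template<class T>
class QueueContainerTraits<T, DequeContainer>
{
public:
    typedef std::deque<T> Container;
    static void Push(Container &c, const T &obj) { c.push_back(obj); }
    static T Pop(Container &c)
    {
        assert(!c.empty());
        T obj = c.front();
        c.pop_front();
        return obj;
    }
};

// A queue written only against the traits interface (not thread-safe).
template<class T,
         typename ContainerType = ListContainer,
         typename QueueTraits = QueueContainerTraits<T, ContainerType> >
class SimpleQueue
{
    typename QueueTraits::Container m_oItems;
public:
    void Add(const T &obj) { QueueTraits::Push(m_oItems, obj); }
    T Remove() { return QueueTraits::Pop(m_oItems); }
    std::size_t Size() const { return m_oItems.size(); }
};
```

With this arrangement, SimpleQueue<int> uses std::list, while SimpleQueue<int, DequeContainer> switches to std::deque without any change to SimpleQueue itself.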
  

Let's put all these classes together and define the QueueMgr class as below. The Add() method adds a task to the queue. When a task is added, one of the threads is signaled to wake up from wait mode. The other method is the static InternalRun() method, which is executed by the threads. It removes a task from the queue and executes the user-defined Process() method. See the source code for more details.

template<class T, 
         typename ThreadCreationPolicy = ThreadCreationDefault,
         typename LoadBalancePolicy = LoadBalanceDefault,
         typename ContainerType = ListContainer,
         typename QueueTraits = QueueContainerTraits<T,ContainerType>,
         typename ExecutePolicy = Execution<T> 
          >
class QueueMgr
{
    ...............
public:
   QueueMgr(unsigned nThreads, int nMaxthreads=-1);
   void Add(T &obj) throw (QueueMgrException);
protected:
    void ResetAll();
    void CreateThreads(unsigned nNumThreads) throw (QueueMgrException);
    void DestroyThread() throw (QueueMgrException);
    static void* InternalRun(void* args) ;
};
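To make the interplay between Add() and the worker loop concrete, here is a much reduced sketch. It is not the library's implementation: it uses C++11 std::thread and std::condition_variable in place of the pthread-based Semaphore and Mutex classes, it keeps a fixed number of threads (no load balancing), and MiniQueueMgr, CountingExecution and RunDemo are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <list>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical execution policy: counts how many tasks were processed.
struct CountingExecution
{
    static std::atomic<int> s_nProcessed;
    static void Process(int &) { ++s_nProcessed; }
};
std::atomic<int> CountingExecution::s_nProcessed(0);

template<class T, typename ExecutePolicy>
class MiniQueueMgr
{
    std::list<T> m_oQueue;
    std::mutex m_oMutex;
    std::condition_variable m_oWakeUp;
    std::vector<std::thread> m_oThreads;
    bool m_bStopping;

    // Worker loop, playing the role of InternalRun().
    void Run()
    {
        for (;;) {
            std::unique_lock<std::mutex> oLock(m_oMutex);
            // Sleep without using CPU until there is work or shutdown starts
            m_oWakeUp.wait(oLock, [this] {
                return m_bStopping || !m_oQueue.empty();
            });
            if (m_oQueue.empty()) return;   // stopping and fully drained
            T obj = m_oQueue.front();
            m_oQueue.pop_front();
            oLock.unlock();                 // do not hold the lock while processing
            ExecutePolicy::Process(obj);
        }
    }
public:
    explicit MiniQueueMgr(unsigned nThreads) : m_bStopping(false)
    {
        assert(nThreads > 0);
        for (unsigned i = 0; i < nThreads; ++i)
            m_oThreads.emplace_back(&MiniQueueMgr::Run, this);
    }
    // Lets the workers drain the remaining tasks, then joins them.
    ~MiniQueueMgr()
    {
        {
            std::lock_guard<std::mutex> oGuard(m_oMutex);
            m_bStopping = true;
        }
        m_oWakeUp.notify_all();
        for (std::thread &oThread : m_oThreads) oThread.join();
    }
    void Add(const T &obj)
    {
        {
            std::lock_guard<std::mutex> oGuard(m_oMutex);
            m_oQueue.push_back(obj);
        }
        m_oWakeUp.notify_one();   // wake one sleeping worker
    }
};

// Usage helper: processes nTasks on nThreads and returns the processed count.
int RunDemo(unsigned nThreads, int nTasks)
{
    CountingExecution::s_nProcessed = 0;
    {
        MiniQueueMgr<int, CountingExecution> oMgr(nThreads);
        for (int i = 0; i < nTasks; ++i) oMgr.Add(i);
    }   // the destructor drains the queue and joins the workers
    return CountingExecution::s_nProcessed;
}
```

A fuller version would call the load balancing policy inside Add() and create or destroy threads according to its return value, which is what QueueMgr is described as doing.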

Class Usage Example:

Here is the main() function to show the simple usage of QueueMgr class.

// My dummy task object
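class MyTask
{
  public:
   // Constructor taking a name, as used in main() below (body elided)
   MyTask(const char *pName) { ... }
   void Print() { ... }
};
// Execution Policy
// Here the dummy task sleeps for 100 milliseconds and prints the object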
template<class T>
class MyProcessing
{
  public:
    static void Process(T &obj) throw(std::exception) {
        usleep(100000);
        obj.Print() ;
    }
};
// My custom load balancing criteria
class MyLoadBalanceCriteria
 {
   public:
         
     // Define your criteria about load balancing
     // Return value of this function:
     // 1: Create new thread and add to thread pool
     // -1: Remove the thread from the pool
     // 0 : No change in thread pool
     static int LoadBalance(const unsigned nPendingTask, 
             const unsigned nCurrentNumThreads)                      
     {
       if (nPendingTask/nCurrentNumThreads > 3) {
         return 1;
       }
       else if (nPendingTask/nCurrentNumThreads < 1) {
         return -1;
       }
       else return 0;
             
     }
};

int main()
{
    // Joshi is a namespace
    
    try {
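        // Template arguments must follow the parameter order of QueueMgr;
        // this assumes the library defaults also live in namespace Joshi
        Joshi::QueueMgr<MyTask, Joshi::ThreadCreationDefault,
                        MyLoadBalanceCriteria, Joshi::ListContainer,
                        Joshi::QueueContainerTraits<MyTask, Joshi::ListContainer>,
                        MyProcessing<MyTask> > oMgr(1);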
        for(;;) 
        {
            MyTask oTask("Dummy task");
            oMgr.Add(oTask);
            usleep(1);
        
        }
    }
    catch(std::exception &oException)
    {
        std::cout << oException.what() << std::endl;
    }
    catch(...)
    {
        std::cout << "Unhandled exception" << std::endl;
    }
     return 0; 
    
}

QueueMgr.zip: When you compile the main.cpp file, don't forget to link the pthread library as below. Add the DEBUG option to see debug info. QueueMgr.zip supports only UNIX-like platforms. I have compiled and tested it on Solaris and Linux.

g++ -c -DDEBUG main.cpp -o main.o
g++ -o queuemgr main.o -lpthread

QueueMgr_ACE.zip: I have provided a platform-independent version of QueueMgr using the ACE framework. Change the include and lib paths in the Makefile to match your ACE_ROOT environment variable.

Limitations:

  • Fine-tuning: The number of threads to create and the latency of processing an added task need to be fine-tuned. Fine-tuning takes effort, because the latency and the thread pool criteria depend on the system load, the application's tasks, and the available system resources.

References:

Acknowledgement

Special thanks to Sathiya Thiruvengadathan for reviewing this article.

Please do vote and comment on this article !!.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Rohit Joshi
Software Developer
United States
Rohit Joshi is a software engineer working for a telecom company in the USA. He has development experience using C, C++, C#, VoiceXML, ASR, IMAP, LDAP, HTTP, SIP, H323 on unix/linux and platforms.

Comments and Discussions

 
Question: Good Job. How do I output of my queue mgr to input to other queue? (peakdetector, 23-Jan-05 4:43)
Good Job.
As you mentioned in the article, the output of one queue will be the input to another queue in sequential processing. I can override ExecutionPolicy so that it processes the current task, but there is no way I can pass the output of the processing function to another queue.
Let me give you an example.
I have three queues.
1. ReaderQueue
2. ProcessingQueue
3. WriterQueue

In ReaderQueue, I read the request from the socket and I need to add this request to ProcessingQueue. Similarly, the output of ProcessingQueue needs to be added to WriterQueue, which will produce the return page for the request. Here the ExecutionPolicy function Process() doesn't take a parameter through which I can pass the other queue.
 


Last Updated 21 Feb 2005
Article Copyright 2005 by Rohit Joshi