Multithreaded Job Queue

Pradeep Kumar Sahu, 21 May 2003
An implementation of a multithreaded asynchronous job queue that also pools its threads.

Introduction

In many applications we simply want to get something done asynchronously. A job queue suits this well: we add a job to the queue, and it becomes the queue's responsibility to complete it. This frees the main thread to concentrate on something else while the jobs are processed in the background. It is better still if the job queue is multithreaded and can process several jobs at the same time.

This article describes such an implementation of a job queue. It has the following features:

  • The job queue is multithreaded, so many jobs can be processed simultaneously. The job queue uses a pool of threads to execute the jobs.
  • The job queue supports priorities. Jobs with higher priority are inserted in the queue ahead of jobs with lower priority, so they are processed first.
  • The job queue can be paused, so that no new job is processed from the moment of the pause. The user can still add jobs to the queue; they are processed once the user chooses to resume the job queue.
  • The number of threads used by the job queue to process jobs can be increased or decreased at run time.
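The priority feature comes down to where a new job is placed in the list. The following is a minimal sketch in standard C++, not the article's MFC code. The names QueuedJob, insertByPriority and takeNext are hypothetical, and it assumes that a larger number means a higher priority and that jobs of equal priority keep their arrival order; the article does not state either point.

```cpp
#include <cassert>
#include <list>

// Hypothetical stand-in for a queued job; only the fields needed here.
struct QueuedJob {
    long id;
    int priority;  // ASSUMPTION: a larger value means a higher priority
};

// Insert so that the list stays sorted by descending priority.
// ASSUMPTION: jobs of equal priority keep their arrival order (FIFO).
inline void insertByPriority(std::list<QueuedJob>& queue, const QueuedJob& job) {
    std::list<QueuedJob>::iterator pos = queue.begin();
    while (pos != queue.end() && pos->priority >= job.priority) {
        ++pos;  // skip every job that must run before the new one
    }
    queue.insert(pos, job);
}

// Remove and return the job that should run next (the head of the list).
inline QueuedJob takeNext(std::list<QueuedJob>& queue) {
    assert(!queue.empty());
    QueuedJob next = queue.front();
    queue.pop_front();
    return next;
}
```

With this ordering, a priority 6 job added after two priority 5 jobs is taken first, and the two priority 5 jobs then run in the order they were added.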

Background

I thought of writing this article after receiving feedback and suggestions on my earlier article, A Method of Worker Thread Pooling. Many thanks to all of you who read that article and gave your feedback.

This article uses the same architecture for pooling the worker threads used by the job queue. Refer to the earlier article to learn more about the thread pooling architecture.

Architecture in brief

The main logical components of the architecture are:

  1. A thread pool that maintains and reuses the worker threads that execute the jobs.
  2. A priority queue that holds the jobs. The user simply adds jobs to this queue.
  3. An observer thread that assigns queued jobs to free job executer threads from the thread pool.
  4. The job to be done. This is the class that you derive from the base class CJob.
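To make the interplay of these components concrete, here is a compact sketch in standard C++ rather than MFC. It is a simplification, not the article's design: there is no separate observer thread, and idle workers pull the next job from the shared priority queue themselves. MiniJobQueue and its members are hypothetical names, and larger priority values are assumed to run first.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical miniature: a pool of worker threads draining one shared
// priority queue. Workers pull jobs themselves (no observer thread).
class MiniJobQueue {
public:
    explicit MiniJobQueue(int workerCount) {
        assert(workerCount > 0);
        for (int i = 0; i < workerCount; ++i) {
            workers_.emplace_back([this] { workerLoop(); });
        }
    }

    // Lets the workers finish every queued job, then joins them.
    ~MiniJobQueue() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();
        for (std::thread& t : workers_) t.join();
    }

    void addJob(int priority, std::function<void()> work) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(Entry{priority, nextSeq_++, std::move(work)});
        }
        wake_.notify_one();
    }

private:
    struct Entry {
        int priority;       // ASSUMPTION: larger value runs first
        unsigned long seq;  // arrival order, used to break ties FIFO
        std::function<void()> work;
        bool operator<(const Entry& other) const {
            if (priority != other.priority) return priority < other.priority;
            return seq > other.seq;
        }
    };

    void workerLoop() {
        for (;;) {
            std::function<void()> work;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and nothing left to do
                work = jobs_.top().work;
                jobs_.pop();
            }
            work();  // run outside the lock so other workers can proceed
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::priority_queue<Entry> jobs_;
    std::vector<std::thread> workers_;
    unsigned long nextSeq_ = 0;
    bool stopping_ = false;
};
```

A caller would construct, say, MiniJobQueue queue(2) and call queue.addJob(5, someCallable). Letting workers pull directly avoids the hand-off between an observer and a free-executer list, at the cost of departing from the structure described in this article.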

The main architecture of the thread pool is given below in the class diagram.

About the classes

Class CJobExecuter

This class is responsible for executing the jobs. Each instance is associated with one worker thread. The thread is created in the constructor, and the instance is passed to the thread as its parameter. The user does not have to create an instance of this class, and does not need to access or use any of its attributes or methods. The class is purely for internal use by the architecture.

Attributes
  • m_flag: An int attribute used as a flag. Two #define macro constants are defined for it: STOP_WORKING (value -1) and KEEP_WORKING (value 0). If m_flag is set to STOP_WORKING, the thread associated with the instance exits; otherwise the thread keeps working.
  • m_pExecuterThread: This is a pointer of type CWinThread. This pointer keeps a reference to the thread that is associated with the class.
  • m_pJob2Do: A pointer of type CJob. It refers to an instance of CJob or one of its child classes. When it is NULL there is no job to do, the executer is idle, and its thread remains suspended.
  • m_pJobQ: This is a pointer of type CMThreadedJobQ. This points to the CMThreadedJobQ to which this instance belongs.
Operations:
  • void execute(CJob* pJob): This operation is responsible for executing a job. The CMThreadedJobQ class calls this function to get a job done. The user should not call this method; it exists only for the implementation of the multithreaded job queue.
  • void stop(): This function stops the executer.
  • static UINT ThreadFunction(LPVOID pParam): This is the static thread function. This function is used while creating the thread in the constructor.
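The life cycle of an executer (wait for a job, run it, go idle again, exit when told to stop) can be sketched as follows. This is an illustrative reduction in standard C++, not the article's code: it parks the idle thread on a std::condition_variable instead of suspending and resuming a CWinThread, and ExecuterSketch and its members are hypothetical names. Only the flag values STOP_WORKING and KEEP_WORKING come from the description above.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical single-thread executer; flag values mirror the article.
class ExecuterSketch {
public:
    static constexpr int STOP_WORKING = -1;
    static constexpr int KEEP_WORKING = 0;

    ExecuterSketch() : thread_([this] { threadLoop(); }) {}

    ~ExecuterSketch() {
        stop();
        thread_.join();
    }

    // Hand one job to the idle thread. The caller must not assign a second
    // job until the first has finished.
    void execute(std::function<void()> job) {
        assert(static_cast<bool>(job));
        {
            std::lock_guard<std::mutex> lock(mutex_);
            job2Do_ = std::move(job);
        }
        wake_.notify_one();
    }

    // Ask the thread to exit; a job that is already assigned still runs.
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            flag_ = STOP_WORKING;
        }
        wake_.notify_one();
    }

private:
    void threadLoop() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            // Idle here until a job arrives or a stop is requested.
            wake_.wait(lock, [this] {
                return static_cast<bool>(job2Do_) || flag_ == STOP_WORKING;
            });
            if (!job2Do_) return;  // STOP_WORKING with nothing pending
            std::function<void()> job = std::move(job2Do_);
            job2Do_ = nullptr;  // back to "no job to do"
            lock.unlock();
            job();              // run the job without holding the lock
            lock.lock();
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::function<void()> job2Do_;
    int flag_ = KEEP_WORKING;
    std::thread thread_;  // declared last so it starts after the other members
};
```

A condition variable is used here because a thread that is suspended and resumed from outside can miss a wake-up if the resume happens just before the suspend. Waiting on a predicate under a lock avoids that window.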

Class CJob

This class wraps a job. Any job added to the queue must be derived from this class, and the method execute() must be overridden. The executer thread calls execute() back to run the job.

Attributes
  • m_Completed: This BOOL attribute represents the state of the job. It is TRUE once the job has completed and FALSE otherwise.
  • m_autoDeleteFlag: A BOOL attribute that indicates whether the CJob instance should be deleted after the job completes. If TRUE, the instance is deleted automatically, so the user must not call delete on it again. By default this flag is TRUE.
  • m_priority: This int attribute represents the priority of the job. The default value is 5.
  • m_ID: The job ID. It is assigned in the constructor of the class. A static variable keeps the last used ID, so that the next ID can be generated by incrementing it.
Operations:
  • BOOL AutoDelete(): This method returns TRUE if the auto delete flag is TRUE; otherwise it returns FALSE.
  • virtual void execute(): A pure virtual method that the child classes must override. It is the entry point for executing the job.
  • long getID(): This method returns the job ID.
  • int getPriority(): This method returns the priority of the job.
  • void setAutoDelete(BOOL autoDeleteFlag): This method sets the auto delete flag to either TRUE or FALSE depending on the parameter.
  • void setPriority(int priority): This method sets the priority of the job. The priority should be changed before adding the job to the queue.
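The attributes and operations listed for CJob can be pictured with the following stand-alone sketch. It is not the article's class. JobBase, CountingJob and runAndDispose are hypothetical names, bool replaces BOOL, and the ID counter is a std::atomic on the assumption that jobs may be constructed from more than one thread (the article only says that a static variable holds the last used ID).

```cpp
#include <atomic>
#include <cassert>

// Hypothetical reduction of a CJob-like base class.
class JobBase {
public:
    JobBase() : id_(++lastId()), priority_(5), autoDelete_(true) {}
    virtual ~JobBase() {}

    virtual void execute() = 0;  // entry point, overridden by each job type

    long getID() const { return id_; }
    int getPriority() const { return priority_; }
    void setPriority(int priority) { priority_ = priority; }
    bool AutoDelete() const { return autoDelete_; }
    void setAutoDelete(bool flag) { autoDelete_ = flag; }

private:
    // Last used ID. ASSUMPTION: atomic, so that IDs stay unique even when
    // jobs are created concurrently.
    static std::atomic<long>& lastId() {
        static std::atomic<long> counter(0);
        return counter;
    }

    long id_;
    int priority_;     // default 5, as in the article
    bool autoDelete_;  // default true, as in the article
};

// Example job type: increments a counter supplied by the caller.
class CountingJob : public JobBase {
public:
    explicit CountingJob(int* counter) : counter_(counter) {
        assert(counter != nullptr);
    }
    void execute() override { ++*counter_; }

private:
    int* counter_;
};

// What an executer might do with a job: run it, then honour auto-delete.
inline void runAndDispose(JobBase* job) {
    assert(job != nullptr);
    job->execute();
    if (job->AutoDelete()) delete job;
}
```

A heap-allocated CountingJob passed to runAndDispose is freed automatically, while a stack-allocated one needs setAutoDelete(false) first so that it is not passed to delete.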

Class CMThreadedJobQ

This is the main class of the module. To use the multithreaded job queue, create an instance of this class and add a job to it to get the job done. The job is an instance of a child class derived from CJob. This class is responsible for accepting the jobs and maintaining them in a priority queue. It has an observer thread that watches both the job queue and the free executer threads. As soon as an executer is free and a job is waiting, the observer assigns the job to that executer thread.

Attributes
  • m_NoOfExecuter: This int attribute holds the number of executers that exist at a given instant to execute the jobs.
  • m_MaxNoOfExecuter: This int attribute is the upper limit on the number of executers that can be created to execute jobs. At any instant, the total number of free and busy executer threads together should not exceed this number.
  • m_jobQList: The linked list that maintains the priority queue of jobs. It is an instance of CJobQList, which is an instantiation of CTypedPtrList.
  • m_pause: This BOOL attribute tells the observer thread whether or not it should process new jobs.
  • m_pObserverThread: A pointer to the thread (an instance of CWinThread) that is responsible for observing the queue and assigning jobs to the free executer threads.
  • m_cs: An instance of CCriticalSection, used to synchronize access to the resources shared between the threads.
  • m_pFreeEList: A pointer to the head of the linked list of free executer threads.
  • m_pAllEList: The head of the linked list of all executer threads.
Operations:
  • void addJob(CJob *pJob): This method adds a job to the job queue. The job is passed as the parameter.
  • void addJobExecuter(CJobExecuter *pEx): This method adds a CJobExecuter to the job executer linked list. The user does not need to call it; it exists only for the implementation of the multithreaded job queue.
  • void addFreeJobExecuter(CJobExecuter *pEx): This method adds a job executer to the free job executer list. The user does not need to call it; it exists only for the implementation of the multithreaded job queue.
  • void deleteJobExecuter(CJobExecuter *pEx): This method deletes a job executer. The user does not need to call it; it exists only for the implementation of the multithreaded job queue.
  • CJobExecuter* getJobExecuter(): This method returns a free job executer. The user does not need to call it; it exists only for the implementation of the multithreaded job queue.
  • int getMaxNoOfExecuter(): This method returns the maximum number of allowed executer threads.
  • int getNoOfExecuter(): This method returns the number of executers that exist at the time of the call.
  • static UINT JobObserverThreadFunction(LPVOID pParam): The static thread function used to create the observer thread. It is not meant to be called by the user.
  • void pause(): This pauses the multithreaded job queue, so that no new job gets processed. The user can still add new jobs to the job queue; they are processed when the user resumes the queue. Pausing does not pause jobs that are already running.
  • void resume(): This resumes the multithreaded job queue. After resuming, all the pending jobs in the queue are processed.
  • void setMaxNoOfExecuter(int value): This sets the maximum number of executers. If the user increases the maximum, the multithreaded job queue creates more threads to execute more jobs concurrently. If the user decreases the maximum, the job queue brings down the number of executers. The reduction may not be immediate if all the executers are busy; in that case the number comes down as the running jobs finish.
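The rules behind pause(), resume() and setMaxNoOfExecuter() can be followed without real threads by modelling a single pass of the observer with counters. The sketch below is such a model under stated assumptions. ObserverModel, dispatchPending, jobFinished and pendingCount are hypothetical names, and the real observer hands jobs to CJobExecuter instances rather than incrementing a counter.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical model of the observer's decisions, reduced to counters.
class ObserverModel {
public:
    explicit ObserverModel(int maxExecuters) : maxExecuters_(maxExecuters) {
        assert(maxExecuters > 0);
    }

    void addJob(long jobId) {  // allowed even while paused
        std::lock_guard<std::mutex> lock(cs_);
        pending_.push_back(jobId);
    }
    void pause()  { std::lock_guard<std::mutex> lock(cs_); paused_ = true; }
    void resume() { std::lock_guard<std::mutex> lock(cs_); paused_ = false; }

    void setMaxNoOfExecuter(int value) {
        assert(value > 0);
        std::lock_guard<std::mutex> lock(cs_);
        maxExecuters_ = value;  // busy executers are not interrupted
    }

    // One observer pass: start pending jobs while an executer slot is free.
    // Returns how many jobs were started; starts none while paused.
    int dispatchPending() {
        std::lock_guard<std::mutex> lock(cs_);
        int started = 0;
        while (!paused_ && !pending_.empty() && busy_ < maxExecuters_) {
            pending_.pop_front();
            ++busy_;
            ++started;
        }
        return started;
    }

    // Called when an executer finishes its job and becomes free again.
    void jobFinished() {
        std::lock_guard<std::mutex> lock(cs_);
        assert(busy_ > 0);
        --busy_;
    }

    std::size_t pendingCount() const {
        std::lock_guard<std::mutex> lock(cs_);
        return pending_.size();
    }

private:
    mutable std::mutex cs_;  // plays the role of m_cs
    std::deque<long> pending_;
    int maxExecuters_;
    int busy_ = 0;
    bool paused_ = false;
};
```

In this model, lowering the maximum while two jobs are running does not stop either of them. New jobs simply wait until the number of busy executers has dropped below the new limit, which matches the delayed reduction described for setMaxNoOfExecuter().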

Using the code

The Multithreaded Job Queue is easy to use in your code. The steps are given below, with sample code from the demo project.

  • Create an instance of the CMThreadedJobQ class. See the sample code below.
    CMThreadedJobQ m_jobQ;
  • Create a class, as shown in the class diagram, that inherits from CJob and implements the pure virtual function void execute(). See the sample code below.
    class CJobType1 : public CJob
    {
    public:
        CJobType1();
        virtual ~CJobType1();
        virtual void execute();    
    
    };
    
    //------------------------------
    void CJobType1::execute()
    {
    
        //Write your code here to do some process    
    }
  • Create an instance of the class you defined above. See the sample code below.
    CJobType1* pJob = new CJobType1();
  • If you want to change the priority of the job, change it before adding the job to the job queue. In the code below the priority is set to 6, which is one more than the default of 5.
    pJob->setPriority(6);
  • Once you have created the job and initialized anything you need, add it to the job queue. From then on it is the responsibility of the job queue to complete the job. The sample code below adds a job to the job queue.
    m_jobQ.addJob(pJob);
  • To pause the job queue, so that no new jobs are processed, call the pause() method. See the sample code below.
    m_jobQ.pause();
  • To resume the job queue, so that any pending or new jobs are processed, call the resume() method. See the sample code below.
    m_jobQ.resume();
  • To find out the maximum number of threads allowed at any time, call the method getMaxNoOfExecuter().
  • To change the maximum number of executer threads in the job queue, call the method setMaxNoOfExecuter(). The sample code below increases the maximum number of executer threads by one.
    m_jobQ.setMaxNoOfExecuter(m_jobQ.getMaxNoOfExecuter() + 1);
  • By default m_autoDeleteFlag is TRUE, so the instance of the CJob child class is deleted automatically when the job finishes, that is, when the execute() method returns. If you do not want the instance to be deleted automatically, set the flag to FALSE by calling the setAutoDelete() method. See the code given below.
    CJobType1* pJob = new CJobType1();
    pJob->setAutoDelete(FALSE);
  • If you have set m_autoDeleteFlag to FALSE, it is your responsibility to delete the job instance, and you must not delete it before the job has completed. You can check whether the job has completed from the m_Completed attribute. See the code given below.
    //wait till the job gets completed
    while (pJob->m_Completed != TRUE)
    {
        Sleep(100);   //Win32 Sleep, in milliseconds
    }
    //Now we can delete the job instance as the job got completed
    delete pJob;
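Polling a completion flag from another thread is only reliable if the flag is written and read with proper synchronization. As a hedged illustration in standard C++ (not the article's CJob), the sketch below uses a std::atomic<bool> and adds a timeout so that the wait cannot spin forever. PollableJob and waitUntilCompleted are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical job whose completion flag is safe to poll from another thread.
struct PollableJob {
    std::atomic<bool> completed{false};
    int result = 0;

    void execute() {
        result = 6 * 7;  // the "work"
        // Publish the flag last so that a waiter that sees it also sees result.
        completed.store(true, std::memory_order_release);
    }
};

// Poll until the job is done. Returns false if the timeout passes first.
inline bool waitUntilCompleted(const PollableJob& job,
                               std::chrono::milliseconds timeout) {
    assert(timeout.count() >= 0);
    const std::chrono::steady_clock::time_point deadline =
        std::chrono::steady_clock::now() + timeout;
    while (!job.completed.load(std::memory_order_acquire)) {
        if (std::chrono::steady_clock::now() >= deadline) return false;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
    return true;
}
```

A caller that owns the job would delete it only after waitUntilCompleted returns true. If it returns false, the job may still be running and must not be deleted yet.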

Note

Always create the job on the heap (as shown above, using the new operator), not on the stack. If you do create the job on the stack, set m_autoDeleteFlag to FALSE, because when the flag is TRUE the job queue will try to delete the instance.

About the demo program

The demo program does the following:

  1. Creates one instance of CMThreadedJobQ in the dialog-based application.
  2. Creates two different job classes, CJobType1 and CJobType2, as required by the architecture: both inherit from CJob and implement the execute() method.
  3. Provides separate buttons for adding each type of job. When the respective button is clicked, a CJobType1 job is added with priority 5, whereas a CJobType2 job is added with priority 6.
  4. Provides two buttons to increase or decrease the maximum number of job executer threads used by the Multithreaded Job Queue.
  5. Provides a check box to pause or resume the Multithreaded Job Queue.
  6. Displays all the jobs currently waiting to be processed, with their priority and job ID, in a list box.

About CJobType1 and CJobType2

CJobType1 and CJobType2 each describe a new job type. As required, each class inherits from CJob and overrides the execute() method. In both classes, execute() simply creates a new dialog containing a progress bar and advances the bar slowly. When the progress bar reaches 100%, the dialog closes and the job is finished.

Executing the demo program

After you start the program, click the "Add Job Type1" or "Add Job Type2" button to add jobs to the job queue. By default the multithreaded job queue is created with two executer threads.

You can increase the maximum number of threads that can be used by clicking the "+" button, or decrease it by clicking the "-" button.

To pause the job queue so that no new jobs will be processed, check the "Pause [Do not process new Jobs]" check box.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.


About the Author

Pradeep Kumar Sahu
Architect, Wipro Technologies
United States
I am a fun-loving person and believe in "Work With Fun".
I love to code and design. Although I have worked with several programming languages, I love to work with object-oriented languages like C++ and Java.

Last Updated 22 May 2003
Article Copyright 2003 by Pradeep Kumar Sahu