In many of our applications we just want to get some thing done asynchronously. This is where some times we prefer to use job queue. Where we can just assign or add a job to the job queue and then it is the job queue's responsibilities to complete the job. This gives the flexibility to the main thread to concentrate on some thing else where the jobs can be processed at the background. If the job queue can be multithreaded and can process multiple jobs at the same time, it is better.
This article describes such an implementation of job queue. This implementation has the following features :
- The job queue is multi threaded, so many jobs can be processed simultaneously. The job queue uses a pool of threads to execute the jobs.
- The job queue supports priorities. The jobs with high priority will be added before the jobs with lower priority, in the queue. So they will be processed before other lower priority jobs.
- The job queue can be paused, so that no new job can be processed from the time of pause. But the user still can add jobs to the queue. The jobs will be processed once the user choose to resume the job queue.
- The number of threads that will be used by the job queue to process the jobs can be increased or decreased at run time.
I thought of writing this article after getting feedback and suggestions for my earlier article A Method of Worker Thread Pooling. Thanks a lot to you all, who has gone through the article and given your feedback.
This article uses the same architecture for pooling the worker threads that are used by the job queue. You can refer back to know more about the architecture of thread pooling.
Architecture in brief
The main logical components of the architecture are:
- Thread pool to maintain and reuse the worker threads to execute the jobs.
- Priority queue to maintain the jobs. The user will just add the jobs to this queue.
- Observer thread to assign the jobs in queue to the free job executer threads from the thread pool.
- The job to be done. This will be the class that you will derive from the base class
The main architecture of the thread pool is given below in the class diagram.
About the classes
This class is responsible to execute the jobs. This class is associated with one worker thread. The thread is created in the constructor and the instance is passed to the thread as the parameter. The user doesn't have to create any instance of this class and also doesn't need to access or use any of its attributes or methods. This class is totally for internal use of the architecture.
m_flag: This is an
int attribute which is used as a flag. Two
#define macro constants are defined for this variable, they are
STOP_WORKING (Value is -1),
KEEP_WORKING (value is 0 ). If the
m_flag value is set to
STOP_WORKING then the thread associated with the instance exists, else that thread is keep working.
m_pExecuterThread: This is a pointer of type
CWinThread. This pointer keeps a reference to the thread that is associated with the class.
m_pJob2Do: This is a pointer of type
CJob. This contains the reference to an instance of
CJob or its child class. The executer remains idle if this is
NULL. That is when there is no job to do and the executer is idle and the thread remains in suspended mode.
m_pJobQ: This is a pointer of type
CMThreadedJobQ. This points to the
CMThreadedJobQ to which this instance belongs.
void execute(CJob* pJob): This operation is responsible for executing a job. The
CMThreadedJobQ class will call this function to get a job done. User should not call this method, this is only for the use of the implementation of the multithreaded job queue.
void stop(): This function stops the Executer.
static UINT ThreadFunction(LPVOID pParam): This is the static thread function. This function is used while creating the thread in the constructor.
This class wraps a job. The job that can be added to the queue must be derived from this class. And the method
execute() must be over ridden. The function
execute() will be called back by the the executer thread.
BOOL attribute represents the state of the job. If the job gets completed then the flag is
TRUE, else the value is
m_autoDeleteFlag: This is a
BOOL attribute to represent whether the instance of
CJob should be deleted or not, after the job gets completed. If
TRUE the instance gets deleted automatically, so the user doesn't have to call delete again. By default this flag is
m_priority : This
int attribute represents the priority of the thread. The default value is 5.
m_ID: This is the Job ID. This ID is assigned in the constructor of the class. A static variable is maintained to keep the last used ID. So that, next ID can be generated by incrementing this.
BOOL AutoDelete() : This method returns
TRUE if the auto delete flag is
TRUE. Else returns
virtual void execute(): A pure virtual method. This method will be overridden by the child classes. This is the method which is the entry point for executing the job.
long getID(): This method returns the job ID.
int getPriority(): This method returns the priority of the job.
void setAutoDelete(BOOL autoDeleteFlag): This method sets the auto delete flag to either
FALSE depending on the parameter.
void setPriority(int priority): This methods sets the priority of the job. The priority should be changed before adding the job to the queue.
This is the main class of the module. To use the multithreaded job queue you must create an instance of this. And then add a job to this instance, to get the job done. The job is an instance of a child class that is derived from the class
CJob. This class is responsible to accept the jobs and maintain them in a priority queue. This class has an observer thread which observes the job queue and also the free executer threads. When a new executer is free and there is a new job then, immediately it assigns the new job to the executer thread.
attribute is to maintain the number of executers that are present at a particular instance of time, to execute the jobs.
int attribute represents the maximum limit of the executers that can be created to execute jobs. At any instance of time the total number of free, and busy executer threads all together should not exceed this number.
m_jobQList: This is the link list, to maintain the priority queue of jobs. This is an instance of
CJobQList, which is an instantiated class of
BOOL attribute is to know, whether the observer thread should process or not, the new jobs.
m_pObserverThread: This is the pointer to the thread (instance of
CWinThread) which is responsible for observing and assigning the jobs to the free executer threads.
m_cs: This is an instance of
CCriticalSection. This is used for synchronized access of shared resources between the thread.
m_pFreeEList: This is the pointer to the head of the linked list which maintains the free executer thread list.
m_pAllEList: This is the head of the linked list that maintains all the executer thread list.
void addJob(CJob *pJob): This method adds a job to the job queue. The job is passed as the parameter.
void addJobExecuter(CJobExecuter *pEx): This method adds a
CJobExecuter to the job executer linked list. This method is not required to be called by the user, this is only for the use of the implementation of the multithreaded job queue.
void addFreeJobExecuter(CJobExecuter *pEx): This method adds a job executer to the free job executer list. This method is not required to be used by the user, this is only for the use of the implementation of the multithreaded job queue.
void deleteJobExecuter(CJobExecuter *pEx): This method deletes a job executer. This method is not required to be used by the user. This is only for the use of the implementation of the multithreaded job queue.
CJobExecuter* getJobExecuter(): This method returns a free job executer. This method is not required to be called or used by the user. This is only for the implementation of the multithreaded job queue.
int getMaxNoOfExecuter(): This method returns the maximum number of allowed executer threads.
int getNoOfExecuter(): This method returns the number of executers present at any instance of time.
static UINT JobObserverThreadFunction(LPVOID pParam): This is the static thread function. This function is not to be used by the user. This function is used to create the observer thread.
void pause(): This pauses the multithreaded job queue, so that no new job gets processed. But the user can add new jobs to the job queue. The jobs will be processed when the user resumes again. But this does not pause currently running jobs.
void resume(): This resumes the multithreaded job queue. After resume all the pending jobs in the queue will be processed.
void setMaxNoOfExecuter(int value): This sets the maximum number of executers. If the user increases the maximum number of executers, then the multithreaded job queue will create more threads to execute more jobs concurrently. If the user decreases the max number of executers then the job queue brings down the number of executers. The bringing down process may not be immediate, if all the executers are busy in processing. In such a case the number will come down as soon as jobs are finished.
Using the code
It is very easy to use the Multithreaded Job Queue in your code. Below given are steps to use the code. Also include the sample code from the demo project.
- Create an instance of the
CMThreadedJobQ class. Refer the sample code below.
- Create a class as shown in the class diagram, which will inherit the
CJob class and implement the pure virtual function
void execute(). Refer the sample code below
class CJobType1 : public CJob
virtual void execute();
- Create an instance of your class, which you have defined as above. Refer the sample code below.
CJobType1* pJob = new CJobType1();
- If you want to change the priority of the job then change it before adding the same to the job queue. Refer the below code in which, the priority of the job is set to 6, that is one more than default. The default priority is 5.
- Once you created the job and initialized any thing you want, just add this job to the job queue. Once you added the job, then it is the responsibility of the job queue to complete the job. Refer the sample code below for adding a job to the job queue.
- To pause the job queue, so that no new jobs will be processed call the
pause() method. Refer the sample code below.
- To resume the job queue so that any pending or new jobs will be processed call the
resume() method. Refer the sample code below.
- To know the maximum number of threads that are allowed at any instance, call the method
- To change the maximum number of executer threads in the job queue call the method
setMaxNoOfExecuter(). Refer the sample code below in which the maximum number of executer threads gets increased by one.
m_jobQ.setMaxNoOfExecuter ( m_jobQ.getMaxNoOfExecuter()+1);
- By default the
TRUE. So the instance of the
CJob child class will be deleted automatically when the job will get finished, that is when the
execute() method returns. If you want that the instance should not be deleted automatically, then you should set the flag to
false by calling the
setAutoDelete() method. Refer the code given below.
CJobType1* pJob = new CJobType1();
- If you have set the
m_autoDeleteFlag flag to
FALSE, then it is your responsibility to delete the job instance, you should not delete the instance before the job gets completed. You can check whether the job is completed or not from the
m_Completed attribute. Refer the code given below.
while( pJob.m_Completed != TRUE)
Always create the job in the heap (as shown above by calling the
new operator) not on the stack. If you are creating the job in the stack then set the
FALSE. This is because if the
m_autoDeleteFlag flag is
TRUE then the job queue will try to delete the instance.
About the demo program
The demo program does the following:
- Creates one instance of
CMThreadedJobQ in the dialog based application
- Creates two different type of job class. The
CJobType2 as required by the architecture. That is both are inherited from
CJob and implements the
- Provides different buttons for adding both types of jobs. The
CJobType1 type jobs gets added with priority 5 where as the
CJobType2 jobs gets added with priority 6 when the respective buttons are clicked.
- Provides two buttons to increase the maximum number of job executer threads to be used by the Multithreaded Job Queue.
- Provides a check button to pause or resume the Multithreaded Job Queue.
- Displays all the jobs that are currently waiting to be processed with there priority and job ID in a list box.
About CJobType1 and CJobType2
CJobType2 describes a new job type. As per the requirement this class inherits
CJob and overrides the
execute() method. In both the classes the
execute() method just creates a new dialog having a progress bar and increases its progress bar slowly. When the progress bar reaches 100%, it closes the dialog and the job gets finished.
Executing the demo program
After you start the program just click on the "Add Job Type1" or "Add Job Type2" buttons to add jobs to the job queue. By default the job queue gets created with two executer threads in the multithreaded job queue.
You can increase the maximum numbers of threads that can be used by clicking the "+" button, or decrease the maximum number of threads that can be used by clicking the "-" button.
To pause the job queue so that no new jobs will be processed, check the "Pause [Do not process new Jobs]" check box.