
Use Command Pattern for Handling Queued Jobs in a Multi-threaded Environment

PrasadPerera, 19 Nov 2008, CPOL
This article shows how to use the command pattern in a multi-threaded environment where jobs are queued for execution. Along the way, it describes thread scheduling with basic synchronization primitives: semaphores and mutexes.

Introduction

This article describes one of the basic design patterns widely used in software development. The command pattern is useful for organizing and managing computing tasks. It wraps an action, or a set of actions, in a single command object that can be treated as a unit of execution. One use of the pattern is to manage a queue of different types of jobs that are executed by several threads in a multi-threaded environment. This article walks through a basic scenario, with a sample application, to show how this can be achieved with the command pattern.

Background

A Bit on Command Pattern for Managing Thread Pools

The command design pattern has many uses, such as combining a sequence of operations into a single command object, storing operations temporarily while their parameters are pending, or implementing "undo" for a sequence of operations. My take on the command pattern is to use it to manage different types of queued jobs that are executed from a single thread pool. Using the command pattern makes it easy to add new types of requests later and queue them to the same thread pool. The thread pool simply extracts the command objects, which wrap the actual work, and executes them without caring about the specific operations.
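As a rough illustration of the idea above, the sketch below puts two unrelated kinds of jobs into one queue and lets a consumer run them through a single execute() call. It is single-threaded on purpose. The names command, log_command, add_command, drain and demo_total are hypothetical and are not taken from the sample application.

```cpp
#include <cassert>
#include <memory>
#include <queue>
#include <string>
#include <vector>

// Hypothetical base class; only execute() mirrors the article's interface.
struct command {
    virtual ~command() {}
    virtual void execute() = 0;
};

// One kind of job: append a message to a log.
class log_command : public command {
public:
    log_command(std::vector<std::string>& log, const std::string& msg)
        : log_(log), msg_(msg) {}
    void execute() override { log_.push_back(msg_); }
private:
    std::vector<std::string>& log_;
    std::string msg_;
};

// A different kind of job: add an amount to a running total.
class add_command : public command {
public:
    add_command(int& total, int amount) : total_(total), amount_(amount) {}
    void execute() override { total_ += amount_; }
private:
    int& total_;
    int amount_;
};

// The consumer knows nothing beyond command::execute().
inline int drain(std::queue<std::unique_ptr<command>>& jobs) {
    int executed = 0;
    while (!jobs.empty()) {
        jobs.front()->execute();
        jobs.pop();
        ++executed;
    }
    return executed;
}

// Two job types share one queue; returns the total built up by add_command.
inline int demo_total() {
    std::vector<std::string> log;
    int total = 0;
    std::queue<std::unique_ptr<command>> jobs;
    jobs.push(std::unique_ptr<command>(new add_command(total, 5)));
    jobs.push(std::unique_ptr<command>(new log_command(log, "hello")));
    jobs.push(std::unique_ptr<command>(new add_command(total, 7)));
    int executed = drain(jobs);
    assert(executed == 3 && log.size() == 1);
    return total;
}
```

Adding a third job type later means writing one more class derived from command; drain() does not change, which is the flexibility the paragraph above refers to.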


[Figure: the command pattern]

[Figure: structure of the sample application]

The basic classes are a set of counter classes used to create counter objects. A fixed number of threads take counting operations from these objects and execute them. To achieve this, a command_interface is defined, which serves as the base class for the command objects. Each concrete command object wraps a specific counter and executes its counting operation for a given number of cycles. These counter objects are created by the command_generator class, and the command_schedular (which appears as work_schedular in the code listings below) is the class that creates the threads and sets the number of cycles to run for each command object.
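The article does not list its counter or concrete command classes, so the following is only a guess at their general shape. up_counter, count_command and run_counter_demo are hypothetical names. Only command_interface, execute() and set_cycles() come from the article.

```cpp
#include <cassert>

// Interface with the same two methods as the article's command_interface.
class command_interface {
public:
    virtual ~command_interface() {}
    virtual void execute() = 0;
    virtual void set_cycles(int iCycles) = 0;
};

// Hypothetical counter: each call to count() is one counting operation.
class up_counter {
public:
    up_counter() : value_(0) {}
    void count() { ++value_; }
    int value() const { return value_; }
private:
    int value_;
};

// Hypothetical concrete command: wraps one counter and runs its
// counting operation for the configured number of cycles.
class count_command : public command_interface {
public:
    explicit count_command(up_counter& c) : counter_(c), cycles_(0) {}
    void set_cycles(int iCycles) override { cycles_ = iCycles; }
    void execute() override {
        for (int i = 0; i < cycles_; ++i) counter_.count();
    }
private:
    up_counter& counter_;
    int cycles_;
};

// Drives the command only through the interface, as a scheduler would.
inline int run_counter_demo(int cycles) {
    up_counter counter;
    count_command cmd(counter);
    command_interface& as_interface = cmd;
    as_interface.set_cycles(cycles);
    as_interface.execute();
    return counter.value();
}
```

Here the command holds a reference to the counter, so the counter must outlive the command. The sample application may manage ownership differently.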

The Code

This is the command interface class. The method execute() is overridden in concrete command classes:

#ifndef COMMAND_INTERFACE_H
#define COMMAND_INTERFACE_H

#include <iostream>
#include <windows.h>

class command_interface{
public:
	//virtual destructor, so that concrete commands can be deleted
	//through a command_interface pointer
	virtual ~command_interface() {}

	//methods to implement with the specific action or set of actions
	virtual void execute() = 0;

	//this is just for convenience
	virtual void set_cycles(int iCycles) = 0;
};
#endif /*COMMAND_INTERFACE_H*/

This is the piece of code which creates the set of threads for the thread pool:

void work_schedular::register_threads(int iThreadCount){
	arr_threads = new HANDLE[iThreadCount];
	i_threadCount = iThreadCount;
	for (int i = 0; i < iThreadCount; i++){
		thread_st* thread_ptr = new thread_st;
		//Initialize the record before the thread starts, since
		//thread_function may use it as soon as the thread runs
		thread_ptr->p_schedular = this;
		thread_ptr->i_numOfCommands = 0;
		thread_ptr->thread_handle = NULL;
		HANDLE tHandle = ::CreateThread(NULL, 0,
			(LPTHREAD_START_ROUTINE)&work_schedular::thread_function,
			(void*)thread_ptr, 0, NULL);
		if (tHandle){
			thread_ptr->thread_handle = tHandle;
			arr_threads[i] = tHandle;
			map_threads[i] = thread_ptr;
		}
		else{
			//Thread creation failed: do not leave a garbage handle behind
			arr_threads[i] = NULL;
			delete thread_ptr;
		}
	}
}

The function CreateThread is used to create the threads. The handles of the created threads are stored in an array, and thread_function is passed as the start routine that each thread runs. A pointer to the thread's own thread_st record is passed as its parameter.
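The same registration step can be sketched in portable C++11, using std::thread in place of CreateThread. This is a swapped-in technique and not the article's code. mini_pool, worker_record and run_registration_demo are hypothetical names, and the "work" done by each thread is only a placeholder.

```cpp
#include <cassert>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for the article's thread_st record.
struct worker_record {
    int index = 0;
    int commands_run = 0;
};

class mini_pool {
public:
    ~mini_pool() { join_all(); }

    // Start `count` threads; each one receives a pointer to its own record.
    void register_threads(int count) {
        for (int i = 0; i < count; ++i) {
            records_.push_back(std::unique_ptr<worker_record>(new worker_record));
            records_.back()->index = i;  // record is complete before the thread starts
            threads_.emplace_back(&mini_pool::thread_function, records_.back().get());
        }
    }

    // Wait for every started thread to finish.
    void join_all() {
        for (std::thread& t : threads_) t.join();
        threads_.clear();
    }

    int total_commands() const {
        int sum = 0;
        for (const auto& r : records_) sum += r->commands_run;
        return sum;
    }

private:
    // Static, like the article's thread_function: it gets no `this`,
    // only the record passed at creation time.
    static void thread_function(worker_record* rec) {
        rec->commands_run = rec->index + 1;  // placeholder for real work
    }

    std::vector<std::unique_ptr<worker_record>> records_;
    std::vector<std::thread> threads_;
};

// Starts the threads, waits for them, and sums the per-thread counts.
inline int run_registration_demo(int threads) {
    mini_pool pool;
    pool.register_threads(threads);
    pool.join_all();
    return pool.total_commands();
}
```

Because std::thread accepts a typed function and typed arguments, no function-pointer cast is needed here, and each record is owned by a unique_ptr, so it is released automatically.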

The thread_function used is shown below:

int work_schedular::thread_function(void *param){
	command_interface* pCom = NULL;
	HANDLE arr_queue_signls[2];
	work_schedular* schedular = ((thread_st*)param)->p_schedular;
	arr_queue_signls[0] = schedular->semaphore_stop;
	arr_queue_signls[1] = schedular->semaphore_ComCount;

	while (true){
		//Wait until either the stop signal is given or the queue
		//has at least one command in it
		WaitForMultipleObjects(2, arr_queue_signls, FALSE, INFINITE);
		//Then take hold of the queue; this is needed to synchronize
		//command extraction and insertion
		WaitForSingleObject(schedular->mutex_lock, INFINITE);

		if (!schedular->queue_commands.empty()) {
			pCom = schedular->queue_commands.front();
			schedular->queue_commands.pop();
		}
		//Release the hold of the queue
		ReleaseMutex(schedular->mutex_lock);
		if (pCom){
			//Execute the command object, then dispose of it
			pCom->execute();
			delete pCom;
			pCom = NULL;
			((thread_st*)param)->i_numOfCommands++;
		}
		else{
			//The queue is empty and the thread was signaled to stop
			break;
		}
	}
	return 0;
}
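The worker above sleeps on a counting semaphore that tracks how many commands are waiting. For readers less familiar with that primitive, here is a minimal portable sketch of one, built from std::mutex and std::condition_variable. It is an illustration only and not the Win32 semaphore object: it has no maximum count and no timeout. counting_semaphore_sketch and run_semaphore_demo are hypothetical names.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

class counting_semaphore_sketch {
public:
    explicit counting_semaphore_sketch(int initial) : count_(initial) {}

    // Roughly what ReleaseSemaphore(h, 1, NULL) does: add one permit
    // and wake one waiter.
    void release() {
        {
            std::lock_guard<std::mutex> lock(m_);
            ++count_;
        }
        cv_.notify_one();
    }

    // Roughly what an INFINITE wait on the semaphore does: block until
    // a permit is available, then take it.
    void acquire() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return count_ > 0; });
        --count_;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    int count_;
};

// One consumer thread takes exactly as many permits as the producer releases.
inline int run_semaphore_demo(int items) {
    counting_semaphore_sketch available(0);
    int consumed = 0;
    std::thread consumer([&] {
        for (int i = 0; i < items; ++i) {
            available.acquire();
            ++consumed;
        }
    });
    for (int i = 0; i < items; ++i) available.release();
    consumer.join();  // after join, reading `consumed` is safe
    return consumed;
}
```

Each release corresponds to one queued command and each acquire to one worker wake-up, which is why a worker that wakes on the command semaphore can expect something to be in the queue.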

Finally, add_commands adds the command objects to the scheduler queue. This function is executed by the thread started in command_generator:

void work_schedular::add_commands(command_interface *pCom){
	//Take hold of the queue to insert the command object
	WaitForSingleObject(mutex_lock, INFINITE);

	//Set the number of cycles to run to a random number from 0 to 9
	pCom->set_cycles(rand() % 10);
	queue_commands.push(pCom);

	//Signal that one more command is waiting in the queue
	ReleaseSemaphore(semaphore_ComCount, 1, NULL);

	//Release the hold of the queue
	ReleaseMutex(mutex_lock);
}
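Putting the producer and consumer sides together, the sketch below is a portable C++11 analogue of the scheduler. It uses std::thread, std::mutex and std::condition_variable in place of the Win32 handles, and a stop flag in place of the stop semaphore, so it is a different mechanism from the article's and is offered only as an assumption-laden illustration. portable_scheduler, add_one and run_scheduler_demo are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

struct command {
    virtual ~command() {}
    virtual void execute() = 0;
};

class portable_scheduler {
public:
    explicit portable_scheduler(int threads) : stop_(false) {
        for (int i = 0; i < threads; ++i)
            workers_.emplace_back(&portable_scheduler::worker_loop, this);
    }
    ~portable_scheduler() { stop_and_join(); }

    // Producer side: push under the lock, then wake one worker.
    void add_command(std::unique_ptr<command> cmd) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(cmd));
        }
        ready_.notify_one();
    }

    // Ask the workers to finish the remaining commands and exit.
    void stop_and_join() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        ready_.notify_all();
        for (std::thread& t : workers_)
            if (t.joinable()) t.join();
    }

private:
    // Consumer side: wait for work or stop, pop under the lock,
    // execute outside the lock.
    void worker_loop() {
        for (;;) {
            std::unique_ptr<command> cmd;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                ready_.wait(lock, [this] { return stop_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stop requested, nothing left
                cmd = std::move(queue_.front());
                queue_.pop();
            }
            cmd->execute();
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::unique_ptr<command>> queue_;
    std::vector<std::thread> workers_;
    bool stop_;
};

// Hypothetical command used only by the demo below.
struct add_one : command {
    explicit add_one(std::atomic<int>& n) : n_(n) {}
    void execute() override { ++n_; }
    std::atomic<int>& n_;
};

// Queues `jobs` commands on `threads` workers and returns how many ran.
inline int run_scheduler_demo(int threads, int jobs) {
    std::atomic<int> done(0);
    portable_scheduler pool(threads);
    for (int i = 0; i < jobs; ++i)
        pool.add_command(std::unique_ptr<command>(new add_one(done)));
    pool.stop_and_join();
    return done.load();
}
```

As in the article's thread_function, a worker exits only when it has been told to stop and finds the queue empty, so commands queued before the stop request are still executed.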

Points of Interest

This code was written to see how the command pattern can be used in this particular scenario. It also helped me highlight basic thread synchronization methods. The example can be improved by wrapping the threads in thread objects. The Win32 functions used for synchronizing threads are among the simpler mechanisms in thread management. In the code shown, WaitForSingleObject acquires the mutex and WaitForMultipleObjects waits on the semaphores, both with the timeout set to INFINITE. These mechanisms are not hard to use as long as we know the correct place to use them. Though this code won't have any commercial value, I think it is good learning material for those who are interested in this concept!

History

  • 13th November, 2008: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

PrasadPerera
Student
Canada
Prasad Perera is a software developer who's interested in parallel and distributed computing and graphics programming!
 
Currently he is pursuing his master's degree at Concordia University, Montreal.

Last Updated 19 Nov 2008
Article Copyright 2008 by PrasadPerera
Everything else Copyright © CodeProject, 1999-2014