Click here to Skip to main content
Click here to Skip to main content

A programming model to use a thread pool

, 30 Sep 2000
Rate this:
Please Sign up or sign in to vote.
A class to manage the thread pool
<!-- Link to source file download -->
  • Download source files - 3 Kb
  • <!-- Link to demo file download -->
  • Download demo project - 24 Kb
  • <!-- Add the rest of your HTML here -->

    Introduction

    On many occasions we need to utilize mutiple threads to boost the system performance. The logic for each thread is almost the same, but we need to manage these threads. If the system is busy, we create more threads, otherwise we kill some thread to avoid the extra overhead.

    I have done a couple of projects involving the multiple-thread management. At last I decided to write a class to wrap this machenism. This class can dynamically allocate threads and assign jobs to these worker threads. You derive your own classes and you do not need to know the underlying machanism to handle the mutiple threading and synchronization between these threads. However, you need to make your worker classes thread safe since your objects may be assigned to different threads each time.

    The another thing I want to demonstrate is the using the feature of IOCompletion Port. I found that it is amazing easy and useful, especially when used as a way to transfer data between threads.

    Usage

    To use the thread pool class you need to derive your worker class from IWorker and your job class from IJobDesc. The processing logic must be embedded within the member function IWorker::ProcessJob(IJobDesc* pJob). After you are finished, you can declare a thread pool like this:

    CThreadPool pool;
    pool.Start(6, 10);
    //do some other jobs here
    pool.Stop();
    

    The Start function has two parameters. The first argument is the minimum number of the worker threads this thread pool object should spawn. The second argument indicates the maximum number of worker thread within this thread pool. If the thread pool is very busy working on the assigned jobs it will automatically spawn more worker threads. On the other hand, when the thread pool is idle some threads will be removed from the pool. Fine-tune these two parameters to get the best performance.

    To assign jobs to the thread pool for processing, simply call the function

    pool.ProcessJob(pJob, pWorker);
    

    You must make sure that your derived worker class is thread-safe since a worker instance may be on multiple threads simultaneously. You have no control as to whether the process is on the same thread as the last time or not.

    Note

    If the processing takes a very long time, when you call Stop(), the processing may not finished immediately. The Stop() function will wait for a maximum of 2 minutes and then return. This function has an optional argument. If this argument is set to true, the function will terminate these worker threads anyway. If this argument is set to false, these worker threads will not get terminated harshly and still live. Under this situation, you have to take care that the worker object may not exist after calling Stop() and you will get an access violation error if you attempt to access them.

    The job object must be generated on the heap using new operator. After the process ends it will automatically deleted by the framework.

    License

    This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

    A list of licenses authors might use can be found here

    Share

    About the Author

    Sherwood Hu

    United States United States
    No Biography provided

    Comments and Discussions

     
    GeneralA question about CCriticalSection PinmemberSeanQA28-Oct-08 20:26 
    GeneralWhen the menber function "Stop" is used twice and more PinmemberSeanQA28-Oct-08 20:24 
    GeneralComment on design... PinmemberNigel de Costa4-Sep-06 23:02 
    GeneralRe: Comment on design... Pinmemberalexquisi27-Mar-07 23:01 
    Generalhave memory leaks! Pinmemberpcbirdwang3-Aug-05 14:43 
    GeneralATL Server provides its own thread pool class PinmemberAlexander Gräf6-May-04 4:10 
    GeneralRemoveThreads bug PinmemberLAT26-Jul-03 9:44 
    GeneralRe: RemoveThreads bug PinmemberLAT27-Jul-03 0:54 
    GeneralRe: RemoveThreads bug PinmemberWREY27-Jun-04 6:46 
    GeneralLinker could not find "pthreadVC.lib" PinmemberWREY29-Aug-02 23:12 
    GeneralBug fixed! PinmemberXSimon25-Aug-02 16:27 
    GeneralOther considerations PinmemberBill Wilson21-Nov-01 11:48 
    GeneralRe: Other considerations PinmemberBill Wilson6-Dec-01 5:46 
    GeneralRe: Other considerations PinmemberBill Wilson6-Dec-01 6:05 
    Here is my worker thread class:
     
    #include <afxmt.h>
    #if !defined(AFX_WORKER_H__EC52A7FA_9663_11D5_9FC7_00B0D081C96F__INCLUDED_)
    #define AFX_WORKER_H__EC52A7FA_9663_11D5_9FC7_00B0D081C96F__INCLUDED_
     
    #if _MSC_VER > 1000
    #pragma once
    #endif // _MSC_VER > 1000
    #include "errorlog.h"
    class CWorker : public CWinThread  
    {
     
    public:
    	CWorker();
    	virtual ~CWorker();
     
    	BOOL m_bStopNow;
     
    	CEvent m_Event;
    //	CErrorLog *m_pErrorLog;
    	DWORD m_dwThreadID;
     
    	BOOL Doit();
    	Run();
    	void Start();
    	void SetStop();
    	BOOL IsBusy();
    	void SetBusy(BOOL bIsBusy);
    	void SetRequest(BSTR bstrRequest);
    	BSTR GetRequest();
    	void SetID(long id);
    	long GetID();
    	void SetIndex(int n);
    	GetIndex();
    	DWORD GetThreadID();
    	BOOL NotNeeded();
    	
    //	void SetFunctionThread(DWORD dwThead);
    //	DWORD GetFunctionThread();
    
     
    	// Overrides
    	
    	BOOL InitInstance();
    	BOOL ExitInstance();
    protected:
    	BSTR m_bstrRequest;
    	BOOL m_bIsBusy;
    	DWORD m_dwFunctionThread;
    	long m_lRowID;
    	int m_nThreadIndex;
    };
     
    #endif // !defined(AFX_WORKER_H__EC52A7FA_9663_11D5_9FC7_00B0D081C96F__INCLUDED_)
    
     

     
    #include "stdafx.h"
    // Include object header to get IIDs
    #if defined(_APM_)
    #include "APMObject.h"
    #endif
    #if defined(_OTCS_)
    #include "otcsobject.h"
    #endif
    #if defined(_ODCS_)
    #include "ODCSObject.h"
    #endif
    #if defined(_ODPS_)
    #include "ODPSObject.h"
    #endif
     
    #include "Worker.h"
    #include "errorinfo.h"
    #include "errorlog.h"
     
    
    #ifdef _DEBUG
    #undef THIS_FILE
    static char THIS_FILE[]=__FILE__;
    #define new DEBUG_NEW
    #endif
    int MessageArrived(BSTR bstrRequest,CWorker *parent);
    extern int GetMax();
    // Seperate tracing for Worker threads
    #define TRACEW(msg) \
    {					\
    	CString strTraceW; \
    	strTraceW.Format("Request: %d, %s",m_lRowID, msg); \
    	TRACEX(strTraceW); \
    }
     
    	
    #define DIAGW(sev, msg, sender) \
    	TRACEW(msg);  \
    	g_pErrorLog->LogError(sev, msg, sender); 
     
    #define DIAGXW(msg) \
    	DIAGW(2,msg,g_strSender);
     
    //////////////////////////////////////////////////////////////////////
    // Construction/Destruction
    //////////////////////////////////////////////////////////////////////
    
    CWorker::CWorker()
    {
    	// Initialize worker thread
    	g_strSender = OBJECTNAME + "CWorker::CWorker()";
    	SetBusy( FALSE);
    	m_bStopNow = FALSE;
    	m_lRowID = 0;
     
    
    	DIAGX("Thread Created");
    }
     
    void CWorker::SetIndex(int index)
    {
    	// We want to know where we are in the array for diagnositic purposes
    	m_nThreadIndex = index;
    }
    int CWorker::GetIndex()
    {
    	// Tell us where we are in the thread array
    	return m_nThreadIndex;
    }
     
    DWORD CWorker::GetThreadID()
    {
    	// Get our thread id
    	return m_dwThreadID;
    }
    CWorker::~CWorker()
    {
    	g_strSender = OBJECTNAME + "CWorker::~CWorker()";
    	DIAGX("Thread destroyed");
    }
     
    BOOL CWorker::InitInstance()
    {
     
    	// Initilization
    	g_strSender = OBJECTNAME + "CWorker::InitInstance()";
    	CoInitializeEx(NULL,COINIT_MULTITHREADED );
    	m_dwThreadID = GetCurrentThreadId();
    	m_bStopNow = FALSE;
    //	m_pErrorLog =new CErrorLog();
    //	DIAGX("new CErrorLog created");
    
     
    
    	return TRUE;
    }
     
    BOOL CWorker::ExitInstance()
    {
     
    	g_CS.Lock();
    	// unload us from the thread array
    	g_ThreadsArray.RemoveAt(m_nThreadIndex);
    	// reduce index of all threads in list after this one
    	for (int i = m_nThreadIndex; i < g_ThreadsArray.GetSize(); i++)
    		g_ThreadsArray[i]->m_nThreadIndex--;
    	g_CS.Unlock();
    	CoUninitialize();
    	return TRUE;
    }
     
    BOOL CWorker::IsBusy()
    {
    	// Tell them if we are busy or stopping
    	return m_bIsBusy || m_bStopNow;
    }
     
    void CWorker::SetBusy(BOOL bIsBusy)
    {
    	// Set the busy switch
    	m_bIsBusy = bIsBusy;
     
    }
     
    long CWorker::GetID()
    {
    	// Return the request id being serviced by this threaed
    	long lRow =  m_lRowID;
    	return lRow;
    }
     
    void CWorker::SetID(long lID)
    {
    //	Remember the request ID 
    
    	m_lRowID = lID;
     
    
    }
    BSTR CWorker::GetRequest()
    {
    //	return the request
    	BSTR bstr =  m_bstrRequest;
     
    	return bstr;
    }
     
    void CWorker::SetRequest(BSTR bstrRequest)
    {
    	// Store the request
    	m_bstrRequest = bstrRequest;
     
    }
     
    void CWorker::Start()
    {
    	// Start a thread
    	g_strSender = OBJECTNAME + "CWorker::Start()";
     
    	g_CS.Lock();	// Keep everybody locked out until the thread starts
    	m_Event.SetEvent();
    	DIAGX("CWorker started");
    }
    int CWorker::Run()
    {
     
    	// Entry point for create thread.
    	g_strSender = OBJECTNAME + "CWorker::Run()";
     
    	while (TRUE)
    	{
    		DIAGXW("Waiting");
    		// Wait for event set by start
    		DWORD dwWaitResult = WaitForSingleObject(m_Event,INFINITE);
    		if (m_lRowID == 0) 	// If we don't have a row ID this is a false start.
    		{
    			Sleep(10);	// WaitForSingleObject didn't wait (Don't know why, happens occasionally, try again
    			CString szErr;
    			szErr.Format("WaitForSingleObject returned %d",dwWaitResult);
    			DIAGW(2,szErr,"APMObject::Worker::Run()");
    		} else {
    			SetBusy(TRUE);
    			m_Event.ResetEvent();
    			g_CS.Unlock();	// Free the lock, let other threads start
    			
    			if (m_bStopNow) break;	// If the stop flag was set while we were asleep, terminate the thread by returning from this procedure
    
    			// Service the request
    			DIAGXW("Ready to execute");
    			int iResult = Doit();
     
    			CString szTemp;
    			szTemp.Format("Finished function execution Request: %d, thread %x, Result: %d",
    							m_lRowID, GetCurrentThreadId(), iResult);
    			DIAGXW(szTemp);
    			
    			// Signal main thread this request is complete.
    			::PostThreadMessage(_Module.dwThreadID, WM_MSG_WORKERCOMPLETE,GetID(), iResult);
    			if (GetID() == 0)	// Debug purposes, removeit
    				m_lRowID = 0;	
    			szTemp.Format("Posted message Request: %d, thread %x, Result: %d, Stop = %d",
    							m_lRowID, GetCurrentThreadId(), iResult, m_bStopNow);
    			DIAGXW(szTemp);
    			g_CS.Lock();
    			if (NotNeeded())	// Check to se if this thread is still needed
    			{
    				SetBusy(TRUE);
    				m_bStopNow = TRUE;
    				DIAG(2,"NotNeeded return TRUE","APMObject::CWorker::Run()");
    				g_CS.Unlock();
    				break; // Let surplus threads termintate
    			}
    			g_CS.Unlock();
    			m_lRowID = 0;	// clear the request id so we can tell whether the wait actually waited.
    			SetBusy(FALSE);
    		}
     
    	}
     
    	DIAGXW("Thread terminated");
    	return 0;		// Exiting this procedure terminates the thread
    	
    }
     
    BOOL CWorker::NotNeeded()
    {
     
    	return FALSE; // Malfunction if removing threads???
    
    	// Determine whether there are more than enough threads.
    	// If so, signal that this one is not needed anymore
    	CString strDiag;
    	strDiag.Format(_T("GetMax()=%d, g_ThreadsArray.GetSize()=%d"),GetMax(),g_ThreadsArray.GetSize());
    	DIAG(2,strDiag,"CWorker::NotNeeded");
     
    	// See if we are in excess of our maximum
    	long lMax = GetMax();
    	if(lMax != 0 && lMax < g_ThreadsArray.GetSize())
    	{
     
    		return TRUE; // Too many threads
    	}
    	
    	// Check to see if we're down to what we started with
    	if (g_ThreadsArray.GetSize() <= g_nInitial) return FALSE;
     
    	// Check what percentage of threads are not in use
    	int iTotalUnused = 0;
     
    	for (int i=0; i<g_ThreadsArray.GetSize();i++)
    	{
    		
    		if (!g_ThreadsArray[i]->IsBusy()) iTotalUnused++;
    	}
    	strDiag.Format("Total unused = %d",iTotalUnused);
    	DIAG(2,strDiag,"CWorker::NotNeeded");
     
    	// Thread is surplus if more than 20% are unused
    	strDiag.Format("(iTotalUnused * 100) / g_ThreadsArray.GetSize() = %d",
    					 (iTotalUnused * 100) / g_ThreadsArray.GetSize());
    	DIAG(2,strDiag,"CWorker::NotNeeded");
     
    	if ((iTotalUnused * 100) / g_ThreadsArray.GetSize() > 20) 
    	{
    		DIAG(2,"RETURN TRUE","CWorker::NotNeeded");
    		return TRUE;
    	}
    	DIAG(2,"RETURN FALSE","CWorker::NotNeeded");
    	return FALSE;
     
    }
     
    void CWorker::SetStop()
    {
    	m_bStopNow = TRUE;
    	if (!IsBusy())
    		m_Event.SetEvent();		// get the thread started so it can exit
    
    }
    // Application specific operation
    BOOL CWorker::Doit()
    {
     
    	// Service request
    	g_strSender = OBJECTNAME + "CWorker::Doit";
    	CString strMsg;
    	strMsg.Format("Starting Request: %d, on thread %x", GetID(), m_dwThreadID);
    	DIAGXW(strMsg);
    	
    	// Pefform request specific logic 
    	// MessageArrived is different for each service.
    	return MessageArrived(m_bstrRequest, this);
    }
    
     
    WARNING: There may be something wrong with the way a thread is removed. This is still a work in progress. Sorry, I didn't have time to clean it up.
     

    Bill
    GeneralThis is more like a hack than a clean solution Pinmemberzoly29-Jan-01 8:19 
    GeneralIOCP on Win16 based OSes PinsussJim Murphy4-Oct-00 4:42 
    GeneralRe: IOCP on Win16 based OSes Pinmemberconnex22-Apr-01 5:03 
    QuestionIs there any benefit using 'struct' instead of 'class' to define the interface? PinsussAnonymous26-Sep-00 8:36 
    AnswerRe: Is there any benefit using 'struct' instead of 'class' to define the interface? PinsussSherwood26-Sep-00 10:51 
    AnswerRe: Is there any benefit using 'struct' instead of 'class' to define the interface? PinsussMarius Cabas1-Oct-00 20:26 

    General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

    Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

    | Advertise | Privacy | Mobile
    Web02 | 2.8.140827.1 | Last Updated 1 Oct 2000
    Article Copyright 2000 by Sherwood Hu
    Everything else Copyright © CodeProject, 1999-2014
    Terms of Service
    Layout: fixed | fluid