
Win32 Thread Pool

7 Feb 2011, CPOL, 6 min read
An implementation of a Thread Pool in C++ for Windows
Overview

Introduction

What is a thread pool? Exactly that, a pool of threads. You may have heard of terms like object pooling, thread pooling, car pooling (oops), anyway, the idea behind pooling is that you can re-use the objects which may be threads or database connections or instances of some class. Now why would we ever want to re-use such things? The most important reason would be that creating such an object might take up a lot of resources or time, so, we do the next best thing. We create a bunch of them initially and call the bunch a pool. Whenever someone (some code) wants to use such an object, instead of creating a new one, it gets it from the already existing bunch (pool).

Latest Updates

The code for this was written quite a while back - Sept 2005! After seeing recent action on this article, I decided to update it. Thanks to everyone who has appreciated, asked questions and most of all pointed out defects in the code.

The latest updates are listed below:

  1. The shutdown event is unnamed, so that multiple thread pools can be created across processes without name clashes.
  2. Individual thread wait handles are named as PID:<processid>, TDX:<threadindex>.
  3. Fixed Create() method to check INVALID_HANDLE_VALUE when using _beginthreadex.
  4. Changed Create() method to return false if we fail to create any of the threads.
  5. Added virtual destructor to IRunObject structure.
  6. Modified Destroy() to delete the IRunObject object if AutoDelete() returns true.
  7. The sample application has been revamped. I must admit it wasn't very understandable. It is now much simpler and demonstrates the use of the thread pool better.

Background

Usually when I develop something, there is a fairly good reason behind it like my project demanded it or I did it for someone who asked me about it, but this thread pool was a result of neither of these. Actually, around a year back, I had attended an interview with a company where I was asked this question. Well at that time, I didn't really think of it seriously, but a few days back the question came back to me, and this time I decided it was time to get my answer working.

Using the Code

Main Files

  • threadpool.h & threadpool.cpp - defines the CThreadPool class. This class is the main class which provides the thread pooling facilities.
  • ThreadPoolAppDlg.cpp - see the OnOK() method. This method makes use of the thread pool.

This example was written using VC++ 6.0. The source has now been updated for Visual Studio 2008. The thread pool class itself does not make use of any MFC classes so you should be able to use the thread pool for pure Win32 applications also.

A Little Explanation of How the Pool Works

First, you need to create an instance of the thread pool. While creating it, you have the option of specifying the number of threads which will live in the pool. The default is 10. You can also specify whether to create the pool right now or later. If you choose to create it later, you will need to call the Create() method to actually create the threads in the pool.

Once the pool is created, the following things have already happened:

  1. The specified number of threads have been created.
  2. These threads have been added to an internal list of threads.
  3. A list has been created which will store the user requests for execution.
  4. All the threads are waiting eagerly for someone to add a work item for execution.

Once a new item has been given to the thread pool, the pool looks for the first available (free) thread. It sets off an event which makes the thread go and pop off the work item from the list of requests and immediately go about executing the request. Once execution finishes, the thread marks itself as 'free' and checks the request-list once for any pending requests. This keeps happening for the entire lifetime of the pool.
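
The flow described above can be sketched in portable C++11. This is only a minimal sketch, not the article's CThreadPool: it assumes std::thread, std::mutex and std::condition_variable in place of the Win32 threads and events, and the names MiniPool and RunCountingDemo are hypothetical. Unlike the article's pool, its destructor lets the queued jobs finish before the threads are joined.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical portable analog of the pool described above.
class MiniPool {
public:
    explicit MiniPool(std::size_t threadCount) {
        // Create all worker threads up front; each waits for work.
        for (std::size_t i = 0; i < threadCount; ++i)
            workers_.emplace_back([this] { WorkerLoop(); });
    }

    ~MiniPool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();                 // wake every idle worker
        for (std::thread& t : workers_) t.join();
    }

    // Add a work item to the request list and wake one worker.
    void Run(std::function<void()> job) {
        assert(job);                        // an empty job cannot be executed
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
        }
        wake_.notify_one();
    }

private:
    void WorkerLoop() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopping and nothing left to run
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();                          // run outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::function<void()>> jobs_;
    std::vector<std::thread> workers_;
    bool stopping_ = false;
};

// Queues jobCount increments on a pool of threadCount threads and
// returns how many of them ran.
int RunCountingDemo(std::size_t threadCount, int jobCount) {
    std::mutex totalMutex;
    int total = 0;
    {
        MiniPool pool(threadCount);
        for (int i = 0; i < jobCount; ++i)
            pool.Run([&] { std::lock_guard<std::mutex> g(totalMutex); ++total; });
    }   // the destructor drains the queue and joins the workers
    return total;
}
```

Calling RunCountingDemo(2, 3) mirrors the demo setup of two threads and three work items: the third item waits in the queue until one of the two workers is free.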

Using the Pool

Once the pool is created, you can give it work to do in two different ways.

Method 1

Write a function of the following form:

C++
DWORD WINAPI MyThreadFunc1(LPVOID param)
{
    // VERY IMPORTANT: param is a pointer to UserPoolData
    UserPoolData* poolData = (UserPoolData*)param;
    // NOTE: To get to YOUR data, you need to retrieve it from poolData
    LPVOID myData = poolData->pData;
    // do my work
    // ...
    // If you are doing long running processing, you should check if the pool 
    // is getting destroyed:
    while(poolData->pThreadPool->CheckThreadStop() == false)
    {
        // keep doing work as long as CheckThreadStop() keeps returning false
    }
    return 0; // a function declared to return DWORD must return a value
}

Once this is done, suppose the instance of the thread pool is gThreadPool, then call the Run() method of the pool. E.g.:

C++
CMyOwnData* PointerToMyOwnData = new CMyOwnData();
gThreadPool.Run(MyThreadFunc1, (void*)PointerToMyOwnData);

So, in this case what we specify are:

  • the function to execute
  • the data which will be passed into the function while it executes in a 'new' thread.

See the LPVOID param in the MyThreadFunc1 definition?
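
To make the wrapping explicit, here is a small portable sketch of the idea. The types FakePool, PoolData and Counter are hypothetical stand-ins (PoolData plays the role of UserPoolData), and the stop flag is a plain std::atomic rather than the pool's real shutdown event. It only illustrates that the function receives the wrapper, not your pointer, and that it should poll the pool for a stop request.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for the pool: only the stop query is modelled.
struct FakePool {
    std::atomic<bool> stopRequested{false};
    bool CheckThreadStop() const { return stopRequested.load(); }
};

// Hypothetical stand-in for UserPoolData: the pool wraps the caller's
// pointer together with a pointer back to the pool.
struct PoolData {
    void* pData;
    FakePool* pThreadPool;
};

// The caller's own data.
struct Counter {
    int limit;
    int done;
};

// Same shape as MyThreadFunc1: param is the wrapper, not the user data.
unsigned long CountingJob(void* param) {
    PoolData* poolData = static_cast<PoolData*>(param);
    assert(poolData != nullptr);
    Counter* counter = static_cast<Counter*>(poolData->pData);
    // Work until finished or until the pool asks us to stop.
    while (counter->done < counter->limit &&
           !poolData->pThreadPool->CheckThreadStop())
        ++counter->done;
    return 0;
}

// Runs the job directly and reports how much work it completed.
int RunWrapperDemo(bool stopFirst) {
    FakePool pool;
    pool.stopRequested = stopFirst;
    Counter counter{5, 0};
    PoolData wrapped{&counter, &pool};
    CountingJob(&wrapped);
    return counter.done;
}
```

With no stop request the job completes all five steps; with the flag already set it returns without doing any work.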

Method 2

Write your own class but make sure it derives from IRunObject (declared in RunObject.h). E.g.:

C++
class CRunner : public IRunObject
{
public:
    HWND m_hWnd;

    void Run()
    {
        CListBox list;
        list.Attach(m_hWnd);
        list.ResetContent();
        for(int nIndex=0; nIndex < 10000; nIndex++)
        {
            // Check if the pool wants us to stop
            if(this->pThreadPool->CheckThreadStop())
            {
                break;
            }
        
            list.AddString(_T("Item"));
            Sleep(1000);
        }
        list.Detach();

        delete this; // self-delete; AutoDelete() below returns false so the pool does not delete it again
    }

    bool AutoDelete()
    {
        return false;
    }
};

Once you derive a class from IRunObject you need to write code for the void Run() method and also for the bool AutoDelete() method. Write your business logic in the Run method. When the pool picks up your object, it will call its Run() method. After the Run() method finishes, the pool will call the AutoDelete() method. If this method returns true, the pool will use C++ delete to free the object. If the method returns false, the pool will not do anything to the object. Finally, you need to add the object to the queue. E.g.:

C++
CRunner* runner = new CRunner();
runner->m_hWnd = hListBox3;
gThreadPool.Run(runner); // adding to the pool
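
The ownership rule can be sketched as follows. This is a hedged illustration, not the pool's actual code: IRunnable, Execute, Tracked and CountDestroyedAfterExecute are hypothetical names. It reads AutoDelete() before calling Run(), the order used in the refactored version posted in the comments, because Run() is allowed to execute "delete this".

```cpp
#include <cassert>

// Hypothetical analog of IRunObject.
struct IRunnable {
    virtual ~IRunnable() {}
    virtual void Run() = 0;
    virtual bool AutoDelete() = 0;
};

// What a worker thread does with an object taken from the queue.
void Execute(IRunnable* job) {
    assert(job != nullptr);
    // Ask first: after Run() the object may no longer exist.
    const bool autoDelete = job->AutoDelete();
    job->Run();
    if (autoDelete)
        delete job;   // the pool owns the object
    // otherwise the caller (or the object itself) is responsible for it
}

// Test helper that records when it is destroyed.
struct Tracked : IRunnable {
    Tracked(bool autoDelete, int* destroyed)
        : autoDelete_(autoDelete), destroyed_(destroyed) {}
    ~Tracked() override { ++*destroyed_; }
    void Run() override {}
    bool AutoDelete() override { return autoDelete_; }

    bool autoDelete_;
    int* destroyed_;
};

// Returns how many objects had been destroyed right after Execute().
int CountDestroyedAfterExecute(bool autoDelete) {
    int destroyed = 0;
    Tracked* job = new Tracked(autoDelete, &destroyed);
    Execute(job);
    const int afterExecute = destroyed;
    if (!autoDelete)
        delete job;   // the caller still owns the object
    return afterExecute;
}
```

If AutoDelete() returns true the object is gone as soon as Execute() returns; if it returns false the object is still alive and the caller has to free it.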

To run the demo application, go to the bin folder. Rename the file ThreadPoolApp.exe.txt to ThreadPoolApp.exe and double-click it.

If for some reason the executable is not present, please build the source code. The source has been compiled using Visual Studio 2008 SP1.

Explanation of the Demo


The demo application is a simple dialog based MFC application. It's got three buttons and three list boxes:

  • The dialog creates a thread pool of 2 threads. The submitted work fills the list boxes with numbers from 0 to 9. To add work that fills the first list box, click the 'Add Work' button under it; the other list boxes work the same way. Since the initial thread pool contains only 2 threads, if you queue work items for all 3 list boxes, the last list box starts filling only once a thread becomes free, that is, after one of the first two list boxes has been filled.
  • 'Create Pool' button: Clicking this button destroys the existing pool and creates a new one. Enter the number of threads you want for the new pool and click this button. This causes the existing thread pool to go into a 'destroying' state. The user code that fills the list boxes checks for this state by calling CheckThreadStop() on the thread pool object. If it returns true, the user code halts its execution.
  • 'Destroy Pool' button: Clicking this button destroys the existing pool. All work items stop their execution. All threads are destroyed. Adding new work items has no effect since the pool itself is destroyed and work items are no longer queued.
  • 'Quit' button: Closes the application.
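
The stop handshake between the buttons and the list-filling code can be sketched in portable C++11. This is an assumption-laden illustration: FillUntilStopped is a hypothetical function, a std::atomic flag stands in for the pool's 'destroying' state, and a counter stands in for the list box. The point is that the pool never kills a running job; the job itself has to poll and return.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Starts a worker that "adds" up to maxItems items, requests a stop once
// stopAfter items exist, and returns how many items were added in total.
int FillUntilStopped(int maxItems, int stopAfter) {
    assert(stopAfter <= maxItems);   // otherwise the stop would never be requested
    std::atomic<bool> stop(false);   // analog of the pool's destroying state
    std::atomic<int> added(0);       // analog of the list box contents

    std::thread worker([&] {
        for (int i = 0; i < maxItems; ++i) {
            if (stop.load())         // analog of CheckThreadStop()
                break;
            ++added;                 // analog of list.AddString(...)
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    });

    // Analog of clicking 'Destroy Pool' while the work is in progress.
    while (added.load() < stopAfter)
        std::this_thread::yield();
    stop.store(true);

    worker.join();                   // returns promptly because the job cooperates
    return added.load();
}
```

The exact number of items added depends on timing, but it is at least stopAfter and, because the worker checks the flag on every iteration, far below maxItems.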

History

  • 17th October, 2005: Initial post
  • 31st March, 2010: Article updated
  • 8th January, 2011: Article updated
    • Fixed some memory related bugs
    • Added ability for user-code to react to pool-destruction event
    • Added new functions int GetWorkingThreadCount() and bool CheckThreadStop() to CThreadPool class
    • Updated code to run on Visual Studio 2008
  • 4th February, 2011: Updated source code

C++ is great fun. Hope you all enjoyed this article as much as I had fun writing it.

Please mail your comments to siddharth_b@yahoo.com. I'd love to hear from you.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Software Developer (Senior)
United States
My personal website is at http://sbytestream.pythonanywhere.com

Comments and Discussions

 
A little refactoring
glabute, 29-Sep-10 8:09
Hello Mr. Barman,

Thanks for your useful ThreadPool article.

I've done a little work on it, initially to fix a bug, but got a bit carried away and did what amounts to a refactoring job. I hope you don't mind... I enclose my new version for you in case you're interested. The main changes are these:

- Fixed a bug causing a mismatch between the bFree flag and the actual thread state.
- Fixed a bug in Destroy() which could lead to calling delete on the wrong stuff.
- Added a call in the pool destructor to destroy the critical section.
- Simplified the API to make it safer (no need to explicitly call Destroy() before Create(), fewer parameters in the constructor, no more SetPoolSize() because incorrect use can cause a mismatch, moved private structures and defines out of the global scope).
- Cleaned up the namespace by moving all relevant classes and structs inside the class, including IRunObject.
- Removed event names, to avoid potential conflict with other processes.
- Checked for INVALID_HANDLE_VALUE instead of NULL where appropriate (easy mistake to make when dealing with handles).
- Added a virtual destructor for IRunObject.
- Removed redundant flags (bFree and m_bPoolIsDestroying). Instead, use the event states directly.
- In my zeal I eliminated the possibility of using the alternate CreateThread() API. Sorry, you'll have to re-insert it if you need it.
- Destroy() waits more politely for worker threads to terminate (won't just freeze for the whole wait time).
- Cleaned up some duplicate code (the two Run() functions were largely the same).
- Re-ordered the methods in the cpp file to match the order in the .h file - only because I'm anal that way :)

Kind regards,
Greg

ThreadPool.h:
#pragma once
// Filename		: ThreadPool.h
// Author		: Siddharth Barman
// Date			: 18 Sept 2005
// Refactored	: 29 Sept 2010 by Greg Labute
/* Description	: Defines ThreadPool class. How to use the Thread Pool. First
				create a ThreadPool object. The default constructor will 
				create 10 threads in the pool. To defer creation of the pool
				pass a pool size of 0 to the constructor. 

				You can use two approaches while working with the thread 
				pool. 

				1. To make use of the thread pool, you will need to first 
				create a function having the following signature
				DWORD WINAPI ThreadProc(LPVOID); Check the CreateThread 
				documentation in MSDN to get details. Add this function to	
				the pool by calling the Run() method and pass in the function 
				name and a void* pointer to any object you want. The pool will
				pick up the function passed into Run() method and execute as 
				threads in the pool become free. 

				2. Instead of using a function pointer, you can use an object
				of a class which derives from IRunObject. Pass a pointer to 
				this object in the Run() method of the thread pool. As threads
				become free, the thread pool will call the Run() method of
				your class. You will also need to write a body for the AutoDelete()
				function. If the return value is true, the thread pool will use
				'delete' to free the object you pass in. If it returns false,  
				the thread pool will not do anything else to the object after
				calling the Run() function.

				It is possible to destroy the pool whenever you want by 
				calling the Destroy() method. If you want to create a new pool
				call the Create() method. 

				If this code works, it was written by Siddharth Barman, email 
				siddharth_b@yahoo.com. 
				_____ _                        _  ______           _  
				|_   _| |                      | | | ___ \         | | 
				| | | |__  _ __ ___  __ _  __| | | |_/ /__   ___ | | 
				| | | '_ \| '__/ _ \/ _` |/ _` | |  __/ _ \ / _ \| | 
				| | | | | | | |  __/ (_| | (_| | | | | (_) | (_) | | 
				\_/ |_| |_|_|  \___|\__,_|\__,_| \_|  \___/ \___/|_|  
------------------------------------------------------------------------------*/

#include <windows.h> // HANDLE, DWORD, CRITICAL_SECTION, LPTHREAD_START_ROUTINE
#include <list>
#include <vector>

class ThreadPool
{
public:
	// PoolSize is the number of worker threads that will be created.
	ThreadPool(int poolSize = 10, int waitTimeForThreadsToCompleteMS = 5000);
	virtual ~ThreadPool();

	// Use this method to create the worker threads if you specified a poolSize 
	// of 0 in the constructor, or if you want to change the number of worker threads.
	// Returns: true if no error.
	bool Create(int poolSize);

	// Use this method to destroy the worker threads. The destructor of
	// this class will destroy the threads automatically.
	void Destroy();

	// How many worker threads are in the pool?
	int GetPoolSize() { return static_cast<int>(m_threads.size()); }

	// This decides whether a job is added to the front or back of the queue.
	enum ThreadPriority { High, Low };

	// Run a job
	void Run(LPTHREAD_START_ROUTINE pFunc, LPVOID pData, ThreadPriority priority = Low)
		{ AddJobToQueue(pFunc, pData, NULL, priority); }
	struct IRunObject
	{
		virtual ~IRunObject() {}
		virtual void Run() = 0;
		virtual bool AutoDelete() = 0;
	};
	void Run(IRunObject* job, ThreadPriority priority = Low)
		{ AddJobToQueue(NULL, NULL, job, priority); }

	// This function is expected to be called by thread functions or by classes
	// derived from IRunObject. The expectation is that the code will check this
	// 'property' of the pool and stop its processing as soon as possible.
	bool CheckThreadStop() 
		{ return WaitForSingleObject(m_hNotifyShutdown, 0) != WAIT_TIMEOUT; }

	// How many threads are currently busy?
	int GetWorkingThreadCount();

private:
	void AddJobToQueue(
		LPTHREAD_START_ROUTINE pFunc, LPVOID pData, 
		IRunObject* runObject, ThreadPriority priority);

	// This method is called by the worker threads
	HANDLE GetWaitHandle(DWORD dwThreadId);

	// The worker thread proc
	static UINT __stdcall ThreadProc(LPVOID pParam);

	// info about worker threads will be stored here
	struct ThreadData
	{
		HANDLE hWait;
		HANDLE hThread;
		DWORD dwThreadId;
	};
	typedef std::vector<ThreadData, std::allocator<ThreadData> > ThreadList;

	// jobs passed in by clients will be stored here
	struct JobData
	{
		LPTHREAD_START_ROUTINE lpStartAddress;
		LPVOID pData;
		IRunObject* runObject;
	};
	typedef std::list<JobData, std::allocator<JobData> > JobList;

#pragma warning (push)
#pragma warning (disable : 4251) // non-dll interface, ok because member is private
	JobList m_jobList;
	ThreadList m_threads;
#pragma warning (pop)

	int m_nWaitForThreadsToDieMS; // In milli-seconds
	HANDLE m_hNotifyShutdown; // notifies worker threads to terminate immediately

	CRITICAL_SECTION m_cs;
};


ThreadPool.cpp:
#include "stdafx.h"
#include "ThreadPool.h"
#include <process.h>
#include <cassert> // assert() is used throughout this file

ThreadPool::ThreadPool(int poolSize, int waitTimeForThreadsToCompleteMS)
{
	m_nWaitForThreadsToDieMS = waitTimeForThreadsToCompleteMS;
	m_hNotifyShutdown = NULL;

	InitializeCriticalSection(&m_cs); 

	if (poolSize > 0 && !Create(poolSize))
		throw "Unable to create thread pool";
}

ThreadPool::~ThreadPool()
{
	Destroy();
	DeleteCriticalSection(&m_cs);
}

bool ThreadPool::Create(int poolSize)
{
	Destroy();
	if (poolSize == 0)
		return true;

	// create the event which will signal the threads to stop
	m_hNotifyShutdown = CreateEvent(NULL, TRUE, FALSE, NULL);
	assert(m_hNotifyShutdown != NULL);
	if (m_hNotifyShutdown == NULL)
		return false;

	// create the threads
	for (int nIndex = 0; nIndex < poolSize; ++nIndex)
	{
		UINT uThreadId = 0;
		HANDLE hThread = (HANDLE)_beginthreadex(
			NULL, 0, ThreadPool::ThreadProc, this,  
			CREATE_SUSPENDED, &uThreadId);
		// _beginthreadex returns 0 on failure (it is _beginthread that returns -1L)
		assert(hThread != NULL);
		if (hThread == NULL)
			return false;

		// add the entry to the thread list; take the lock because threads
		// resumed in earlier iterations may be reading m_threads in GetWaitHandle()
		ThreadData td;
		td.hWait = CreateEvent(NULL, TRUE, FALSE, NULL);
		td.hThread = hThread;
		td.dwThreadId = uThreadId;
		EnterCriticalSection(&m_cs);
		m_threads.push_back(td);
		LeaveCriticalSection(&m_cs);
		ResumeThread(hThread); 
	}
	return true;
}

void ThreadPool::Destroy()
{
	// Never created?
	if (!m_hNotifyShutdown)
		return;

	// tell all threads to shutdown.
	SetEvent(m_hNotifyShutdown);
	int timElapsed = 0;
	while (GetWorkingThreadCount() > 0 && timElapsed < m_nWaitForThreadsToDieMS)
	{
		Sleep(100); // give the threads a chance to complete
		timElapsed += 100;
	}

	// walk through the events and threads and close them all
	for (ThreadList::iterator iter = m_threads.begin(); iter != m_threads.end(); ++iter)
	{
		CloseHandle(iter->hWait);
		CloseHandle(iter->hThread);
	}

	// close the shutdown event
	CloseHandle(m_hNotifyShutdown);
	m_hNotifyShutdown = NULL;

	// free any jobs not released
	for (JobList::iterator funcIter = m_jobList.begin(); 
		funcIter != m_jobList.end(); funcIter++) 
		if (funcIter->runObject && funcIter->runObject->AutoDelete())
			delete funcIter->runObject;

	// empty all collections
	m_jobList.clear();
	m_threads.clear();
}

int ThreadPool::GetWorkingThreadCount()
{
	int nCount = 0;
	EnterCriticalSection(&m_cs);
	for (ThreadList::iterator iter = m_threads.begin(); iter != m_threads.end(); ++iter) 
		if (WaitForSingleObject(iter->hWait, 0) == WAIT_OBJECT_0)
			++nCount;
	LeaveCriticalSection(&m_cs);
	return nCount;
}

void ThreadPool::AddJobToQueue(
	LPTHREAD_START_ROUTINE pFunc, LPVOID pData, 
	IRunObject* runObject, ThreadPriority priority)
{
	assert(pFunc != NULL || runObject != NULL);
	JobData jobData;
	jobData.lpStartAddress = pFunc;
	jobData.pData = pData;
	jobData.runObject = runObject;

	// add it to the list
	EnterCriticalSection(&m_cs);
	{
		if (priority == Low)
			m_jobList.push_back(jobData);
		else
			m_jobList.push_front(jobData);

		// See if any threads are free
		for (ThreadList::iterator iter = m_threads.begin(); iter != m_threads.end(); ++iter)
		{
			if (WaitForSingleObject(iter->hWait, 0) == WAIT_TIMEOUT)
			{
				// here is a free thread, wake it up
				SetEvent(iter->hWait); 
				break;
			}
		}
	}
	LeaveCriticalSection(&m_cs);
}

// Called by the worker thread to get its wake-up event handle
HANDLE ThreadPool::GetWaitHandle(DWORD dwThreadId)
{
	HANDLE hWait = NULL;
	EnterCriticalSection(&m_cs);
	for (ThreadList::iterator iter = m_threads.begin(); !hWait && iter != m_threads.end(); ++iter)
		if (iter->dwThreadId == dwThreadId)
			hWait = iter->hWait;
	LeaveCriticalSection(&m_cs);
	return hWait;
}

// static method:
// This is the worker thread function which will run 
// continuously till the Thread Pool is deleted.
UINT __stdcall ThreadPool::ThreadProc(LPVOID pParam)
{
	ThreadPool* pool = static_cast<ThreadPool*>(pParam);
	assert(NULL != pool);
	if (pool == NULL)
		return 1;

	DWORD dwThreadId = GetCurrentThreadId();
	HANDLE hWaits[2] = { pool->GetWaitHandle(dwThreadId), pool->m_hNotifyShutdown };
	bool bContinue = true;
	while (bContinue)
	{	
		DWORD waitResult = WaitForMultipleObjects(2, hWaits, FALSE, INFINITE);
		if (waitResult == WAIT_FAILED)
			throw "Wait failed";
		if (waitResult == WAIT_OBJECT_0 || waitResult == WAIT_OBJECT_0 + 1)
		{
			// Shutting down?
			if (pool->CheckThreadStop())
				break;

			// We have been woken up.  Get the next available job
			EnterCriticalSection(&pool->m_cs);
			JobList::iterator iter = pool->m_jobList.begin();
			if (iter == pool->m_jobList.end())
			{
				// No job available, go back to sleep
				ResetEvent(hWaits[0]);
				LeaveCriticalSection(&pool->m_cs);
				continue;
			}
			else
			{
				// Get the job and remove it from the queue
				LPTHREAD_START_ROUTINE proc = iter->lpStartAddress;
				LPVOID data = iter->pData;
				IRunObject* runObject = iter->runObject;
				pool->m_jobList.pop_front();
				LeaveCriticalSection(&pool->m_cs);

				// Run the job
				if (proc)
					proc(data);
				else if (runObject)
				{
					bool autoDelete = runObject->AutoDelete();
					runObject->Run(); // may execute "delete this"
					if (autoDelete)
						delete runObject;
				}
			}
		}
	}
	// Thread is shutting down
	ResetEvent(hWaits[0]);
	return 0;
}


Note that RunObject.h is defunct in this version, as the IRunObject class has been moved inside ThreadPool.
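
The High/Low priority in the refactored Run() only decides which end of the job list a new item is added to. A minimal portable sketch of that rule follows; the names Priority, Enqueue and DrainOrder are hypothetical, and jobs are represented by plain strings rather than JobData entries.

```cpp
#include <cassert>
#include <list>
#include <string>

enum class Priority { High, Low };

// Low priority jobs join the back of the queue, High priority jobs
// jump to the front, mirroring push_back / push_front in AddJobToQueue.
void Enqueue(std::list<std::string>& queue, const std::string& job, Priority priority) {
    assert(!job.empty());
    if (priority == Priority::Low)
        queue.push_back(job);
    else
        queue.push_front(job);
}

// Queues a few jobs and returns the order in which workers would take them.
std::string DrainOrder() {
    std::list<std::string> queue;
    Enqueue(queue, "a", Priority::Low);
    Enqueue(queue, "b", Priority::Low);
    Enqueue(queue, "urgent", Priority::High);
    Enqueue(queue, "c", Priority::Low);

    std::string order;
    while (!queue.empty()) {
        order += queue.front();   // workers always take from the front
        order += ' ';
        queue.pop_front();
    }
    return order;
}
```

One consequence of this scheme is that several High priority jobs queued in a row are taken in reverse order of submission, since each new one is placed ahead of the previous one.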