Introduction
I'd looked at a number of articles on the internet about multi-threading and thread-pool designs, but could not really adapt any of them to my requirements. Most of the thread pools I'd seen had a lot of the thread management logic intertwined with the actual function that the thread was executing. I wanted a different conceptual view of it. To do this, I moved away from deriving from threads and assigning pointers to functions to execute, and latched onto the command pattern from the immensely useful Design Patterns: Elements of Reusable Object-Oriented Software (Gamma, Helm, Johnson and Vlissides). Requests are submitted as functors, allowing each functor to maintain its own environment.
Fundamentally, you submit a request in the form of a functor to a queue, and then let the thread-pool do the rest. I didn't want to have to "join" threads back to the main thread, or "wait" for completion. A thread is merely a means of execution. It doesn't care what it's executing; it just keeps doing the next thing in its path.
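A minimal sketch of that idea, assuming C++11 std::thread in place of the Win32 threads the library itself uses. Request, MiniPool, Add and addOnPool are hypothetical names chosen for illustration; they are not part of ThreadPool.h.

```cpp
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical stand-in for the article's ThreadRequest base class.
struct Request {
    virtual ~Request() {}
    virtual void operator()(int threadId) = 0;
};

// Hypothetical miniature pool: workers pull requests off a shared queue.
class MiniPool {
public:
    explicit MiniPool(int threads) : stopping_(false) {
        for (int i = 0; i < threads; ++i)
            workers_.emplace_back([this, i] { run(i); });
    }
    // Lets the workers drain the queue, then joins every one of them.
    ~MiniPool() {
        { std::lock_guard<std::mutex> lock(m_); stopping_ = true; }
        ready_.notify_all();
        for (auto& w : workers_) w.join();
    }
    // Takes ownership of the request; it is deleted after it has run.
    void submit(Request* r) {
        { std::lock_guard<std::mutex> lock(m_); jobs_.push(std::unique_ptr<Request>(r)); }
        ready_.notify_one();
    }
private:
    void run(int id) {
        for (;;) {
            std::unique_ptr<Request> job;
            {
                std::unique_lock<std::mutex> lock(m_);
                ready_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;   // stopping and nothing left to do
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            (*job)(id);                      // the thread does not care what this does
        }
    }
    std::mutex m_;
    std::condition_variable ready_;
    std::queue<std::unique_ptr<Request>> jobs_;
    std::vector<std::thread> workers_;
    bool stopping_;
};

// Example request: adds two numbers into a caller-supplied result.
struct Add : Request {
    Add(int a, int b, int& out) : a_(a), b_(b), out_(out) {}
    void operator()(int) override { out_ = a_ + b_; }
    int a_, b_;
    int& out_;
};

int addOnPool(int a, int b) {
    int result = 0;
    {
        MiniPool pool(2);
        pool.submit(new Add(a, b, result));
    }                                        // destructor waits for the job to finish
    return result;
}
```

Calling addOnPool(1, 2) submits one request and returns the value it wrote. The worker loop never inspects the request, which is the separation the paragraph above describes.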
Features
- Exception safe
- Configurable number of threads
- Configurable queue length
Usage
First of all, you'll need to:
#include "ThreadPool.h"
Then, create the ThreadPool object. The simplest way is to create it directly:
ThreadPool myPool;
Once this is done, you can call myPool.accept() to prepare the thread-pool to accept requests.
A second way provides more control. We can derive from the ThreadPool class and override the onThreadStart and onThreadFinish methods to perform thread-specific initialization and cleanup.
class COMPool : public ThreadPool
{
public:
    void onThreadStart(int threadId)
    {
        ::OleInitialize(NULL);
    }
    void onThreadFinish(int threadId)
    {
        ::OleUninitialize();
    }
};
We also need to create one or more functors which we can request the thread pool to handle. These are created by deriving from ThreadRequest. If you wish to pass in parameters, or retrieve information from the functor, you can provide these in a constructor. Also, any necessary cleanup can be done in the destructor.
class SpecialRequest : public ThreadRequest
{
public:
    SpecialRequest(int param1, int param2, int& retStat) :
        myLocal1(param1), myLocal2(param2), retStatus(retStat)
    {
    }
    void operator()(int)
    {
        // Write the result through the reference supplied by the caller.
        retStatus = myLocal1 + myLocal2;
    }
private:
    int myLocal1;
    int myLocal2;
    int& retStatus;
};
To submit the request, we use the previously defined ThreadPool object, and submit the functor into the queue, in this case creating it at the same time:
myPool.submitRequest(new SpecialRequest(1, 2, returnVal));
Note that SpecialRequest is passed the two parameters, 1 and 2, and a reference to returnVal for the functor to populate when it completes. Note also that the functor must be created on the heap using new, because the thread pool will delete it.
Once we are finished with the thread-pool, we can shut it down using:
myPool.shutdown();
So, our main loop looks like this:
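#include <iostream>
#include "ThreadPool.h"

int main(int, char**)
{
    int result = 0;
    COMPool myCOMThreadPool;
    myCOMThreadPool.accept();
    // The pool takes ownership of the request and deletes it.
    myCOMThreadPool.submitRequest(new SpecialRequest(1, 2, result));
    myCOMThreadPool.shutdown();
    std::cout << result;
}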
The demonstration project contains a more explicit example using multiple threads, loops and thread statistics.
Note: It's important that you handle any exceptions that your code may throw in the overridden operator()(int). The acceptHandler will not allow any exceptions to propagate out of its loop in order to maintain integrity. This is not actually a bad thing, because your functor should be able to handle its own exceptions anyway.
The gory details
For the individuals who REALLY want to know how it works, the entire cpp and h files are commented using doxygen, and the tricky clever bits are commented also. What I'll try to explain here is why I've done certain things.
The guiding principle that I tried to follow was that a thread is a completely separate entity to the thing that it is executing. The idea being to disconnect the management of the threads and the execution completely. This would enable the thread-pool to be used easily for database calls - simulating an asynchronous call effectively - HTTP responses via fast-CGI, and calculations of PI to (insert arbitrarily large number here) decimal places, whilst simultaneously making me a coffee. I didn't want the thread to be aware of the execution status of the function/functor, and I didn't want the functor to be aware that it was being executed in a thread.
This provides a slightly different programming model to what would otherwise be expected. In many of the implementations I've seen, the worker thread is responsible for signalling that it is finished, and the "main" thread of execution is responsible for waiting until the worker thread has signalled this. Then, the main thread needs to clean up the worker, or query the worker to get the worker's results. Some of the implementations passed an arbitrary number of pointers around the place to indicate parameters and return values.
In this implementation, the worker thread doesn't wait at all. It executes the job and then cleans up after itself. If you need to get results, you can pass a pointer or reference to a structure into the constructor, and have the functor populate that structure as part of its implementation. For example, you can pass in a status variable to receive the result of a database commit, and check it at any time to see whether the commit has happened yet. Or, you can pass an entire business object, such as a Purchase Order, into the constructor, and make the functor responsible for populating the values. The thread doesn't care. All it does is execute the functor and clean up after it. If your application needs to wait for something to happen, you can include a signal (event) in the functor, and put the main thread to sleep until the event is signalled. Again, you are waiting for the functor to complete, not the thread. A conceptual difference, but I think a more accurate representation.
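One way to picture "waiting for the functor, not the thread" is sketched below. It assumes standard C++11 std::promise and std::future rather than a Win32 event, and a bare std::thread stands in for a pool thread. SumRequest and waitOnFunctor are hypothetical names.

```cpp
#include <future>
#include <thread>

// Hypothetical request that carries its own completion signal.
class SumRequest {
public:
    SumRequest(int a, int b) : a_(a), b_(b) {}
    // The caller keeps this future and may wait on it whenever it likes.
    std::future<int> completion() { return done_.get_future(); }
    // Running the request computes the result and raises the signal.
    void operator()(int /*threadId*/) { done_.set_value(a_ + b_); }
private:
    int a_, b_;
    std::promise<int> done_;
};

int waitOnFunctor() {
    SumRequest request(1, 2);
    std::future<int> done = request.completion();
    // A bare std::thread stands in for a pool thread here (an assumption).
    std::thread worker([&request] { request(0); });
    int value = done.get();   // sleeps until the functor signals, not the thread
    worker.join();            // only so this sketch exits cleanly
    return value;
}
```

The main thread blocks on the future owned by the request, so it makes no difference which thread ran the functor.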
The second thing that I had to add was an onThreadStart and onThreadFinish call. In the simple case, this is a no-op, however by deriving from ThreadPool you can make these do whatever you like. I had to add these because COM needed to be initialized per thread when I was using this for my OLEDB calls.
When the queue reaches its maximum length, it will actually block, which in the case of the example provided will also block the main thread. This has the effect of preventing additional requests, allowing the pool time to catch up.
See my todo list at the bottom for the improvement ideas I have for this.
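The blocking behaviour can be sketched with two condition variables. This is an illustration in standard C++11, not the library's Win32 implementation, and BoundedQueue and sumThroughQueue are hypothetical names.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical bounded queue: push() blocks while the queue is full.
template <typename T>
class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t maxLength) : maxLength_(maxLength) {}
    void push(T value) {
        std::unique_lock<std::mutex> lock(m_);
        notFull_.wait(lock, [this] { return items_.size() < maxLength_; });
        items_.push(std::move(value));
        notEmpty_.notify_one();
    }
    T pop() {
        std::unique_lock<std::mutex> lock(m_);
        notEmpty_.wait(lock, [this] { return !items_.empty(); });
        T value = std::move(items_.front());
        items_.pop();
        notFull_.notify_one();   // wake a producer that was blocked on a full queue
        return value;
    }
private:
    std::size_t maxLength_;
    std::mutex m_;
    std::condition_variable notFull_, notEmpty_;
    std::queue<T> items_;
};

// Pushes 1..count through a queue of length 2 while a consumer drains it.
int sumThroughQueue(int count) {
    BoundedQueue<int> queue(2);
    int total = 0;
    std::thread consumer([&] {
        for (int i = 0; i < count; ++i) total += queue.pop();
    });
    for (int i = 1; i <= count; ++i) queue.push(i);  // blocks whenever two items are pending
    consumer.join();
    return total;
}
```

The producer can never run more than two items ahead of the consumer, which is how a full queue throttles whoever is submitting.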
I also used functors because for the OLEDB calls, I really needed transaction integrity. To provide this, the constructor of the functor could perform the beginTransaction() and the destructor would call commit() or rollback() depending on the private transactionSuccess flag. This ensured that either commit or rollback was called, regardless of the outcome, and improved exception safety incredibly. After implementing it this way, I realized just how effective functors were, so I ended up using them for the generic solution.
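A sketch of that constructor/destructor pairing, using a fake connection that only records the calls made on it. FakeConnection, TransactionRequest and runTransaction are hypothetical; real OLEDB code would look quite different.

```cpp
#include <string>
#include <vector>

// Hypothetical connection that just records the calls made on it.
struct FakeConnection {
    std::vector<std::string> log;
    void beginTransaction() { log.push_back("begin"); }
    void commit()           { log.push_back("commit"); }
    void rollback()         { log.push_back("rollback"); }
};

// The constructor opens the transaction; the destructor always closes it.
class TransactionRequest {
public:
    TransactionRequest(FakeConnection& db, bool shouldFail)
        : db_(db), shouldFail_(shouldFail), transactionSuccess_(false) {
        db_.beginTransaction();
    }
    ~TransactionRequest() {
        if (transactionSuccess_) db_.commit();
        else                     db_.rollback();
    }
    void operator()(int /*threadId*/) {
        if (shouldFail_) return;       // simulated failure leaves the flag false
        transactionSuccess_ = true;    // set only once the work has succeeded
    }
private:
    FakeConnection& db_;
    bool shouldFail_;
    bool transactionSuccess_;
};

// Runs one request to completion and reports how its transaction ended.
std::string runTransaction(bool shouldFail) {
    FakeConnection db;
    {
        TransactionRequest request(db, shouldFail);
        request(0);
    }                                  // destructor commits or rolls back here
    return db.log.back();
}
```

Because the functor is an ordinary object, runTransaction exercises it on the calling thread with no pool involved, which is also how it could be unit tested.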
The beauty of this is that the functor contains everything that it needs to execute. Because it can maintain state, you can actually pass parameters in the constructor, and use these parameters to get information about the current functor object, including its state of execution, and the data returned. The best part though, is that by using a functor, you can test the functor itself outside of the multithreaded environment quite easily - allowing easy integration for unit tests.
Another benefit of functors is that they can maintain their own environment. In the case of the FastCGI application, I needed to pass the address of the output structure to return the output back to the webserver. I did this passing the environment, including the CGI parameters, the error stream and the output stream into the functor during construction. This meant that the same functor was perfectly thread safe because it was a completely individual object, and yet it had access to the environment that was created - at that time. On execution, it would always write to the correct output stream. This provided thread safety, without the need for mutexes and critical sections etc.
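The same point in miniature: each request below is handed its own output stream at construction, so two of them can run at once without a lock. PageRequest and runTwoRequests are hypothetical names, and std::ostringstream stands in for the FastCGI output stream.

```cpp
#include <sstream>
#include <string>
#include <thread>

// Hypothetical request that captures its environment at construction time.
class PageRequest {
public:
    PageRequest(const std::string& query, std::ostream& out) : query_(query), out_(out) {}
    void operator()(int /*threadId*/) { out_ << "echo:" << query_; }
private:
    std::string query_;   // private copy of the CGI-style parameter
    std::ostream& out_;   // the output stream that belongs to this request only
};

// Two requests run concurrently; they share no state, so no lock is needed.
std::string runTwoRequests() {
    std::ostringstream outA, outB;
    PageRequest a("a=1", outA), b("b=2", outB);
    std::thread t1([&a] { a(0); });
    std::thread t2([&b] { b(1); });
    t1.join();
    t2.join();
    return outA.str() + "|" + outB.str();
}
```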
The acceptHandler has been declared as throwing no exceptions. This is necessary, because exceptions would potentially disrupt the safety of other threads. If a functor throws an exception, and it's not handled in the functor (This is bad!), the handler will simply swallow the exception, and you will not see it. You should not bank on this functionality though, because it may change. (Not sure how, or why, but if someone comes up with a better method, I'll change it in a flash)
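A sketch of what "swallowing" means in practice. This is not the library's acceptHandler: runSwallowing is a hypothetical helper showing one pass of such a loop, and it uses C++11 noexcept where the original uses throw().

```cpp
#include <stdexcept>

// Hypothetical request whose operator() lets an exception escape.
struct ThrowingRequest {
    void operator()(int) { throw std::runtime_error("unhandled in functor"); }
};

// Hypothetical well-behaved request.
struct QuietRequest {
    void operator()(int) {}
};

// One pass of a worker loop: nothing is allowed to escape.
// Returns true if the request finished normally, false if it threw.
template <typename Request>
bool runSwallowing(Request& request, int threadId) noexcept {
    try {
        request(threadId);
        return true;
    } catch (...) {
        return false;   // swallowed: whoever submitted the request never sees it
    }
}
```

The return value exists only so the sketch can be checked; a handler that returns nothing would leave no trace of the failure at all, which is why the functor should handle its own exceptions.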
Todo
All these are tentative ideas, and not things that I'll implement until I actually need them.
- More debug information - i.e. Thread Information Block etc.
- Thread-pool monitoring
- average queue length
- average response time
- A QueueFullException to be optionally added instead of just blocking. May do this as a templated policy class (see Modern C++ Design, Andrei Alexandrescu).
- Dynamic addition and deletion of threads.
- Forced shutdown
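The policy-class idea from the list above might look something like this. It is only a sketch of the pattern: PolicyQueue, ThrowWhenFull and DropWhenFull are hypothetical names, and the queue is single-threaded to keep the example short.

```cpp
#include <cstddef>
#include <queue>
#include <stdexcept>

// Hypothetical exception type named after the todo item.
struct QueueFullException : std::runtime_error {
    QueueFullException() : std::runtime_error("queue full") {}
};

// Policy 1: refuse the request by throwing.
struct ThrowWhenFull {
    static bool onFull() { throw QueueFullException(); }
};

// Policy 2: quietly refuse the request and report that it was not queued.
struct DropWhenFull {
    static bool onFull() { return false; }
};

// The overflow behaviour is chosen at compile time via the template parameter.
template <typename OverflowPolicy>
class PolicyQueue {
public:
    explicit PolicyQueue(std::size_t maxLength) : maxLength_(maxLength) {}
    // Returns true if the job was queued.
    bool submit(int job) {
        if (jobs_.size() >= maxLength_) return OverflowPolicy::onFull();
        jobs_.push(job);
        return true;
    }
    std::size_t size() const { return jobs_.size(); }
private:
    std::size_t maxLength_;
    std::queue<int> jobs_;
};
```

A blocking policy would fit the same slot, but it needs access to the queue's lock and condition variable, so it is left out of this sketch.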
Acknowledgement
Special thanks to Taka for reviewing the code for this article.
Changes and edits
- 18/01/2002: Fixed the apostrophe on it because it was bugging me.
Current Code | John Simmons / outlaw programmer | 5:36 1 Dec '06
Is the download in this article representative of the latest changes?
[Edit]Doesn't look like it - all files are dated 06/2003 or earlier. Any chance of an update?[/edit]
"Why don't you tie a kerosene-soaked rag around your ankles so the ants won't climb up and eat your candy ass..." - Dale Earnhardt, 1997 ----- "...the staggering layers of obscenity in your statement make it a work of art on so many levels." - Jason Jystad, 10/26/2001
I haven't programmed in C++ for about 18 months now. I should pick it up and update it, and I still have emails of requests... but I just never seem to find any time anymore for this. Which is a bit sad really.
I'll bump it up my priority list and see what happens - but I'm making no promises.
Peter Hancock My blog is here
And they still ran faster and faster and faster, till they all just melted away, and there was nothing left but a great big pool of melted butter
"I ask candidates to create an object model of a chicken." -Bruce Eckel
Which build option is advisable /MD or /MT ? | ana_v123 | 20:11 29 Mar '06
Hi
We are developing multi-threaded Applications with Visual C++ 6.
We have following main and sub-projects 1. Main Server Application (ATL-DCOM Application - a multi-threaded application) 2. Win32 static library (containing code for multi-threading) 3. Win32 static library (containing business objects code) 4. DLL (containing code for database layer, uses multi-threading)
Note: Main Application uses two Win32 static libraries and one DLL library.
Which option is advisable /MD -> Multithreaded DLL or /MT -> Multithreaded for each of the above four projects (VC++6), (for "Use Run-Time Library" in Project Settings dialog box (Project | Settings | C/C++ tab | Code Generation category)
Thanks in advance for any help or direction.
Ana
If you have to use the DLL, I'd go with the /MD
Most of the stuff I've done I've got by with static linking using /MT, but I have used it also with a charting library that had to be distributed as a dll, and compiled with /MD.
Peter Hancock My blog is here
Thanks for your reply.
In our case, we have following main and sub-projects 1. Main Application (a multi-threaded application) 2. Win32 static library (containing multi-threading lib code) 3. Win32 static library (containing business objects code) 4. DLL (containing code for database layer, uses multi-threading)
So for EACH project (main app, 2 static lib and a DLL), which build option is advisable /MD or /MT ?
If you can guide (for EACH project), it will give me correct direction.
Ana
Ana_v123
/MD for all... otherwise you end up getting compiler warnings - and I hate compiler warnings.
Peter Hancock My blog is here
Any one used this lib on SMMP multi-processor system ? | ana_v123 | 11:46 10 Oct '05
Excellent library. Thanks.
Just to know if anyone is using this lib on multi-processor system. Wanted to know about his/her experience of this library.
Ana
Great code one ? though | vjairam | 18:45 1 Apr '05
Definitely give this a 5. One question though: when shutdown is called, will it allow all the requests that were submitted to the queue to be processed, and will all existing threads that are working on a functor finish before shutting down?
Vince.
Re: Great code one ? though | Peter Hancock | 23:28 3 Apr '06
Yes
Peter Hancock My blog is here
can adjust the thread number of thread pool according to the jobs number in queue | willian | 0:31 2 Sep '04
Can you adjust the number of threads in the thread pool according to the number of jobs in the queue? You could design it using the command design pattern and provide a prompt implementation. I think it would be useful.
See the article: http://www.codeproject.com/useritems/Queue_Manager.asp which automatically adjusts the number of threads based on user-defined load balancing criteria.
I considered adding this, and at one point, I did - but under no circumstances, even under my multi processor test box did I find it improved performance. The problem was that increasing the thread pool required creating a new thread, which hammered performance for a short period of time, and in fact negatively impacted performance in all cases. I removed it because I was never going to use it, and I like to keep my objects as light as possible, and remove "dead" code.
Also, more threads meant more context switches in computationally expensive thread objects, which again dramatically reduced overall throughput. A better option is to profile your application with some actual usage patterns, and determine the optimum number of threads to fire up (and queue length also) for that machine. It'll be different from machine to machine.
Peter Hancock My blog is here
[Wish-List] New feature wanted ;) | rbid | 1:02 16 May '04
Hello,
It will be nice to have the following feature:
1.- Get queue count (just create a function that returns the jobQueue.size()).
2.- Have a function that flushes the jobs queue. (remove and delete all waiting requests in the queue.)
Thanks in advance.
---Ricky Marek (AKA:Rbid)
- And on the eight day God said, "Murphy, you're in charge."
*sigh*
So many things to do, and so little time. You're right... this is another of the TODO things that just never gets done.
- UNICODE for exceptions version, kindly provided by Mendicant
- MAXQUEUE size bug fixed - still needs to be uploaded
- Return Q count [RBID]
- Flush job queue [RBID]
My mission, should I choose to accept it, is to do this by September. (Will I actually achieve it? I don't know)
Hello Again,
Here is some code that could be used for flushing the job queue:
void ThreadPool::FlushQueue(void) {
    try {
        queueGuard.enter();
        while (!jobQueue.empty()) {
            // auto_ptr deletes each queued request as it goes out of scope
            std::auto_ptr<ThreadRequest> request(jobQueue.front());
            jobQueue.pop();
        }
        notFull.signal();
        queueGuard.leave();
    } catch (...) {
        queueGuard.leave();
    }
}
Should we keep the comment in ::WaitForSingleObject(queueAccess, INFINITE);?
I have not tested this code thoroughly.
Any comment?
--- Ricky Marek(AKA: Rbid)
- "Microcomputer : One millionth of a computer."
When a user calls shutdown, he would not want any requests to be done any more. So if shutdown() has a bool parameter, e.g. clearUndoneRequests, with a default value (true or false), a user can determine whether or not to do the remaining requests.
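The semantics of such a flag can be sketched on a single-threaded stand-in, where jobs simply wait in a queue until shutdown is called. DeferredPool is a hypothetical class, not a patch to ThreadPool, and the returned count exists only to make the behaviour observable.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical single-threaded stand-in: jobs wait in a queue until shutdown.
class DeferredPool {
public:
    void submitRequest(std::function<void()> job) { jobs_.push(std::move(job)); }
    // clearUndoneRequests == true discards pending jobs; false runs them first.
    // Returns the number of jobs that were discarded.
    std::size_t shutdown(bool clearUndoneRequests = false) {
        std::size_t discarded = 0;
        while (!jobs_.empty()) {
            if (clearUndoneRequests) ++discarded;
            else                     jobs_.front()();
            jobs_.pop();
        }
        return discarded;
    }
private:
    std::queue<std::function<void()>> jobs_;
};
```

Defaulting the flag to false keeps the existing promise that every submitted job is completed, so only callers who opt in lose pending work.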
WaitForSingleObject vs. WaitForMultipleObject... | rbid | 3:03 14 Mar '04
Hello,
I'm running here some SOC simulation, that puts thousands of thread requests into the thread-pool queue.
When the simulation is killed, shutting down the thread pool takes a long time (until the SuicidePill reaches the target thread).
An alternative I thought of was to use WaitForMultipleObjects in the acceptHandler method, accepting an additional control event that may instruct the thread to terminate.
In this way, you separate the data-path from the control-path.
---Ricky
"Microcomputer : One millionth of a computer."
Yeah - it's certainly something I had considered. I ended up not going that way mainly because I wanted to ensure that ALL jobs that were in the queue were completed. Essentially, once a job is submitted, I wanted to provide a sort of guarantee to the developer that the jobs would be completed - even long running ones. I had to make some changes to the shutdown method though to ensure that this worked - and I never uploaded the changes. Essentially, I changed the timeout default to INFINITE (which currently is hardcoded to 10 seconds per thread) in the shutdown() method. I also allowed the user to provide a default timeout per thread before forcing the thread to terminate. (Which should never occur) This ensured they were completed.
The problem with your alternative, that I see, is that jobs in the queue won't get completed. For my application this would be a huge issue - however, I can see in your case, where you don't really care if the jobs aren't complete, it's an issue on the other side.
Suggestion - how about if I implemented a pool.abort() method, whereby we can add the WaitForMultiple objects, and use that to abort the threads, voiding any submitted jobs? It would still shutdown cleanly, but the user can explicitly abort the entire pool... (And should therefore be in control of the possible repercussions?) This is almost the same as using pool.shutdown(1) except that it will abort it cleanly. pool.shutdown(1) may actually preempt one of the jobs, and shut it down midway through. Not a particularly nice thing to do.
Peter,
Yes, you are right.
Another issue to take care when aborting the threads, is to "delete" the elements stored on the queue, otherwise you may get memory leaks.
I'm using WaitForMultipleObjects in a special thread-pool, that when instructed, it runs a periodic thread request (that was previously provided). The threads receive two events, one that announces that a thread request is waiting on the queue, or a second event generated by a WaitableTimer. (See CreateWaitableTimer/SetWaitableTimer in the MSDN documentation). The motivation for this change was that I was required to run a periodic data collection from a hardware device, using the same thread-request, in intervals of 1msec.
The acceptHandler is now as follows:
void ThreadPool::acceptHandler(unsigned int threadId) throw() {
    HANDLE acceptEvents[2] = { queueAccess, timerHandle };
    onThreadStart(threadId);
    for (bool abortFlag = false; false == abortFlag; ) {
        DWORD status = ::WaitForMultipleObjects(2, acceptEvents, FALSE, INFINITE);
        switch (status) {
        case WAIT_OBJECT_0:      // a thread request is waiting on the queue
            ...
            break;
        case WAIT_OBJECT_0 + 1:  // the waitable timer fired
            ...
            break;
        ...
        }
    }
    onThreadFinish(threadId);
}
Thanks again.
--- Ricky Marek.
"And on the eight day God said, "Murphy, you're in charge."
A suggestion - Why not put the periodic thread request in its own job....
class PeriodicExecution : public ThreadRequest
{
public:
    PeriodicExecution(long timerInterval = 1) : running(true)
    {
        waitEvent = CreateWaitableTimer(...);
    }
    ~PeriodicExecution() { }
    void operator()(int)
    {
        while (running)
        {
            DWORD result = WaitForSingleObject(waitEvent, INFINITE);
            switch (result)
            {
            case WAIT_OBJECT_0: /* do the periodic work here */ break;
            }
        }
    }
    void abort() { running = false; }
private:
    void* waitEvent;
    bool running;
};
This way, you don't need to touch the thread pool code, and one thread will continue to run in the operator() loop until you call abort() from the main loop. Meanwhile, the other threads will continue to service jobs...
MAX_QUEUE_SIZE Question.. | rbid | 6:22 11 Mar '04
Hello,
In my scenario, I need that the thread-pool queue should not be limited to a small MAX_QUEUE_SIZE number. That is, the client of the thread-pool should never block, taking into account that I have enough memory to keep all std::queue...
Any hint how can I implement this requirement using your great thread-pool framework?
--- Ricky.
Rbid
"Microcomputer : One millionth of a computer."
Yep - MAX_QUEUE_SIZE is actually a default passed in to the constructor.
Simply create the pool thus...
ThreadPool myPool(10, 20000);
where 10 is the number of threads in the pool, and 20,000 is the number of items that can be allowed in the queue.
Note that there is a bug in this submitRequest, where I forgot to change the MAX_QUEUE_SIZE constant to refer to the actual max queue size passed in to the constructor. You'll need to add an additional member to store the actual queue size, update the constructor to set it to the value passed in, and then change the line if(jobQueue.size() >= MAX_QUEUE_SIZE) to refer to the new member variable instead of MAX_QUEUE_SIZE. (Thanks to Hylu for finding this!)
PK
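The shape of that fix, reduced to just the queue-length check. QueueLimit and the member name maxQueueSize_ are hypothetical, and the value 4 for MAX_QUEUE_SIZE is made up for the example; only the comparison mirrors the line quoted above.

```cpp
#include <cstddef>
#include <queue>

const std::size_t MAX_QUEUE_SIZE = 4;   // default only; the value is illustrative

// Hypothetical reduction of the pool to just its queue-length check.
class QueueLimit {
public:
    explicit QueueLimit(std::size_t maxQueueSize = MAX_QUEUE_SIZE)
        : maxQueueSize_(maxQueueSize) {}   // remember what the caller asked for
    // True when a submit would have to block. Compares against the member,
    // not against the MAX_QUEUE_SIZE constant.
    bool isFull() const { return jobQueue_.size() >= maxQueueSize_; }
    void push(int job) { jobQueue_.push(job); }
private:
    std::size_t maxQueueSize_;
    std::queue<int> jobQueue_;
};
```

With the constant in the comparison, a QueueLimit built with a limit of 6 would report full at 4 items; with the member, it reports full at 6.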
Excellent Code -- One Problem *fixed* | Ben | 15:00 14 Dec '03
This framework is exactly what I was looking for; you just saved me hours of coding, thank you. Only one thing: I am accepting and shutting down the threadpool more than once (in order, of course), and upon the second call to 'ThreadPool::shutdown()', a crash occurred. The reason was that the variable 'std::vector<void *> ThreadPool::pool' was not cleared at shutdown, so bad handles remained in the vector and crashed the calls to 'WaitForSingleObject()'. So I added 'pool.clear();' after the for()-loop and voilà.
Thanks again Benji
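The same pattern can be shown with std::thread objects standing in for the Win32 handles. RestartablePool is a hypothetical skeleton, not the library's class; the point is only the clear() at the end of shutdown().

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical pool skeleton: accept() starts workers, shutdown() joins them.
class RestartablePool {
public:
    void accept(int threads) {
        for (int i = 0; i < threads; ++i) pool_.emplace_back([] {});
    }
    void shutdown() {
        for (auto& t : pool_) t.join();
        pool_.clear();   // without this, the next shutdown() would join stale entries
    }
    std::size_t liveHandles() const { return pool_.size(); }
private:
    std::vector<std::thread> pool_;
};
```

In this stand-in, joining an already-joined std::thread throws std::system_error, which plays the role of the stale-handle crash described above.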
Glad it helped. I'll have to add that test to my harness. Not one that I'd thought of. Thanks for the positive feedback!
PK
Trying to replicate this error using...
ThreadPool pool;
int result;
pool.accept();
pool.submitRequest(new Add(1, 2, result));
pool.shutdown();
BOOST_CHECK_EQUAL(3, result);
pool.accept();
pool.submitRequest(new Add(3, 4, result));
pool.shutdown();
BOOST_CHECK_EQUAL(7, result);
but it's not working (that is, it doesn't crash)... could you please post how you created the error?
Thanks
PK
Last Updated 1 Jul 2003
Copyright ©
CodeProject, 1999-2010