A common scenario in applications is having a pool of worker threads that has to process several objects waiting in a queue. The operation to be executed on the waiting objects is the same, but it can be carried out by different threads on different objects, as described in the next picture:
A single queue stores objects that need to be processed; a set of worker threads is running and each thread gets an object at a time from the queue and processes it by calling the same method.
This article presents an easy-to-use class that implements a thread pool and encapsulates a queue.
Deriving your class from CMBThreadPool
The core of the sample project is the CMBThreadPool class, which encapsulates the System::Collections::Queue class and implements the thread pool. This class uses the CMBThread class to manage a single instance of a worker thread.
The CMBThreadPool class provides Push and Pop methods to access its queue; using the Push method, any System::Object-derived object can be inserted in the queue. The threads are controlled by the pool's methods, such as StartThreadPool and StopThreadPool, described later in this article.
Each worker thread (implemented by the CMBThread class) gets one object at a time from the queue and calls the OnDoWork virtual method.
To use this class, you just need to derive a new (managed) class from the CMBThreadPool class in the following way:
<PRE lang=mc++>__gc class CMyThreadPool : public CMBThreadPool {
public:
    void OnDoWork(System::Object* obj); // called for each queued object
};</PRE>
The CMyThreadPool class overrides the OnDoWork method, which is called each time an object can be processed.
In the sample, a Thread::Sleep call in the CMyThreadPool::OnDoWork method simulates a slow I/O operation.
<PRE lang=mc++>void CMyThreadPool::OnDoWork(System::Object* obj)
{
    Thread::Sleep(2000); // simulate a slow I/O operation
}</PRE>
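For readers without a Managed C++ toolchain, the derive-and-override pattern described above can be sketched in standard C++. This is only a minimal analog under stated assumptions, not the article's class: WorkerPool, SummingPool, ProcessAll and Total are hypothetical names, the queue holds plain int values instead of System::Object pointers, and the workers simply exit when the queue is empty.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical base class: owns the queue and runs the worker threads.
class WorkerPool {
public:
    virtual ~WorkerPool() = default;

    // Inserts an item in the queue (thread-safe).
    void Push(int item) {
        std::lock_guard<std::mutex> guard(lock_);
        queue_.push(item);
    }

    // Starts threadCount workers, lets them drain the queue, then joins them.
    void ProcessAll(std::size_t threadCount) {
        assert(threadCount > 0);
        std::vector<std::thread> workers;
        for (std::size_t i = 0; i < threadCount; ++i)
            workers.emplace_back([this] { WorkerLoop(); });
        for (auto& w : workers) w.join();
    }

protected:
    // Called by a worker for each dequeued item; may run concurrently.
    virtual void OnDoWork(int item) = 0;

private:
    // Removes one item from the queue; returns false if the queue is empty.
    bool Pop(int& item) {
        std::lock_guard<std::mutex> guard(lock_);
        if (queue_.empty()) return false;
        item = queue_.front();
        queue_.pop();
        return true;
    }

    void WorkerLoop() {
        int item = 0;
        while (Pop(item)) OnDoWork(item);
    }

    std::mutex lock_;
    std::queue<int> queue_;
};

// Derived class: only the per-item work is supplied.
class SummingPool : public WorkerPool {
public:
    long Total() const { return total_.load(); }

protected:
    void OnDoWork(int item) override { total_ += item; }

private:
    std::atomic<long> total_{0};
};
```

Because OnDoWork can be called from several threads at once, any state it touches needs its own synchronization; the sketch uses an atomic counter for that reason.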
Starting & stopping the thread pool
The thread pool is explicitly started by the StartThreadPool method; the ThreadCount parameter specifies how many threads are created, while the timeout parameters (such as nProcessTimeOut) specify how long a thread waits when it finds the queue empty and how long it waits after processing each object.
The thread pool is stopped by the StopThreadPool method, using one of the following options:
eStopAbort - all threads are stopped immediately.
eStopHandleCurrent - the thread pool is stopped after each thread ends handling the current object.
eStopHandleAll - the thread pool is stopped the first time the queue is empty.
The last two options make the StopThreadPool method wait (blocking program execution) until the threads finish processing objects in the queue.
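The difference between the two graceful options can be sketched in standard C++ as a worker loop that checks the requested stop mode only between objects. This is an illustrative sketch, not the article's implementation: StopMode, SharedQueue, workerLoop and runWithStopMode are hypothetical names, and the abort option is left out because standard C++ has no equivalent of aborting a running thread.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-ins for the two graceful stop options.
enum class StopMode { Running, HandleCurrent, HandleAll };

struct SharedQueue {
    std::mutex lock;
    std::deque<int> items;
    std::atomic<StopMode> stop{StopMode::Running};
    std::atomic<std::size_t> processed{0};
};

// The stop mode is checked only between items, so an item that is
// already being processed is always finished.
void workerLoop(SharedQueue& q) {
    for (;;) {
        StopMode mode = q.stop.load();
        if (mode == StopMode::HandleCurrent) return;  // leave the rest queued
        int item = 0;
        bool haveItem = false;
        {
            std::lock_guard<std::mutex> guard(q.lock);
            if (!q.items.empty()) {
                item = q.items.front();
                q.items.pop_front();
                haveItem = true;
            }
        }
        if (haveItem) {
            (void)item;  // the real per-object work would go here
            ++q.processed;
        } else if (mode == StopMode::HandleAll) {
            return;      // queue found empty: stop
        } else {
            // queue empty and no stop requested: wait before polling again
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }
}

// Requests a stop mode, runs the workers to completion, and returns
// how many items were processed before they exited.
std::size_t runWithStopMode(StopMode mode, int itemCount, int threadCount) {
    assert(threadCount > 0 && mode != StopMode::Running);
    SharedQueue q;
    for (int i = 0; i < itemCount; ++i) q.items.push_back(i);
    q.stop = mode;
    std::vector<std::thread> workers;
    for (int t = 0; t < threadCount; ++t)
        workers.emplace_back(workerLoop, std::ref(q));
    for (auto& w : workers) w.join();  // the caller blocks here
    return q.processed;
}
```

In this sketch the stop request is made before the workers start, so the HandleCurrent run processes nothing while the HandleAll run drains the whole queue; in both cases the caller blocks in join until every worker has returned.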
To synchronize the threads' termination in a “safe” way, that is, without aborting the threads when they are in the middle of something, a ManualResetEvent object is used. Each thread managed by the CMBThread class creates a ManualResetEvent in the class constructor:
<PRE lang=mc++>CMBThread::CMBThread(Int32 nIndex, CMBQueue* pMBQueue)
{
    m_nIndex = nIndex;
    m_pMBQueue = pMBQueue;
    m_pEventStop = new ManualResetEvent(false); // initially not-signaled
    new ThreadStart(this, &CMBThread::ThreadProc); // delegate for the worker's thread procedure
}</PRE>
The event is created in the “not-signaled” state (meaning that any Wait call blocks until the object becomes signaled). Each thread calls m_pEventStop->Set() to signal the event just before exiting.
In the StopThreadPool method, an array of ManualResetEvent objects is created, and then a call to the WaitHandle::WaitAll method blocks execution until all event objects are signaled.
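<PRE lang=mc++>ManualResetEvent* manualEvents __gc[] =
    new ManualResetEvent* __gc[m_arrayThreads->get_Count()];
for (i = 0; i < m_arrayThreads->get_Count(); i++)
{
    // Assumption: m_arrayThreads is an ArrayList holding CMBThread objects
    CMBThread* pMBThread = __try_cast<CMBThread*>(m_arrayThreads->get_Item(i));
    manualEvents[i] = pMBThread->m_pEventStop;
}
WaitHandle::WaitAll(manualEvents); // blocks until every event is signaled</PRE>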
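The same "signal on exit, wait for all" idea can be sketched in standard C++, which has no ManualResetEvent or WaitHandle::WaitAll. The ManualEvent class and the WaitAll and runWorkersAndWait functions below are hypothetical analogs built on a mutex and a condition variable; they only illustrate the synchronization pattern and are not the .NET types.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical analog of a manual-reset event: once set, it stays
// signaled and every current or future Wait call returns.
class ManualEvent {
public:
    void Set() {
        {
            std::lock_guard<std::mutex> guard(lock_);
            signaled_ = true;
        }
        changed_.notify_all();
    }

    void Wait() {
        std::unique_lock<std::mutex> guard(lock_);
        changed_.wait(guard, [this] { return signaled_; });
    }

private:
    std::mutex lock_;
    std::condition_variable changed_;
    bool signaled_ = false;  // created in the not-signaled state
};

// Blocks until every event in the list has been signaled.
void WaitAll(std::vector<std::unique_ptr<ManualEvent>>& events) {
    for (auto& e : events) e->Wait();
}

// Each worker does its work and sets its own event just before exiting;
// the caller waits on all events. Returns how many workers had finished
// their work by the time WaitAll returned.
int runWorkersAndWait(std::size_t threadCount) {
    assert(threadCount > 0);
    std::vector<std::unique_ptr<ManualEvent>> events;
    for (std::size_t i = 0; i < threadCount; ++i)
        events.push_back(std::make_unique<ManualEvent>());

    std::atomic<int> finished{0};
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < threadCount; ++i)
        threads.emplace_back([&events, &finished, i] {
            ++finished;        // stands in for processing queued objects
            events[i]->Set();  // signal "this thread is done"
        });

    WaitAll(events);
    int seen = finished.load();
    for (auto& t : threads) t.join();  // join before the events are destroyed
    return seen;
}
```

Since each worker updates the counter before setting its event, the count read after WaitAll returns already includes every worker.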