Threading is easy (A simple thread, thread pool, object pool, and more)
This simple library provides an implementation of almost all aspects of multithreaded programming.
Introduction
This simple library provides an implementation of almost all aspects of multithreaded programming, and it does not require a deep understanding of the multithreading concept. When I wrote this library, I was keeping in mind that the library must be easy to use for any programmer with a basic understanding of C++ and templates.
This library removes all the extra work related to the management of threads, so you can spend your time concentrating on your business logic implementation.
Using the code
There are five most important classes and interfaces in this library that you need to be aware of:
CSafeThread
is a template class which describes an abstract thread. I decided to define this class as a template class, because of the unknown parameter for the thread and most importantly, because I want to call theStop()
virtual method in the destructor of the class and this is appropriate only ifCSafeThread
is a concrete class.CThreadInterface
is an abstract class (interface). This class is the base class for your concrete implementation of a thread. There are two steps to implement a thread. First, define your thread class and derive it from theCThreadInterface
interface.BEGIN_THREAD_EVENT
– occurs when a thread begins its execution,END_THREAD_EVENT
– occurs when a thread is finished by exiting theThread()
method,TERMINATE_THREAD_EVENT
– occurs when theStop()
method is invoked.CSequentialObjectPool
is a template class which implements a work item queue processor. The object pool processes objects from a queue one by one in a single thread. A work item is user defined concrete class derived fromCWorkItemInterface
.- The last most important class is the
CThreadPool
class. In order to create a pool of threads in your application, you need to define a work item class that is derived from theCWorkItemInterface
interface, then instantiate and initialize theCThreadPool
class. And finally, queue a work item for processing in the thread pool.
class CMyThread: public mb_thread_lib::CThreadInterface<_bstr_t>
{
protected:
virtual void Thread(mb_thread_lib::smart_ptr<_bstr_t> sp)
{
//this is our thread, sp is a parameter we are passing in
// implement your thread code here
}
};
Now, instantiate your thread:
mb_thread_lib::CSafeThread< CMyThread, _bstr_t> Thread;
Start the thread:
Thread.Start( mb_thread_lib::smart_ptr<_bstr_t>(
new _bstr_t("parameter for thread")) );
Wait for this thread to finish:
Thread.Wait();
You can explicitly stop the thread by calling the Stop()
method of the CSafeThread
instance, or by destroying the instance. If you want to be notified of the thread events, you need to implement the Notify()
method in your thread class.
void Notify(mb_thread_lib::NOTIFY_EVENT_TYPE evt)
{
switch(evt)
{
case mb_thread_lib::BEGIN_THREAD_EVENT:
m_bStop = false;
break;
case mb_thread_lib::TERMINATE_THREAD_EVENT:
m_bStop = true;
break;
default:
break;
}
};
There are three types of events:
Let's look at the TestSequentialObjectPool example in the source code. First, I define the CTestWorkItem
class, this is a work item class. This class implements a task that needs to be executed. You must derive this class from CWorkItemInterface
which is an abstract class. You can create a different work item type to work with the same CSequentialObjectPool
instance.
class CTestWorkItem: public mb_thread_lib::CWorkItemInterface
{
bool bPrcessing;
public:
CTestWorkItem()
{
}
~CTestWorkItem()
{
}
void ProcessWorkItem()
{
//execute our code here
//…
}
void AbortWorkItem()
{
//abort our item
//…
}
};
Now, I define the CPushThread
class. This is a helper thread class to push work items for processing to the CSequentialObjectPool
instance. This class implements the CSafeThread
type of class (you already know how to create threads, see above).
class CPushThread: public mb_thread_lib::CThreadInterface
<mb_thread_lib::CSafeThread<mb_thread_lib::CSequentialObjectPool<_bstr_t>,
_bstr_t> >
{
volatile bool m_bStop;
protected:
void Notify(mb_thread_lib::NOTIFY_EVENT_TYPE evt)
{
switch(evt)
{
case mb_thread_lib::BEGIN_THREAD_EVENT:
m_bStop = false;
break;
case mb_thread_lib::TERMINATE_THREAD_EVENT:
m_bStop = true;
break;
default:
break;
}
};
public:
void Thread(mb_thread_lib::smart_ptr<mb_thread_lib::CSafeThread
<mb_thread_lib::CSequentialObjectPool<_bstr_t>,
_bstr_t> > spWorkQueue)
{
for(int i = 0; ((!m_bStop)&&(i < MY_MAX_ITER*2)); ++i)
{
_bstr_t bstr("Val: ");
_variant_t var;
var = i;
var.ChangeType(VT_BSTR);
bstr = bstr + var.bstrVal;CTestWorkItem *pWorkItem = new CTestWorkItem();
pWorkItem->m_bstr = bstr;
mb_thread_lib::smart_ptr< mb_thread_lib::CWorkItemInterface >
spWorkItem(dynamic_cast<mb_thread_lib::CWorkItemInterface*>(pWorkItem));
if(!spWorkQueue->QueueWorkItem(spWorkItem))
break;
}
}
};
Now, I create an instance of the CSequentialObjectPool
class, and because it's a thread based class, I invoke the Start()
method to begin a thread to process the work items. I the invoke the Start()
method of my push thread, which is an instance of the CPushThread
class, with passes a smart pointer to the CSequentialObjectPool
instance.
//create sequental object pool (work queue)
mb_thread_lib::smart_ptr<mb_thread_lib::CSafeThread
<mb_thread_lib::CSequentialObjectPool<_bstr_t>, _bstr_t> >
spWorkQueue(new mb_thread_lib::CSafeThread
<mb_thread_lib::CSequentialObjectPool<_bstr_t>, _bstr_t>());
//create a push thread (this thread feeds work queue
// with objects for sequental processing)
mb_thread_lib::CSafeThread<CPushThread,
mb_thread_lib::CSafeThread<mb_thread_lib::CSequentialObjectPool<_bstr_t>,
_bstr_t> > PushThread;
//start work queue
spWorkQueue->Start(mb_thread_lib::smart_ptr<_bstr_t>(NULL));
//start push thread
PushThread.Start(spWorkQueue);
Sleep(1000);
//create thread pool
mb_thread_lib::CThreadPool ThreadPool;
//initialize thread pool with 2 MIN threads,
//15 MAX threads and thread idle time 10 sec
ThreadPool.InitPool(2, 15, 10000);
_variant_t var;
int i;
//creaet 50000 work items
for(i = 0; i < 50000; ++i)
{
var = i;
var.ChangeType(VT_BSTR);
CTestWorkItem* p = new CTestWorkItem();
p->m_bstr = var.bstrVal;
mb_thread_lib::smart_ptr<mb_thread_lib::CWorkItemInterface> spWorkItem(p);
//push work items into the thread pool
ThreadPool.QueueWorkItem(spWorkItem);
}
Another helpful class is CTimer
. In order to use the CTimer
class, you must define a work item class that is derived from CWorkItemInterface
, create an instance of the CTimer
class, and start the timer with a reference to a work item and a due time.
CTestWorkItem* p = new CTestWorkItem();
p->m_bstr = L"Timer work item.";
mb_thread_lib::smart_ptr<mb_thread_lib::CWorkItemInterface> spWorkItem(p);
mb_thread_lib::CTimer Timer;
DWORD d = 2000;
printf("execute handler every %d second(s)\n", d/1000);
Timer.Start(spWorkItem,d/* 2 sec. */);
Sleep(10000);
Timer.Stop();
In addition to these main classes, there are several synchronization classes and a smart pointer implementation. There are two types of critical section classes, CLocalCriticalSec
and CGlobalCriticalSec
. The CLocalCriticalSec
class is a wrapper of the critical section Win32 APIs, and CGlobalCriticalSec
is a wrapper of the mutex Win32 APIs. In order to instantiate a named critical section, local or global, there is the CCriticalSecFactory
class in the library. To dispose a named critical section, use the CCriticalSecDisposer
class. You can see an example in the "ThreadTest.spp" under the following comments: "test critical sections". The CAutoCriticalSec
class allows you to automatically protect part of your code by instantiating the class with a reference to a critical section instance.