Click here to Skip to main content
Click here to Skip to main content

A programming model to use a thread pool

By , 30 Sep 2000
 
  • Download source files - 3 Kb
  • Download demo project - 24 Kb
  • Introduction

    On many occasions we need to utilize mutiple threads to boost the system performance. The logic for each thread is almost the same, but we need to manage these threads. If the system is busy, we create more threads, otherwise we kill some thread to avoid the extra overhead.

    I have done a couple of projects involving the multiple-thread management. At last I decided to write a class to wrap this machenism. This class can dynamically allocate threads and assign jobs to these worker threads. You derive your own classes and you do not need to know the underlying machanism to handle the mutiple threading and synchronization between these threads. However, you need to make your worker classes thread safe since your objects may be assigned to different threads each time.

    The another thing I want to demonstrate is the using the feature of IOCompletion Port. I found that it is amazing easy and useful, especially when used as a way to transfer data between threads.

    Usage

    To use the thread pool class you need to derive your worker class from IWorker and your job class from IJobDesc. The processing logic must be embedded within the member function IWorker::ProcessJob(IJobDesc* pJob). After you are finished, you can declare a thread pool like this:

    CThreadPool pool;
    pool.Start(6, 10);
    //do some other jobs here
    pool.Stop();
    

    The Start function has two parameters. The first argument is the minimum number of the worker threads this thread pool object should spawn. The second argument indicates the maximum number of worker thread within this thread pool. If the thread pool is very busy working on the assigned jobs it will automatically spawn more worker threads. On the other hand, when the thread pool is idle some threads will be removed from the pool. Fine-tune these two parameters to get the best performance.

    To assign jobs to the thread pool for processing, simply call the function

    pool.ProcessJob(pJob, pWorker);
    

    You must make sure that your derived worker class is thread-safe since a worker instance may be on multiple threads simultaneously. You have no control as to whether the process is on the same thread as the last time or not.

    Note

    If the processing takes a very long time, when you call Stop(), the processing may not finished immediately. The Stop() function will wait for a maximum of 2 minutes and then return. This function has an optional argument. If this argument is set to true, the function will terminate these worker threads anyway. If this argument is set to false, these worker threads will not get terminated harshly and still live. Under this situation, you have to take care that the worker object may not exist after calling Stop() and you will get an access violation error if you attempt to access them.

    The job object must be generated on the heap using new operator. After the process ends it will automatically deleted by the framework.

    License

    This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

    A list of licenses authors might use can be found here

    About the Author

    Sherwood Hu
    United States United States
    Member
    No Biography provided

    Sign Up to vote   Poor Excellent
    Add a reason or comment to your vote: x
    Votes of 3 or less require a comment

    Comments and Discussions

     
    You must Sign In to use this message board.
    Search this forum  
        Spacing  Noise  Layout  Per page   
    GeneralA question about CCriticalSectionmemberSeanQA28 Oct '08 - 20:26 
    when I used the CThreadPool class, I encapsulate it in a dll.But i used it ina another all,there has be a error when the CThreadPool object was released.I traked it,the error occur in ~CCriticalSection().That is it:
    User breakpoint called from code at 0x7c921230
    HEAP[****.exe]: Invalid Address specified to RtlValidateHeap( 00F00000, 5F4D0AE6 )
    I know the first error whie a invalid pointer was deleted.Maybe the Desconstrcut of CThreadPool was used twice or more.But i can make sure the Desconstrcut of CThreadPool was used once.And strangely,after when i change "CCriticalSection m_arrayCs" into "CRITICAL_SECTION m_arrayCs",the progamme runs with no error.Of course I add "InitializeCriticalSection(&m_arrayCs);" in CThreadPool() and dd "DeleteCriticalSection(&m_arrayCs);" in ~CThreadPool().
    GeneralWhen the menber function "Stop" is used twice and morememberSeanQA28 Oct '08 - 20:24 
    In some case,the menber function "Stop" may be used twice and more,the first using "Stop" has close all I/0 Completion Portand and teminated all thread,so when using "Stop" agin,"::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);" will be not handled,and "DWORD rc=WaitForMultipleObjects(nCount, pThread, TRUE, 120000);" will be waited for 2 minute,it's not appropriate.I think the "CThreadPool::Stop()" like this:
    void CThreadPool::Stop(bool bHash)
    {
    CSingleLock singleLock(&m_arrayCs);
    singleLock.Lock(); // Attempt to lock the shared resource
     
    if(m_hMgrIoPort != NULL)
    {
    EnterCriticalSection(&m_arrayCs);
    ::PostQueuedCompletionStatus(m_hMgrIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
    WaitForSingleObject(m_hMgrThread, INFINITE);
    CloseHandle(m_hMgrIoPort);
    m_hMgrIoPort = NULL;
    LeaveCriticalSection(&m_arrayCs);
    }
    if(m_hWorkerIoPort != NULL)
    {
    EnterCriticalSection(&m_arrayCs);
    //shut down all the worker threads
    UINT nCount=m_threadMap.GetCount();
    HANDLE* pThread = new HANDLE[nCount];
    long n=0;
    POSITION pos=m_threadMap.GetStartPosition();
    DWORD threadId; ThreadInfo info;
    while(pos!=NULL)
    {
    ::PostQueuedCompletionStatus(m_hWorkerIoPort, 0, 0, (OVERLAPPED*)0xFFFFFFFF);
    m_threadMap.GetNextAssoc(pos, threadId, info);
    pThread[n++]=info.m_hThread;
    }



    DWORD rc=WaitForMultipleObjects(nCount, pThread, TRUE, 120000);//wait for 2 minutes, then start to kill threads
    CloseHandle(m_hWorkerIoPort);
    m_hWorkerIoPort = NULL;

    if(rc==WAIT_TIMEOUT&&bHash)
    {
    //some threads not terminated, we have to stop them.
    DWORD exitCode;
    for(int i=0; i<ncount;> if (::GetExitCodeThread(pThread[i], &exitCode)==STILL_ACTIVE)
    TerminateThread(pThread[i], 99);
    }
    delete[] pThread;
    }
    singleLock.Unlock();
     
    }
    GeneralComment on design...memberNigel de Costa4 Sep '06 - 23:02 
    I have read many articles on thread-pools on this site. Some use IOCP - some don't. However none of them discuss WHY you might want to use IOCP. Yes IOCP is quite easy to use but there are many other ways to implement a thread pool.
     
    There is a very good reason for using IOCP and it should be explained!
    GeneralRe: Comment on design...memberalexquisi27 Mar '07 - 23:01 
    http://www.microsoft.com/technet/sysinternals/information/IoCompletionPorts.mspx[^]
    Generalhave memory leaks!memberpcbirdwang3 Aug '05 - 14:43 
    when i use the code,have some memory leaks
    GeneralATL Server provides its own thread pool classmemberAlexander Gräf6 May '04 - 4:10 
    From VS7 and up, you can use the internal class ATL::CThreadPool. The worker archetype that is passed as a template argument has only to provide three public methods and one typedef for the worker items. It is a very versatile, easy and fast class. When programming for Windows, use this class instead.
     
    Cheers
    GeneralRemoveThreads bugmemberLAT26 Jul '03 - 9:44 
    Hi,
     
    First, many thanks for sharing your code.
     
    I think there is a little bug when WorkerProc process RemoveThread message (0xFFFFFFFE)
    Currently, when the thread receives this messages, the While boucle breaks and the thread is terminated. We dont knows if ProcessJob has finished or not.
     
    So, I've fixed this like that :
    unsigned int CThreadPool::WorkerProc(void* p)
    {
    {
    ...
    bool bBusy = false;
    ...
     
    while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2,
    &pOverLapped, INFINITE ))
    {
    if(pOverLapped == (OVERLAPPED*)0xFFFFFFFE)
    {
    if(bBusy == false)
    {
    TRACE1("Server remove thread ID %d\n", threadId);
    pServer->RemoveThread(threadId);
    break;
    }
     
    else
    TRACE1("Server remove thread ID %d BUSY\n", threadId);
     
    }
     
    else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
    {
    break;
    }
     
    else
    {
    ...
    bBusy = true;
    pIWorker->ProcessJob(pIJob);
    bBusy = false;
    ....
    }
     
    }
    }
     
    Best regards
    Blush | :O

     
    Always look on the bright side of life.
    GeneralRe: RemoveThreads bugmemberLAT27 Jul '03 - 0:54 
    Sorry,
    Spaces seem to be wiped out in my previous message so it's quite difficult to read.
     
    unsigned int CThreadPool::WorkerProc(void* p)
    {
     
       {
       ...
       bool bBusy = false;
       ...
     
       while(::GetQueuedCompletionStatus(IoPort, &pN1, &pN2, &pOverLapped, INFINITE ))
       {
             if(pOverLapped == (OVERLAPPED*)0xFFFFFFFE)
             {
                if(bBusy == false)
                {
                TRACE1("Server remove thread ID %d\n", threadId);
                pServer->RemoveThread(threadId);
                break;
                }
     
                else
                   TRACE1("Server remove thread ID %d BUSY\n", threadId);
     
             }
     
             else if(pOverLapped == (OVERLAPPED*)0xFFFFFFFF)
             {
             break;
             }
     
             else
             {
             ...
             bBusy = true;
             pIWorker->ProcessJob(pIJob);
             bBusy = false;
             ....
          }
     
       }
    }

     
    Always look on the bright side of life.
    GeneralRe: RemoveThreads bugmemberWREY27 Jun '04 - 6:46 
    Thanks for what you have done!!
     
    And thanks to the author for sharing his code with us!!
     
    Wink | ;)
     
    William
     
    Fortes in fide et opere!
    GeneralLinker could not find "pthreadVC.lib"memberWREY29 Aug '02 - 23:12 

    The program couldn't link due to the error, "cannot open file, 'pthreadVC.lib'"
     
    As part of the download, I did receive a 'pthreadVC.dll'. If this is the file that should be 'pthreadVC.lib', do I just rename it, or how do I resolve what the linker is complaining about?
     
    Thanks.
     
    Cool | :cool:
     
    William

    General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

    Permalink | Advertise | Privacy | Mobile
    Web03 | 2.6.130523.1 | Last Updated 1 Oct 2000
    Article Copyright 2000 by Sherwood Hu
    Everything else Copyright © CodeProject, 1999-2013
    Terms of Use
    Layout: fixed | fluid