Click here to Skip to main content
15,891,529 members
Articles / Programming Languages / C++

Multithreading Tutorial

Rate me:
Please Sign up or sign in to vote.
4.80/5 (89 votes)
28 Dec 2006CPOL20 min read 599.9K   13.1K   250  
This article demonstrates how to write a multithreaded Windows program in C++ using only the Win32 API.
/*  file Main.cpp
 *
 *  This program is an adaptation of the code Rex Jaeschke showed in
 *  his Nov 2005 C/C++ Users Journal article entitled "C++/CLI Threading:
 *  Part II".  I changed it from C++/CLI (managed) code to native
 *  (unmanaged) C++ that uses the Win32 API.
 *
 *  One hassle is the fact that C++ must employ a free (C) function
 *  or a static class member function as the thread entry-point-function.
 *
 *  Note that the Producer and Consumer share an instance of the
 *  MessageBuffer class.  Hence the functions in this class must
 *  be protected with a synchronization method.  I demonstrate the
 *  use of a mutex here (see my Part1Listing2 program for a demonstration
 *  of a critical section).
 *
 *  This program must be compiled with a multi-threaded C run-time
 *  (/MT for LIBCMT.LIB or /MTd for LIBCMTD.LIB).
 *
 *                                      John Kopplin  3/2006
 */


#include <stdio.h>
#include <windows.h>          // for HANDLE
#include <process.h>          // for _beginthread()

#define MSG_BUF_SIZE  128

//#define  WITH_SYNCHRONIZATION


class MessageBuffer
{
    // A single-slot message buffer shared by the producer thread (which
    // calls SetMessage()) and the consumer thread (which runs
    // ProcessMessages()).
    //
    // The functions in this MessageBuffer class must be synchronized
    // because they are called by both the producer and consumer threads.
    // Only one of these two threads must be allowed access to the
    // message buffer at any one time.  That protection is compiled in
    // only when WITH_SYNCHRONIZATION is defined; without it the class
    // deliberately demonstrates what goes wrong.

    // A "synchronization object" is an object whose handle can be
    // specified in one of the wait functions to coordinate the
    // execution of multiple threads.

    // A "mutex object" is a synchronization object whose state is set
    // to signaled when it is not owned by any thread, and nonsignaled
    // when it is owned. Only one thread at a time can own a mutex object,
    // whose name comes from the fact that it is useful in coordinating
    // mutually exclusive access to a shared resource.

    char  messageText[MSG_BUF_SIZE];  // the message buffer

    // Used to control the consumer thread's lifetime.  It is written by
    // DieDieDie() on one thread and polled by ProcessMessages() on another,
    // so it is volatile to force a fresh read on every loop iteration
    // (a plain BOOL could legally be read once and cached by the optimizer).
    volatile BOOL  m_bContinueProcessing;

#ifdef WITH_SYNCHRONIZATION
    HANDLE  m_hMutex;   // guards messageText
    HANDLE  m_hEvent;   // auto-reset event: "a message may be waiting"

    // This class owns the two handles above and closes them in its dtor,
    // so a copy would close them twice.  Declared but not defined in order
    // to forbid copying.
    MessageBuffer( const MessageBuffer & );
    MessageBuffer & operator=( const MessageBuffer & );
#endif

public:
    // Creates an empty buffer and, when WITH_SYNCHRONIZATION is defined,
    // the mutex and event used to coordinate the two threads.  Creation
    // failures are reported on stdout; the handle is then left NULL.
    MessageBuffer()            // ctor
    {
        memset( messageText, '\0', sizeof( messageText ) );  // initially the message buffer is empty

        m_bContinueProcessing = true;   // will be changed upon a call to DieDieDie()

#ifdef WITH_SYNCHRONIZATION
        printf( "Creating mutex in MessageBuffer ctor.\n" );

        // Both objects are created WITHOUT a name.  A named object is
        // shared with every other process that opens the same name, so two
        // running copies of this program would otherwise contend for one
        // mutex/event pair.  Passing NULL also keeps this code independent
        // of the ANSI/UNICODE build setting.

        m_hMutex = CreateMutex( NULL,    // no security attributes
                                FALSE,   // BOOL bInitialOwner, we don't want the
                                         // thread that creates the mutex to
                                         // immediately own it.
                                NULL     // lpName: unnamed, private to this process
                              );
        if ( m_hMutex == NULL )
        {
            printf( "CreateMutex() failed in MessageBuffer ctor.\n" );
        }

        // Create the auto-reset event.

        m_hEvent = CreateEvent( NULL,     // no security attributes
                                FALSE,    // auto-reset event
                                FALSE,    // initial state is non-signaled
                                NULL );   // lpName: unnamed, private to this process

        if ( m_hEvent == NULL )
        {
            printf( "CreateEvent() failed in MessageBuffer ctor.\n" );
        }
#endif
    }

    // Closes the synchronization handles that were successfully created.
    ~MessageBuffer()
    {
#ifdef WITH_SYNCHRONIZATION
        if ( m_hMutex != NULL )
            CloseHandle( m_hMutex );
        if ( m_hEvent != NULL )
            CloseHandle( m_hEvent );
#endif
    }

    // Copies the NUL-terminated string s into the message buffer, keeping
    // at most MSG_BUF_SIZE-1 characters.  The caller's string is not
    // modified.  When WITH_SYNCHRONIZATION is defined the copy is done
    // while owning the mutex and the event is signaled afterwards; if the
    // mutex cannot be acquired the message is dropped.
    void SetMessage(const char * s)
    {
        printf("in MessageBuffer::SetMessage()\n");

#ifdef WITH_SYNCHRONIZATION
        DWORD   dwWaitResult = WaitForSingleObject( m_hMutex, INFINITE ); // Jaeschke's Monitor::Enter(this)

        if ( dwWaitResult != WAIT_OBJECT_0 )
        {
            // WAIT_ABANDONED still hands us ownership of the mutex, so give
            // it back before bailing out or the other thread blocks forever.
            if ( dwWaitResult == WAIT_ABANDONED )
                ReleaseMutex( m_hMutex );

            printf( "WaitForSingleObject() failed in MessageBuffer::SetMessage().\n");
            return;
        }
        printf( "SetMessage() acquired mutex\n" );
#endif

        // I intentionally use a very non-atomic method of copying
        // the new message into the message buffer, in order to
        // exacerbate the problem which occurs if the program doesn't
        // use a synchronization object between the producer and consumer.

        // The copy stops one short of the end of the buffer so that there
        // is always room for the terminator; an over-long message is
        // truncated here rather than by writing into the caller's string.

        char * pch = &messageText[0];
        char * const pchLimit = &messageText[MSG_BUF_SIZE - 1];
        while ( *s && pch < pchLimit )
        {
            *pch++ = *s++;
            Sleep( 5 );
        }

        *pch = '\0';

        // Since the message buffer now holds a message we can
        // allow the consumer thread to run.

//        printf("Set new message: %s.\n", messageText );

#ifdef WITH_SYNCHRONIZATION
        printf( "SetMessage() pulsing Event\n" );
        if ( ! SetEvent( m_hEvent ) )   // Jaeschke's Monitor::Pulse(this)
        {
            printf( "SetEvent() failed in SetMessage()\n" );
        }

        printf( "SetMessage() releasing mutex\n" );
        if ( ! ReleaseMutex( m_hMutex ))   // Jaeschke's Monitor::Exit(this)
        {
            printf( "ReleaseMutex() failed in MessageBuffer::SetMessage().\n");
        }
#endif
    }

    // The consumer loop: runs until DieDieDie() clears the continue flag
    // (or, with WITH_SYNCHRONIZATION, until a wait fails).  Each pass prints
    // and then empties the buffer if it holds a non-empty message.
    void ProcessMessages()
    {
//        printf("in MessageBuffer::ProcessMessages()\n");

        while ( m_bContinueProcessing )   // state variable used to control thread lifetime
        {
            // We now want to block in a wait so that this consumer thread
            // doesn't burn any cycles except upon those occasions when the
            // producer thread indicates that a message waits for
            // processing.  The 2000 ms timeout lets us notice a request
            // to die even if no further message ever arrives.

#ifdef WITH_SYNCHRONIZATION
            DWORD   dwWaitResult = WaitForSingleObject( m_hEvent, 2000 );

            if ( ( dwWaitResult == WAIT_TIMEOUT )   // WAIT_TIMEOUT = 258
              && ( ! m_bContinueProcessing ) )
            {
                break;    // we were told to die
            }
            else if ( ( dwWaitResult == WAIT_ABANDONED )  // WAIT_ABANDONED = 0x80
                   || ( dwWaitResult == WAIT_FAILED ) )   // e.g. the event handle is invalid
            {
                printf( "WaitForSingleObject(1) failed in MessageBuffer::ProcessMessages().\n");
                return;
            }
            else if ( dwWaitResult == WAIT_OBJECT_0 )  // WAIT_OBJECT_0 = 0
            {
                printf( "ProcessMessages() saw Event\n" );
            }

            // A timeout with the continue flag still set falls through to
            // here and simply re-checks the buffer.

            dwWaitResult = WaitForSingleObject( m_hMutex, INFINITE );

            if ( dwWaitResult != WAIT_OBJECT_0 )
            {
                // As in SetMessage(): WAIT_ABANDONED means we own the mutex
                // now, so release it before giving up.
                if ( dwWaitResult == WAIT_ABANDONED )
                    ReleaseMutex( m_hMutex );

                printf( "WaitForSingleObject(2) failed in MessageBuffer::ProcessMessages().\n");
                return;
            }
            printf( "ProcessMessages() acquired mutex\n" );
#endif

            if ( strlen( messageText ) != 0 )
            {
                printf( "Processed new message: %s\n", messageText );

                // We now empty the message buffer to show we have finished
                // processing the message:

                messageText[0] = '\0';
            }

#ifdef WITH_SYNCHRONIZATION
            printf( "ProcessMessages() releasing mutex\n" );
            if ( ! ReleaseMutex( m_hMutex ))
            {
                printf( "ReleaseMutex() failed in MessageBuffer::ProcessMessages().\n");
            }
#endif

        } // end of while ( m_bContinueProcessing ) loop
    }

    // Asks the ProcessMessages() loop to exit.  This only clears the flag;
    // it does not wait for the consumer thread to finish.
    void  DieDieDie( void )
    {
        m_bContinueProcessing = false;   // ProcessMessages() watches for this in a loop
    }
};

class CreateMessages        // this is the producer
{
    MessageBuffer* msg;
public:
    CreateMessages(MessageBuffer* m)    // ctor
    {
        msg = m;
    }

    void CreateMessagesEntryPoint()
    {
        char  szTemp[MSG_BUF_SIZE];

        // We create 5 string messages for the consumer to process.
        // These 5 messages look like:
        //    1111111111
        //    2222222222
        //    3333333333
        //    4444444444
        //    5555555555

        for (int i = 1; i <= 5; ++i)
        {
            sprintf( szTemp, "%d%d%d%d%d%d%d%d%d%d", i, i, i, i, i, i, i, i, i, i );
//          printf( "new msg = %s\n", szTemp );

            msg->SetMessage( szTemp );
            Sleep(1000);
        }

        // Once we have created all the messages we die by
        // exiting our thread entry-point-function.

        printf("CreateMessages thread terminating.\n");
    }

   // In C++ you must employ a free (C) function or a
   // static class member function as the thread entry function.
   // Furthermore, _beginthreadex() demands that the thread
   // entry function signature take a single (void*) and returned
   // an unsigned.

    static unsigned __stdcall CreateMessagesStaticEntryPoint(void * pThis)
    {
        CreateMessages* pCM = (CreateMessages*)pThis;
        pCM->CreateMessagesEntryPoint();

        // Unlike the consumer thread which attempts to live forever,
        // this producer thread commits suicide after performing a
        // specific amount of work.

        // You can call _endthreadex() explicitly to terminate a thread;
        // however, _endthreadex() is called automatically when the thread
        // returns from the routine passed as a parameter to _beginthreadex().

        // By calling _endthreadex() we cause our thread object to go
        // to the signaled state, which releases anyone waiting on us.

        _endthreadex( 2 );    // the thread exit code

        // _endthreadex() does not close the thread handle so somebody
        // must still call CloseHandle().

        // We never get to here because of the earlier call to _endthreadex().

        return 2;  // the thread exit code
    }
};

class ProcessMessages   // this is the consumer
{
    MessageBuffer* msg;     // the buffer shared with the producer; not owned here
public:
    ProcessMessages(MessageBuffer* m)    // ctor
        : msg( m )
    {
    }

    // Body of the consumer thread: runs the buffer's processing loop.
    void ProcessMessagesEntryPoint()
    {
        msg->ProcessMessages();
    }

    // Thread entry function in the form _beginthreadex() requires;
    // pThis is the ProcessMessages object the thread should run.
    static unsigned __stdcall ProcessMessagesStaticEntryPoint( void * pThis )
    {
        // Forwards to MessageBuffer::ProcessMessages(), which loops
        // on while(m_bContinueProcessing).
        static_cast<ProcessMessages*>( pThis )->ProcessMessagesEntryPoint();

        printf("ProcessMessages thread terminating.\n");
        return 1;   // the thread exit code
    }

    // Relays the request to die to the MessageBuffer, whose
    // ProcessMessages() function watches for it.
    void  DieDieDie( void )
    {
        msg->DieDieDie();
    }
};

// Program entry point.  Starts the consumer thread and then the producer
// thread on one shared MessageBuffer, waits for the producer to finish,
// tells the consumer to stop and waits for it too.
// Returns 0 on success, or 1 if either thread could not be created.
int main()
{
    DWORD   dwExitCode = 0;

    // The following single instance of the MessageBuffer class is shared
    // by both the producer and consumer threads.  When the program is built
    // with WITH_SYNCHRONIZATION, the MessageBuffer's mutex lets only one
    // of the two threads touch the buffer at a time.  Nothing here makes
    // the producer wait for the previous message to be consumed; that
    // relies on the producer's one-second pause between messages.
    //
    // The three objects are automatic variables so that every exit path
    // from main() destroys them.  Both threads use them, so no path may
    // leave main() while a thread is still running.

    MessageBuffer  m;

    ProcessMessages  pm( &m );

    // When developing a multithreaded WIN32-based application with
    // Visual C++, you need to use the CRT thread functions to create
    // any threads that call CRT functions. Hence to create and terminate
    // threads, use _beginthreadex() and _endthreadex() instead of
    // the Win32 APIs CreateThread() and ExitThread().

    // Unlike the thread handle returned by _beginthread(), the thread handle
    // returned by _beginthreadex() can be used with the synchronization APIs.

    unsigned  uiConsumerThreadID = 0;
    HANDLE    hthConsumer = reinterpret_cast<HANDLE>(
                                _beginthreadex( NULL,
                                                0,
                                                ProcessMessages::ProcessMessagesStaticEntryPoint,
                                                &pm,
                                                CREATE_SUSPENDED, // so we can later call ResumeThread()
                                                &uiConsumerThreadID ) );

    if ( hthConsumer == NULL )
    {
        // No thread exists yet, so there is nothing to wait for.
        printf("Failed to create consumer thread\n");
        return 1;
    }

    // DWORD is an unsigned long, hence %lu.
    GetExitCodeThread( hthConsumer, &dwExitCode );  // should be STILL_ACTIVE = 0x00000103 = 259
    printf( "initial Consumer thread exit code = %lu\n", dwExitCode );


    ResumeThread( hthConsumer );   // Jaeschke's pmt->Start()


    CreateMessages  cm( &m );   // note that both the ProcessMessages ctor
                                // and the CreateMessages ctor are given
                                // the same passed param. Hence they share
                                // a single MessageBuffer instance.

    unsigned  uiProducerThreadID = 0;
    HANDLE    hthProducer = reinterpret_cast<HANDLE>(
                                _beginthreadex( NULL,
                                                0,
                                                CreateMessages::CreateMessagesStaticEntryPoint,
                                                &cm,
                                                CREATE_SUSPENDED,
                                                &uiProducerThreadID ) );
    if ( hthProducer == NULL )
    {
        printf("Failed to create producer thread\n");

        // The consumer is already running and is using pm and m, so it
        // must be stopped and joined before those objects are destroyed.
        pm.DieDieDie();
        WaitForSingleObject( hthConsumer, INFINITE );
        CloseHandle( hthConsumer );
        return 1;
    }

    GetExitCodeThread( hthProducer, &dwExitCode );  // should be STILL_ACTIVE = 0x00000103 = 259
    printf( "initial Producer thread exit code = %lu\n", dwExitCode );

    ResumeThread( hthProducer );   // Jaeschke's cmt->Start()

    // Now that all threads are running, we wait for the producer thread
    // to finish its work and die.

    // Unlike the consumer thread which attempts to live forever,
    // this producer thread ends itself after performing a
    // specific amount of work.

    WaitForSingleObject( hthProducer, INFINITE );  // Jaeschke's cmt->Join()

    GetExitCodeThread( hthProducer, &dwExitCode );
    printf( "Producer thread exited with code %lu\n", dwExitCode );

    // We now explicitly command the consumer thread to die.

    pm.DieDieDie();       // Jaeschke's pmt->Interrupt()

    // We now wait for the consumer thread to die.

    WaitForSingleObject( hthConsumer, INFINITE );  // Jaeschke's pmt->Join()

    GetExitCodeThread( hthConsumer, &dwExitCode );
    printf( "Consumer thread exited with code %lu\n", dwExitCode );

    // The handle returned by _beginthreadex() has to be closed
    // by the caller of _beginthreadex().

    CloseHandle( hthProducer );
    CloseHandle( hthConsumer );

    printf("Primary thread terminating.\n");

    // cm, pm and m are destroyed here, in that order, after both threads
    // have been joined.
    return 0;
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United States United States
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions