Click here to Skip to main content
15,937,602 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
See more:
I have not yet solved my problem with multi-threading and would like to have another go at this issue.
My Thread Calling function now looks as follows: The difference to previous attempts is that I have created a Mutex Object assuming that this is the method to ensure synchronization. Now the routine is running without a pause but in a purely sequential mode. The calculation is in fact running on different treads but not in parallel.

This is the Thread Calling Function:

C++
bool MC_Simulation::threads_calling_function(	const char* bond_data_file, 
const char* portfolio_data_file,int no_portfolio){

        // MAX_THREADS is global variable 
	HANDLE			aThread[MAX_THREADS]; 
	DWORD			dwThreadID[MAX_THREADS];
	PTHREAD_DATA		pthread_arg[MAX_THREADS];
	int			scenario_range[MAX_THREADS+1];

	// evaluating beginning and end scenarios for each thread
        // No_Scenarios is Class Information
	scenario_range[0]=0;
	for(int i=1;i<=MAX_THREADS;i++){
                  scenario_range[i] = scenario_range[i-1] +
                  No_Scenarios/MAX_THREADS + (No_Scenarios%MAX_THREADS < i); 
		}
		scenario_range[MAX_THREADS] = No_Scenarios;

		// Create a mutex with no initial owner
		ghMutex = CreateMutex( 
			NULL,              // default security attributes
			FALSE,             // initially not owned
			NULL);             // unnamed mutex
                // check error...
		if (ghMutex == NULL).... 
		

		// create worker threads
		for (int index=0; index<max_threads;index++){>
                   // allocate memory
                   pthread_arg[index] = (PTHREAD_DATA) HeapAlloc(GetProcessHeap(), 
                                        HEAP_ZERO_MEMORY,
                			sizeof(THREAD_DATA));

                   // set the information for each thread 
			
		pthread_arg[index]->first_scenario  = scenario_range[index];
		pthread_arg[index]->last_scenario   = scenario_range[index+1];
		pthread_arg[index]->no_portfolio    = no_portfolio;
		pthread_arg[index]->mc_simulation_p  = this;

		// each thread is supposed to write data to its own file 
		sprintf(pthread_arg[index]->bond_data_file,
                       "%s_%d",bond_data_file,index);
		sprintf(pthread_arg[index]->portfolio_data_file,
                       "%s_%d",portfolio_data_file,index);

// Create the thread handles
aThread[index] = CreateThread(NULL, 0, thread_function , pthread_arg[index], 0, &dwThreadID[index]);
                       // check error
			if(aThread[index] ==NULL).....
		}    // end for loop

	// Wait until all threads have terminated.
       	WaitForMultipleObjects(MAX_THREADS, aThread, TRUE, INFINITE);

	   // close threads and mutex handles
	   for(int i=0; i<max_threads;>		CloseHandle(aThread[i]);
		if(pthread_arg[i] != NULL){
		    HeapFree(GetProcessHeap(), 0, pthread_arg[i]);
		     pthread_arg[i] = NULL;    // Ensure address is not reused.
	        }
	    }

	   CloseHandle(ghMutex);

	   return true;
}

End of Thread Calling Function

My Thread Function looks like that:

C++
DWORD WINAPI MC_Simulation::thread_function(LPVOID arg){


   DWORD	dwWaitResult; 
   int		current_thread_id = GetCurrentThreadId();
   bool		success = true;

   PTHREAD_DATA			pDataArray;
   pDataArray =			(PTHREAD_DATA) arg;
   char				bond_data_file[_MAX_FNAME];
   char				portfolio_data_file[_MAX_FNAME];

   const int			no_portfolio	= pDataArray->no_portfolio;
   const long			first_scenario	= pDataArray->first_scenario;
   const long			last_scenario	= pDataArray->last_scenario;

   // each thread is calculating independently and is not changing
   strcpy(bond_data_file,pDataArray->bond_data_file);
   strcpy(portfolio_data_file,pDataArray->portfolio_data_file);
    
 
   const MC_Simulation*	mc_simulation_p = pDataArray->mc_simulation_p;


    // Request ownership of mutex.
 	dwWaitResult = WaitForSingleObject( 
			   ghMutex,    // handle to mutex
			   INFINITE);  // no time-out interval

	switch (dwWaitResult) 
       	 {
		// The thread got ownership of the mutex
            	case WAIT_OBJECT_0: 
               	 __try { 
                    
                    // TODO: Do the monte carlo Calculations
                    //  mc_simulation_p is the pointer to MyClass
		      success = :: bond_portfolio_valuation(no_portfolio,
                                   bond_data_file, 
                                    portfolio_data_file, 
                                    first_scenario, 
                                   last_scenario, 
                                   mc_simulation_p);
                    // check which thread is operating:
      		 printf("Now working with Thread %d...\n", 
                         GetCurrentThreadId());

                }  // end try

                 __finally { 
                // Release ownership of the mutex object
                 if (!ReleaseMutex(ghMutex)) 
                    { 
                     // Handle error.
                      ::ErrorHandler(TEXT("Mutex can not be released"));
                     } 
               	 }  // end finally
             	 break; 

                 // The thread got ownership of an abandoned mutex
            	 // The database is in an indeterminate state
           	 case WAIT_ABANDONED: 
                	return FALSE; 

	} // end switch statement

return success;
}	// mythread function

Can possibly somebody give the final hint, or even complement the routine such that the threads run in parallel and not each on its own and in sequence? This would be actually great!
Posted
Updated 28-Oct-12 14:08pm
v3
Comments
Chuck O'Toole 28-Oct-12 20:42pm    
Of course they run sequentially, each thread waits for the global mutex before doing its work and then released the mutex when done. Ergo only one thread is doing computation at a time.

PS, why do you create the one global mutex every time in the main loop that create the threads? How many Global Mutex objects do you need? If it's more than one, why do you store the Mutex Handle in a single variable?
Sergey Alexandrovich Kryukov 28-Oct-12 23:40pm    
...and that defeats the purpose of those threads. I don't really know how to help here -- OP tends to use some wrong patterns. Perhaps explaining the problem starting from the very top level can help.
--SA

I should either write a full threading tutorial for you or write the code that isn't possible from what you have shown and I don't really want to do that anyway. Your problem is that the global ::bond_portfolio_valuation() function has to be thread safe without locking but currently it isn't thread safe so it must be guarded by a lock externally.
Solution: make the ::bond_portfolio_valuation() function threadsafe and remove the lock (mutex). You can do that by making sure that the ::bond_portfolio_valuation() doesn't use other data than its input parameters and the location where it stores the result data plus some non-static local variables on the stack and the same must be true for all functions that it calls directly or indirectly.

Its often isn't fruitful to start more threads than the number of cores because your program wont be faster, it will rather be slower than a correct implementation. As an edge case: if you have just one core then a sequential execution is faster than a multithreaded, and the same is true for multicore systems. Don't start 16 threads where you have just 2 or 4 cores... (Of course this isn't true if most of the threads are just waiting for blocking IO but in your case this isn't true.)

In case where I have to process the same items on multicore systems I usually separate two cases:
1. the number of processable items is known (like in your case)
2. the number of processable items is unknown

In if the number of processable items is unknown I use a blocking message queue the receives the item on its input side and outputs the items to all threads on the other side.

In your case with fixed number of processable items I usually do the following:
(WARNING!!! Pseudocode written here in firefox without syntax checking - read at your own risk!!!)
C++
InputStruct thread_input[ITEM_COUNT];
OutputStruct thread_output[ITEM_COUNT];
LONG items_left = ITEM_COUNT;
LONG running_threads = THREAD_COUNT;
HANDLE hFinishedEvent = ::CreateEvent(NULL, TRUE, FALSE, NULL);

// returns false if there are no more items
bool GetNextItemToProcess(const InputStruct*& input, OutputStruct*& output)
{
    LONG index = InterlockedDecrement(&items_left);
    if (index < 0)
        return false;
    input = &thread_input[index];
    output = &thread_output[index];
    return true;
}

DWORD WINAPI ThreadProc(LPVOID arg)
{
    for (;;)
    {
        const InputStruct* input;
        OutputStruct* output;
        if (!GetNextItemToProcess(input, output))
            break;

        // Do the processing here, call only thread safe functions
    }

    if (0 == InterlockedDecrement(&thread_count))
        ::SetEvent(hFinishedEvent);
    return 0;
}

void MainControllerThread()
{
    if (ITEM_COUNT <= 0)
        return;

    // TODO: prepare input/output parameters for your threads
    // and reset the other variables too (items_left, ...)

    // Note that thread count should be the number of cores on
    // your system. Even if you have 100 items to process you
    // should create at most 2 threads if you have only 2 cores.
    DWORD thread_id;
    for (int i=0; i<THREAD_COUNT; ++i)
    {
        HANDLE hThread = ::CreateThread(NULL, 0, ThreadProc, NULL, 0, &thread_id);
        if (!hThread)
            YourFatalErrorExitFunc();
        else
            ::CloseHandle(hThread);
    }

    // your main thread waits here until all items are processed
    ::WaitForSingleObject(hFinishedEvent, INFINITE);
}

Note that this isn't C++ code but I intentionally wrote C because it seems that your code is C too, it has just been put in a "namespace" that doesn't make it C++ at all.

Here the synchronization is done with the relative lightweight InterlockedDecrement() and an event on which the main thread waits. In case of a lot of relatively short tasks you can try to decrement the number of InterlockedDecrement() calls by packing more tasks into one task - this is dependent on the scenario, needs experimenting and profiling before releasing your app because there is no single optimal solution.
 
Share this answer
 
Comments
Sergey Alexandrovich Kryukov 28-Oct-12 23:42pm    
Good reply, my 5. I don't really know how effectively help OP who is seemingly lost in wrong patterns defeating the purpose of threads.
--SA
pasztorpisti 29-Oct-12 5:58am    
Thank you! I'm afraid you are right.
MC_Simulation::thread_function() waits for ghMutex, therefore max_threads started, but its execution will be in sequential. When thread 1 executes bond_portfolio_valuation, all other threads waits until thread 1 calls ReleaseMutex(ghMutex). Therefore all threads will be executed in sequential.

Synchronization between threads is required only when any data is accessed by other threads at same time.
Please limit the usage of synchronization to the shared data.
One suggestion, Critical section is enough for synchronization between threads in a process.
 
Share this answer
 
you're creating what appears to be a global mutex (ghMutex) in every iteration of the loop - that seems wrong - and you'll be leaking handles

Then each of your threads is waiting for that mutex - so they are going to form an orderly queue and go in one at a time ... a mutex can only be owned singularly ....

Do you perhaps want to use CreateEvent[^] instead? Create it before the loop, unset, and then set it out of the loop?
 
Share this answer
 
Comments
pasztorpisti 28-Oct-12 21:39pm    
Mentioning the event is a good point. An auto reset event is a primitive semaphore (if unnamed, its more lightweight then an interprocess windows semaphore) with with max_count==1 so it stands its ground as a mutex/criticalsection alternative in this case but I don't give a 5 because it doesn't answer the question.
barneyman 29-Oct-12 0:48am    
a manual reset event would act as a 'gate' for all threads and be exactly the answer that was requested ...
pasztorpisti 29-Oct-12 5:52am    
The request was implementing parallel execution.
barneyman 30-Oct-12 1:30am    
glad we agree

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900