To be honest, I haven't gone through all of your code. I only checked which synchronization primitives you are using in coda_concorr, and that alone is already suspicious. The same is true for your thread functions, which use ugly locking and antipatterns like "try". You are definitely not using the right synchronization primitives. Depending on the circumstances, your solution may have not only technical problems (bugs) but huge conceptual ones too. Let's overlook the conceptual problems for now and focus only on the bugs of your current solution. What you need here is a single-producer, single-consumer blocking message queue class that you can put between your threads. I guess coda_concorr wants to be exactly that. To give you a boost, here is one such blocking queue that has several optimizations and uses the right synchronization primitives on Windows:
#include <windows.h>
#include <cassert>
#include <vector>

template <typename T>
class BlockingQueue
{
public:
    BlockingQueue()
    {
        // Manual-reset event, initially non-signalled.
        m_AddedItemEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
        InitializeCriticalSection(&m_Lock);
        m_SwappedItemIndex = 0;
    }
    ~BlockingQueue()
    {
        // The queue must be fully drained before it is destroyed.
        assert(m_Items.empty() && m_SwappedItemIndex >= m_SwappedItems.size());
        CloseHandle(m_AddedItemEvent);
        DeleteCriticalSection(&m_Lock);
    }
    void Produce(const T& item)
    {
        EnterCriticalSection(&m_Lock);
        m_Items.push_back(item);
        SetEvent(m_AddedItemEvent);
        LeaveCriticalSection(&m_Lock);
    }
    // Must be called from a single consumer thread only.
    void Consume(T& item)
    {
        if (m_SwappedItemIndex >= m_SwappedItems.size())
        {
            // The local batch is used up: wait for new items, then take all
            // of them at once so the lock is held once per batch, not per item.
            m_SwappedItemIndex = 0;
            m_SwappedItems.clear();
            WaitForSingleObject(m_AddedItemEvent, INFINITE);
            EnterCriticalSection(&m_Lock);
            ResetEvent(m_AddedItemEvent);
            m_Items.swap(m_SwappedItems);
            LeaveCriticalSection(&m_Lock);
        }
        item = m_SwappedItems[m_SwappedItemIndex++];
    }
private:
    CRITICAL_SECTION m_Lock;
    HANDLE m_AddedItemEvent;
    std::vector<T> m_Items;         // written by producers under m_Lock
    std::vector<T> m_SwappedItems;  // read only by the consumer, no lock needed
    size_t m_SwappedItemIndex;
};
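For readers who are not on Windows, the same batch-swapping idea can be sketched with standard C++11 primitives. This is only a sketch under assumptions: PortableBlockingQueue is a hypothetical name, and a std::condition_variable with a predicate stands in for the manual-reset event, so it is a port of the idea and not the class above.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical portable counterpart of BlockingQueue (single consumer).
template <typename T>
class PortableBlockingQueue
{
public:
    PortableBlockingQueue() : m_SwappedItemIndex(0) {}

    // May be called from any thread.
    void Produce(const T& item)
    {
        {
            std::lock_guard<std::mutex> lock(m_Lock);
            m_Items.push_back(item);
        }
        m_AddedItem.notify_one();
    }

    // Must be called from one consumer thread only, because the swapped
    // batch is read without holding the lock.
    void Consume(T& item)
    {
        if (m_SwappedItemIndex >= m_SwappedItems.size())
        {
            m_SwappedItemIndex = 0;
            m_SwappedItems.clear();
            std::unique_lock<std::mutex> lock(m_Lock);
            // The predicate replaces the Set/Reset bookkeeping of the event.
            m_AddedItem.wait(lock, [this] { return !m_Items.empty(); });
            m_Items.swap(m_SwappedItems);
        }
        item = m_SwappedItems[m_SwappedItemIndex++];
    }

private:
    std::mutex m_Lock;
    std::condition_variable m_AddedItem;
    std::vector<T> m_Items;         // filled by producers under m_Lock
    std::vector<T> m_SwappedItems;  // drained by the consumer without locking
    std::size_t m_SwappedItemIndex;
};
```

The consumer takes the lock once per batch instead of once per item, which is the optimization the Windows version relies on as well.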
Now let's take a look at the high-level design of your solution. (I won't delve into the details or write whole books about multithreading here.) The facts: multithreading speeds up your program if the hardware supports it and your program uses it correctly. Making a program multithreaded "just because" it is said to speed up execution (and because it is fashionable these days) is pointless. In that case you are just creating headaches for yourself, and you sacrifice the sequential execution, the deterministic behaviour, and the ease of debugging of your program. (Of course, in some cases you create a background thread not to gain speed but to convert synchronous calls into asynchronous ones, but that is another story.) Your problem may be quite well suited to multithreading, but that has to be verified by measurement, since most of the heavy lifting will probably be done by OpenCV.
You should write your program so that it can be executed both sequentially (single-threaded) and in multithreaded mode. Switching between the two modes can happen at compile time (with a #define) or at runtime (with a bool variable). This is important because it lets you compare the sequential execution time with the multithreaded one on a specific machine with a specific number of cores (and optionally hyperthreads). Let's say you have a machine with 8 cores and your multithreaded version runs at 6x the speed of the sequential one: in that case you have done pretty well with multithreading! (I said only 6x and not 8x because the rest of the computing capacity is spent waiting on locks and managing hardware resources shared between the cores, such as memory and cache.) Another good thing about having both a single-threaded and a multithreaded version is that, in my opinion, it helps many people organize and design clearer multithreaded code, and it forces a clear separation of individual jobs.
With multithreading, always think in terms of separating your tasks into job classes that will be executed "on a thread somewhere", not in terms of threads that randomly execute thread functions here and there. If your code is clear, the only difference between the single-threaded and multithreaded versions of your program is the central core logic that decides how and where the jobs are executed, and that is only a small piece of code.
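The runtime switch and the timing comparison described above could look roughly like the following sketch. RunJobs and TimeRunMs are hypothetical names, jobs are modelled as std::function<void()>, and standard C++11 threads stand in for whatever threading layer you actually use.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Runs every job either on the calling thread or spread over thread_count
// threads. `multithreaded` is the runtime switch between the two modes.
inline void RunJobs(const std::vector<std::function<void()>>& jobs,
                    bool multithreaded, unsigned thread_count)
{
    if (!multithreaded || thread_count <= 1)
    {
        for (const auto& job : jobs)
            job();
        return;
    }
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < thread_count; ++t)
    {
        // Worker t takes jobs t, t + thread_count, t + 2 * thread_count, ...
        workers.emplace_back([&jobs, t, thread_count] {
            for (std::size_t i = t; i < jobs.size(); i += thread_count)
                jobs[i]();
        });
    }
    for (auto& worker : workers)
        worker.join();
}

// Returns the wall-clock time of one run in milliseconds, so the two modes
// can be compared on the machine at hand.
inline double TimeRunMs(const std::vector<std::function<void()>>& jobs,
                        bool multithreaded, unsigned thread_count)
{
    const auto start = std::chrono::steady_clock::now();
    RunJobs(jobs, multithreaded, thread_count);
    const std::chrono::duration<double, std::milli> elapsed =
        std::chrono::steady_clock::now() - start;
    return elapsed.count();
}
```

Dividing TimeRunMs(jobs, false, 1) by TimeRunMs(jobs, true, n) gives the speedup for n threads. The jobs themselves stay identical in both modes, and only RunJobs knows where they execute.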
After this introduction, let's talk a bit about your problem, which is quite similar to a lot of other multithreading problems. You have items to process, and processing consists of performing several operations (one after the other) on each item. Let's first talk about the single-threaded version, with no multithreading involved. Even in this case you have several choices, because you have many items and many operations to perform on each item. One solution is to take one item and process it by performing every operation on it, one after the other. The other option is to perform the first operation on every item, then the second operation on every item, and so on. This second solution may not be obvious at first, and many people would start with the first option, but the second is often better. The reason is that performing the same operation many times in a row is usually faster than performing operation1, then operation2, then operation3, and so on, because the code of one operation tends to access more or less the same memory areas, which results in fewer cache misses.
Of course, sometimes you cannot afford to process all items in one batch (operation1 on all items, then operation2 on all items, and so on), because it means that all items become ready almost at the same time. In that case you can balance between the two solutions by splitting your items into smaller batches: you execute operation1 on one smaller set of items, then operation2 on the same set, and so on, before moving to the next set. You need this balance when another system has to receive output from this one at regular intervals, but this is already optimization...
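The three single-threaded orderings described above can be sketched as follows. Everything here is illustrative: the items are plain ints, Operation is a hypothetical typedef, and the function names are made up for this example.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

typedef std::function<int(int)> Operation;  // hypothetical operation type

// Option 1: take one item and run every operation on it before moving on.
inline void ProcessPerItem(std::vector<int>& items, const std::vector<Operation>& ops)
{
    for (int& item : items)
        for (const Operation& op : ops)
            item = op(item);
}

// Option 2: run operation1 on all items, then operation2 on all items, ...
inline void ProcessPerOperation(std::vector<int>& items, const std::vector<Operation>& ops)
{
    for (const Operation& op : ops)
        for (int& item : items)
            item = op(item);
}

// Compromise: option 2 applied to batches of batch_size items, so finished
// items become available after every batch instead of only at the very end.
inline void ProcessInBatches(std::vector<int>& items, const std::vector<Operation>& ops,
                             std::size_t batch_size)
{
    assert(batch_size > 0);
    for (std::size_t begin = 0; begin < items.size(); begin += batch_size)
    {
        const std::size_t end = std::min(begin + batch_size, items.size());
        for (const Operation& op : ops)
            for (std::size_t i = begin; i < end; ++i)
                items[i] = op(items[i]);
    }
}
```

All three produce the same results; they differ only in the order in which memory is touched and in when the first finished items become available. Which one is fastest has to be measured.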
Multithreaded version: your solution performs each operation in parallel on its own thread. We could spice it up by executing each operation on several threads, namely on a thread pool. The other approach to the same problem (as in the single-threaded version) is to first perform operation1 on all items on a thread pool (with multiple threads) and then perform operation2 on all items on a thread pool (usually the very same one). This latter solution has several benefits over the first one, if not only benefits (just like in the single-threaded case). Performing the same operation on multiple threads on multiple items results in fewer cache misses and better locality of reference. Let's talk about the other advantages of this solution over the one that executes X operations on Y threads:
If you execute only one operation at a time on multiple threads while processing all of your items, you can easily control the number of threads in the thread pool. Usually you want only one global thread pool in your system, and that pool usually contains only as many threads as there are cores in your system (depending on the type of jobs, it is sometimes not a good idea to create threads for the hyperthreads of your cores). All you do is put together a lot of jobs (let's say 1000 jobs that perform operation1 on all items) and feed them into the thread pool, and sometime later the thread pool will have processed all of them in some random order. When all jobs are done, you put together another batch of jobs that perform operation2 on all items and feed that into the thread pool. Sometime later all of those jobs are done too. All you have is a master/main thread and several worker threads in your thread pool.
Another thing I like about this solution is that making it single-threaded is super easy. You create a thread pool that works this way: it has no worker threads, and when the main thread calls the AddJobs() method of the thread pool, AddJobs() executes all jobs immediately on the main thread and returns only after that! If you execute all operations in parallel, like you do, you have far fewer options to control the number of threads. You have to create at least as many threads as there are operations. If you have a lot of cores you can maybe create more than one thread for some operations, but that's all!
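A thread pool whose zero-worker configuration doubles as the single-threaded mode might be sketched like this. SketchThreadPool is a hypothetical class written with standard C++11 primitives; only the AddJobs() name comes from the text above. As a simplification for this example, the destructor drains the remaining jobs and joins the workers, so there is still no WaitForAll() on the pool.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

class SketchThreadPool
{
public:
    typedef std::function<void()> Job;

    // worker_count == 0 selects the single-threaded mode.
    explicit SketchThreadPool(unsigned worker_count) : m_Stopping(false)
    {
        for (unsigned i = 0; i < worker_count; ++i)
            m_Workers.emplace_back([this] { WorkerLoop(); });
    }

    // Lets the workers finish the queued jobs, then joins them.
    ~SketchThreadPool()
    {
        {
            std::lock_guard<std::mutex> lock(m_Lock);
            m_Stopping = true;
        }
        m_JobAdded.notify_all();
        for (auto& worker : m_Workers)
            worker.join();
    }

    void AddJobs(const std::vector<Job>& jobs)
    {
        if (m_Workers.empty())
        {
            // Single-threaded mode: run everything on the caller right now.
            for (const Job& job : jobs)
                job();
            return;
        }
        {
            std::lock_guard<std::mutex> lock(m_Lock);
            m_Jobs.insert(m_Jobs.end(), jobs.begin(), jobs.end());
        }
        m_JobAdded.notify_all();
    }

private:
    void WorkerLoop()
    {
        for (;;)
        {
            Job job;
            {
                std::unique_lock<std::mutex> lock(m_Lock);
                m_JobAdded.wait(lock, [this] { return m_Stopping || !m_Jobs.empty(); });
                if (m_Jobs.empty())
                    return;  // stopping and nothing left to do
                job = std::move(m_Jobs.front());
                m_Jobs.pop_front();
            }
            job();  // executed outside the lock
        }
    }

    std::mutex m_Lock;
    std::condition_variable m_JobAdded;
    std::deque<Job> m_Jobs;  // guarded by m_Lock
    std::vector<std::thread> m_Workers;
    bool m_Stopping;         // guarded by m_Lock
};
```

With SketchThreadPool pool(0), AddJobs() returns only after every job has run on the caller, which keeps the program sequential and deterministic for debugging. With a worker count equal to the number of cores, the same calling code runs multithreaded.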
Let's continue the discussion of the "single operation on a single thread pool" solution:
The code that performs an operation will usually access "global" code and data that is used by the code of operation1, operation2, and operationX alike. These shared things can be global functions or global variables/objects. They all increase the number of cache misses. The most obvious data shared between these operations is the queue that connects two operations. If you use the better solution and execute only operation1 on the thread pool, the queue will only be written by many threads, and later it will only be read by another thread. The queue is still accessed by many threads, but only on its producer side. Later it may be accessed only on its consumer side, but usually the items are taken out all at once by the main/coordinator thread! It never happens that the producer and the consumer side are accessed by threads at the same time! This allows for very nice optimizations in the queue!
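To make the "written by many threads, then read by one" phases concrete, here is a minimal sketch of one stage. It is an illustration under assumptions and not the queue discussed in this answer: RunStage is a hypothetical function, the items are ints, and freshly created std::thread workers stand in for a shared thread pool.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Applies one operation to every input on thread_count threads and returns
// the results only after the whole stage has finished.
inline std::vector<int> RunStage(const std::vector<int>& inputs,
                                 const std::function<int(int)>& op,
                                 unsigned thread_count)
{
    assert(thread_count > 0);
    std::vector<int> outputs(inputs.size());  // sized up front, never resized by workers
    std::atomic<std::size_t> next(0);         // index of the next unclaimed item
    std::vector<std::thread> workers;
    for (unsigned t = 0; t < thread_count; ++t)
    {
        workers.emplace_back([&inputs, &outputs, &op, &next] {
            for (;;)
            {
                const std::size_t i = next.fetch_add(1);
                if (i >= inputs.size())
                    break;
                // Producer phase: every slot is written by exactly one thread,
                // so no lock is needed around the write.
                outputs[i] = op(inputs[i]);
            }
        });
    }
    for (auto& worker : workers)
        worker.join();  // the producer phase ends here
    return outputs;     // consumer phase: only the caller reads the results
}
```

Because producing and consuming never overlap, the only shared state that needs synchronization is the item counter. The operation passed in must itself be safe to call from several threads at once.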
When you put together a large number of jobs (let's say the jobs that execute operation1 on all items) and put them into the thread pool for processing (you do this from the main thread), you somehow have to be able to wait on the main thread until the thread pool has finished processing all of them. Putting this wait code into the thread pool is a bad idea. Never complicate a clean conceptual unit like a thread pool with stupid operations like WaitForAll()! What I usually do is put a wait-for-all operation into the queue implementation that holds the items between two operations. I do the following on my main thread:
- prepare the queue between operation1 and operation2 to receive the items produced by operation1 (easy because no threads are accessing the queue at this point)
- prepare the jobs that perform operation1 on all items and put them into the thread pool for processing
- wait for the queue between operation1 and operation2 to receive all produced items
- get the results of operation1 out of the queue (easy because no threads are accessing the queue at this point)
- prepare the queue between operation2 and operation3 to receive the items produced by operation2 (easy because no threads are accessing the queue at this point)
- prepare the jobs (from the results we just got out of the queue) that perform operation2 on all items and put them into the thread pool for processing
- ...
Here is something that looks like the queue I was talking about:
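#include <windows.h>
#include <vector>

template <typename T>
class ExampleQueue
{
public:
    ExampleQueue()
    {
        m_PrevIndex = -1;
        m_FinishedCount = 0;
        m_MaxItems = 0;
        m_FinishedEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
    }
    ~ExampleQueue()
    {
        CloseHandle(m_FinishedEvent);
    }
    // Main thread only, while no worker is touching the queue.
    // max_items must be greater than zero.
    void PrepareForProduction(int max_items)
    {
        m_PrevIndex = -1;
        m_FinishedCount = 0;
        m_MaxItems = (LONG)max_items;
        m_Items.resize((size_t)max_items);
        ResetEvent(m_FinishedEvent);
    }
    // Worker threads; no lock is needed because every call gets its own slot.
    void Produce(const T& item)
    {
        LONG index = InterlockedIncrement(&m_PrevIndex);
        m_Items[(size_t)index] = item;
        // Count finished writes separately: the thread that claimed the last
        // slot may finish before a thread that is still writing an earlier one.
        if (InterlockedIncrement(&m_FinishedCount) == m_MaxItems)
            SetEvent(m_FinishedEvent);
    }
    // Main thread: the wait-for-all operation, blocks until every item is in.
    void WaitForAllItems()
    {
        WaitForSingleObject(m_FinishedEvent, INFINITE);
    }
    // Main thread only, after WaitForAllItems() has returned.
    void GetAllItems(std::vector<T>& items)
    {
        m_Items.swap(items);
        m_Items.clear();
    }
private:
    LONG m_PrevIndex;
    LONG m_FinishedCount;
    LONG m_MaxItems;
    std::vector<T> m_Items;
    HANDLE m_FinishedEvent;
};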
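The main-thread steps listed earlier (prepare the queue, hand out the jobs, wait, collect) can be sketched in portable C++11 as follows. StageQueue and RunStageThroughQueue are hypothetical names, plain std::thread workers stand in for the shared thread pool, and a mutex-guarded counter with a condition variable stands in for the Windows event, so treat it as an illustration of the flow and not as a tuned implementation.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

template <typename T>
class StageQueue
{
public:
    StageQueue() : m_NextIndex(0), m_Pending(0) {}

    // Main thread only, while no worker touches the queue.
    void PrepareForProduction(std::size_t max_items)
    {
        m_Items.assign(max_items, T());
        m_NextIndex = 0;
        m_Pending = max_items;
    }
    // Worker threads, exactly max_items calls in total.
    void Produce(const T& item)
    {
        const std::size_t index = m_NextIndex.fetch_add(1);
        assert(index < m_Items.size());
        m_Items[index] = item;
        std::lock_guard<std::mutex> lock(m_Lock);
        if (--m_Pending == 0)  // counts finished writes, not claimed slots
            m_Finished.notify_all();
    }
    // Main thread: the wait-for-all operation lives in the queue.
    void WaitForAllItems()
    {
        std::unique_lock<std::mutex> lock(m_Lock);
        m_Finished.wait(lock, [this] { return m_Pending == 0; });
    }
    // Main thread only, after WaitForAllItems() has returned.
    void GetAllItems(std::vector<T>& items)
    {
        items.clear();
        items.swap(m_Items);
    }

private:
    std::vector<T> m_Items;
    std::atomic<std::size_t> m_NextIndex;
    std::mutex m_Lock;
    std::condition_variable m_Finished;
    std::size_t m_Pending;  // guarded by m_Lock once workers are running
};

// One round of the main-thread loop: prepare, hand out work, wait, collect.
inline std::vector<int> RunStageThroughQueue(const std::vector<int>& inputs,
                                             int (*operation)(int),
                                             unsigned thread_count)
{
    assert(thread_count > 0 || inputs.empty());
    StageQueue<int> queue;
    queue.PrepareForProduction(inputs.size());
    std::atomic<std::size_t> next(0);
    std::vector<std::thread> workers;  // stand-in for a shared thread pool
    for (unsigned t = 0; t < thread_count; ++t)
    {
        workers.emplace_back([&] {
            for (;;)
            {
                const std::size_t i = next.fetch_add(1);
                if (i >= inputs.size())
                    break;
                queue.Produce(operation(inputs[i]));
            }
        });
    }
    queue.WaitForAllItems();
    for (auto& worker : workers)
        worker.join();
    std::vector<int> outputs;
    queue.GetAllItems(outputs);
    return outputs;  // in arrival order, not in input order
}
```

Feeding the returned vector into the next call gives the operation1, operation2, ... sequence. Note that the results arrive in whatever order the workers finished.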
One more piece of advice: make your multithreading code object-oriented (lock, event, queue, thread, thread pool, job, job group classes...).
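As a small illustration of what one of those classes might look like, here is a sketch of an event wrapper. ManualResetEvent is a hypothetical class that mimics the behaviour of a Windows manual-reset event using standard C++11 primitives. The lock class from the list is not shown, because std::lock_guard already covers it in this setting.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Manual-reset event: once set, it stays signalled until Reset() is called,
// and every waiter is released while it is signalled.
class ManualResetEvent
{
public:
    ManualResetEvent() : m_Signalled(false) {}

    void Set()
    {
        {
            std::lock_guard<std::mutex> lock(m_Lock);
            m_Signalled = true;
        }
        m_Changed.notify_all();
    }
    void Reset()
    {
        std::lock_guard<std::mutex> lock(m_Lock);
        m_Signalled = false;
    }
    // Blocks until the event is signalled; returns at once if it already is.
    void Wait()
    {
        std::unique_lock<std::mutex> lock(m_Lock);
        m_Changed.wait(lock, [this] { return m_Signalled; });
    }
    bool IsSet()
    {
        std::lock_guard<std::mutex> lock(m_Lock);
        return m_Signalled;
    }

private:
    std::mutex m_Lock;
    std::condition_variable m_Changed;
    bool m_Signalled;  // guarded by m_Lock
};
```

Code written against a wrapper like this does not care whether the implementation underneath is CreateEvent/SetEvent or a condition variable.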
EDIT: In multithreading there is usually no single best solution. You have to profile your program and tweak things, usually the number of threads and the size of the processed batches, for it to perform well on given hardware with a given amount of resources.
EDIT: A small example program showing the usage of BlockingQueue, plus I fixed the BlockingQueue implementation as it was buggy:
#include <windows.h>
#include <cassert>
#include <stdio.h>

class CJob
{
public:
    virtual ~CJob() {}
    virtual void Execute() = 0;
};

BlockingQueue<CJob*> g_Queue1;
BlockingQueue<CJob*> g_Queue2;

class COperation2 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation2\n");
        delete this;
    }
};

class COperation1 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation1\n");
        // Hand the item over to the thread that performs operation2.
        g_Queue2.Produce(new COperation2);
        delete this;
    }
};

// Executes jobs from the queue passed in param until it receives a NULL job.
DWORD WINAPI ThreadFunc(LPVOID param)
{
    BlockingQueue<CJob*>& queue = *(BlockingQueue<CJob*>*)param;
    for (;;)
    {
        CJob* job;
        queue.Consume(job);
        if (!job)
            break;
        job->Execute();
    }
    return 0;
}

void ManageThreads()
{
    DWORD thread_id;
    HANDLE thread1 = CreateThread(NULL, 0, ThreadFunc, &g_Queue1, 0, &thread_id);
    assert(thread1);
    HANDLE thread2 = CreateThread(NULL, 0, ThreadFunc, &g_Queue2, 0, &thread_id);
    assert(thread2);
    g_Queue1.Produce(new COperation1);
    g_Queue1.Produce(new COperation1);
    g_Queue1.Produce(new COperation1);
    // A NULL job asks the thread to exit. thread2 may be stopped only after
    // thread1 has exited, because thread1 still produces jobs for it.
    g_Queue1.Produce(NULL);
    WaitForSingleObject(thread1, INFINITE);
    g_Queue2.Produce(NULL);
    WaitForSingleObject(thread2, INFINITE);
    CloseHandle(thread1);
    CloseHandle(thread2);
}
EDIT: The exact same example as the previous one, but written in an object-oriented way without global variables:
#include <windows.h>
#include <cassert>
#include <stdio.h>

class Thread
{
public:
    Thread()
    {
        m_hThread = NULL;
    }
    ~Thread()
    {
        if (m_hThread)
        {
            // The owner must call WaitForExit() before destroying the object.
            assert(!IsRunning());
            CloseHandle(m_hThread);
        }
    }
    void Start()
    {
        assert(!IsRunning());
        DWORD thread_id;
        m_hThread = CreateThread(NULL, 0, StaticThreadProc, this, 0, &thread_id);
        assert(m_hThread);
    }
    void WaitForExit()
    {
        assert(m_hThread);
        WaitForSingleObject(m_hThread, INFINITE);
    }
protected:
    virtual void Run() = 0;
private:
    bool IsRunning()
    {
        return m_hThread && WaitForSingleObject(m_hThread, 0)==WAIT_TIMEOUT;
    }
    static DWORD WINAPI StaticThreadProc(LPVOID param)
    {
        Thread* thread = (Thread*)param;
        thread->Run();
        return 0;
    }
private:
    HANDLE m_hThread;
};
class CJob
{
public:
    virtual ~CJob() {}
    virtual void Execute() = 0;
};

class JobExecutorThread : public Thread
{
public:
    JobExecutorThread()
    {
        m_ExitRequested = false;
    }
    bool AddJob(CJob* job)
    {
        if (m_ExitRequested)
            return false;
        assert(job);
        m_JobQueue.Produce(job);
        return true;
    }
    // Note: m_ExitRequested is not synchronized. Do not call RequestExit()
    // while another thread may still be calling AddJob() on this object.
    void RequestExit()
    {
        assert(!m_ExitRequested);
        if (!m_ExitRequested)
        {
            m_ExitRequested = true;  // from now on AddJob() refuses new jobs
            m_JobQueue.Produce(NULL);
        }
    }
protected:
    virtual void Run() override
    {
        for (;;)
        {
            CJob* job;
            m_JobQueue.Consume(job);
            if (job)
                job->Execute();
            else
                break;  // a NULL job is the exit request
        }
    }
private:
    bool m_ExitRequested;
    BlockingQueue<CJob*> m_JobQueue;
};
class COperation2 : public CJob
{
public:
    virtual void Execute() override
    {
        printf("operation2\n");
        delete this;
    }
};

class COperation1 : public CJob
{
public:
    COperation1(JobExecutorThread* operation2_thread)
        : m_Operation2Thread(operation2_thread)
    {}
    virtual void Execute() override
    {
        printf("operation1\n");
        m_Operation2Thread->AddJob(new COperation2);
        delete this;
    }
private:
    JobExecutorThread* m_Operation2Thread;
};

void ManageThreads()
{
    JobExecutorThread operation1_thread;
    JobExecutorThread operation2_thread;
    operation1_thread.Start();
    operation2_thread.Start();
    operation1_thread.AddJob(new COperation1(&operation2_thread));
    operation1_thread.AddJob(new COperation1(&operation2_thread));
    operation1_thread.AddJob(new COperation1(&operation2_thread));
    operation1_thread.RequestExit();
    operation1_thread.WaitForExit();
    operation2_thread.RequestExit();
    operation2_thread.WaitForExit();
}
The object-oriented example may look a bit longer, but it contains no global variables, and most of the boilerplate is reusable code that would reside in a central library. Reusability pays off well if you use threading in more than one place in your program. Another benefit of the object-oriented approach is that the platform-specific stuff (thread creation, etc.) is wrapped into classes that can be ported easily to other platforms, so you don't have to touch the core logic (the ManageThreads() function/method) that coordinates the work of the threads.
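As one possible illustration of that portability claim, here is a sketch of the same Thread interface on top of std::thread. PortableThread and CountingThread are hypothetical names for this example. The public surface (Start(), WaitForExit(), a protected Run()) is kept, so code that uses the class would not have to change.

```cpp
#include <cassert>
#include <thread>

// Hypothetical port of the Thread class to the C++11 standard library.
class PortableThread
{
public:
    PortableThread() {}
    virtual ~PortableThread()
    {
        // As with the Windows class, the owner must call WaitForExit() first.
        // (Destroying a joinable std::thread would terminate the program.)
        assert(!m_Thread.joinable());
    }
    void Start()
    {
        assert(!m_Thread.joinable());
        m_Thread = std::thread([this] { Run(); });
    }
    void WaitForExit()
    {
        assert(m_Thread.joinable());
        m_Thread.join();
    }

protected:
    virtual void Run() = 0;

private:
    std::thread m_Thread;
};

// Minimal subclass, used only to show the class in action.
class CountingThread : public PortableThread
{
public:
    CountingThread() : m_Result(0) {}
    int Result() const { return m_Result; }  // read only after WaitForExit()

protected:
    virtual void Run() override
    {
        for (int i = 1; i <= 10; ++i)
            m_Result += i;
    }

private:
    int m_Result;
};
```

Only the class internals know about std::thread. A JobExecutorThread derived from it would look the same as the one built on the Windows version.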