Hi, I have wrote this code, for processing images with threads and concurrent queues...I have create a thread-safe queue template, with mutex for manage reading/writing of queue.
Now, I have introduced CRITICAL SECTION and CONDITION VARIABLES for signaling work...but the situation is that all thread (except the first for clear reason) doing only 1 operation.
This is the code:
#include <opencv\cv.h>
#include <opencv\highgui.h>
#include <opencv2\highgui\highgui.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#include <queue>
using namespace std;
using namespace cv;
template<typename T>
class coda_concorr
{
private:
std::queue<T> la_coda;
HANDLE mutex;
public:
bool elemento;
coda_concorr()
{
mutex = CreateMutex(NULL,FALSE,NULL);
}
~coda_concorr()
{}
void push(T& data)
{
WaitForSingleObject(mutex,INFINITE);
la_coda.push(data);
ReleaseMutex(mutex);
}
bool vuota() const
{
WaitForSingleObject(mutex,INFINITE);
bool RetCode = la_coda.empty();
ReleaseMutex(mutex);
return RetCode;
}
bool try_pop(T& popped)
{
WaitForSingleObject(mutex,INFINITE);
while (la_coda.empty()){
ReleaseMutex(mutex);
return false;
}
WaitForSingleObject(mutex,INFINITE);
popped = la_coda.front();
la_coda.pop();
ReleaseMutex(mutex);
return true;
}
};
struct Args
{
coda_concorr<cv::Mat> in;
coda_concorr<cv::Mat> *out; };
CONDITION_VARIABLE NonVuoto1;
CONDITION_VARIABLE NonVuoto2;
CONDITION_VARIABLE NonVuoto3;
CONDITION_VARIABLE NonVuoto4;
CRITICAL_SECTION Lock1;
CRITICAL_SECTION Lock2;
CRITICAL_SECTION Lock3;
CRITICAL_SECTION Lock4;
bool stop;
void puts (void* param){
Args* arg = (Args*)param;
int i=0;
Mat image;
while(!arg->in.vuota()){
arg->in.try_pop(image);
arg->out->push(image);
i++;
WakeConditionVariable(&NonVuoto1);
}
cout<<endl<<"Thread (PUSH) terminato con "<<i<<" elaborazioni."<<endl;
WakeConditionVariable(&NonVuoto1);
_endthread();
}
void grey (void *param){
Mat temp1,temp2;
int add = 0;
Args* arg = (Args*)param;
while(true){
EnterCriticalSection(&Lock1);
while(arg->in.vuota() && !stop){
SleepConditionVariableCS(&NonVuoto1,&Lock1,INFINITE);
}
if(stop==true){
LeaveCriticalSection(&Lock1);
break;
}
arg->in.try_pop(temp1);
cvtColor(temp1,temp2,CV_BGR2GRAY);
arg->out->push(temp2);
add++;
cout<<endl<<"grey ha fatto: "<<add<<endl;
LeaveCriticalSection(&Lock1);
WakeConditionVariable(&NonVuoto2);
}
cout<<endl<<"Thread (GREY) terminato con "<<add<<" elaborazioni."<<endl;
_endthread();
}
void soglia(void *param){
Mat temp1a,temp2a;
int add=0;
Args* arg = (Args*)param;
while(true){
EnterCriticalSection(&Lock2);
while(arg->in.vuota() && stop == false){
SleepConditionVariableCS(&NonVuoto2,&Lock2,INFINITE);
}
if(stop==true){
LeaveCriticalSection(&Lock2);
break;
}
arg->in.try_pop(temp1a);
threshold(temp1a,temp2a,128,255,THRESH_BINARY);
arg->out->push(temp2a);
add++;
LeaveCriticalSection(&Lock2);
WakeConditionVariable(&NonVuoto3);
cout<<endl<<"soglia ha fatto: "<<add<<endl;
}
cout<<endl<<"Thread (SOGLIA) terminato con "<<add<<" elaborazioni."<<endl;
_endthread();
}
void finitura(void *param){
Mat temp1b,temp2b,temp2c;
int add = 0;
Args* arg = (Args*)param;
while(true){
EnterCriticalSection(&Lock3);
while(arg->in.vuota() && stop == false){
SleepConditionVariableCS(&NonVuoto3,&Lock3,INFINITE);
}
if(stop==TRUE){
LeaveCriticalSection(&Lock3);
break;
}
arg->in.try_pop(temp1b);
erode(temp1b,temp2b,cv::Mat());
dilate(temp2b,temp2c,Mat());
arg->out->push(temp2c);
add++;
LeaveCriticalSection(&Lock3);
WakeConditionVariable(&NonVuoto4);
cout<<endl<<"erode ha fatto: "<<add<<endl;
}
cout<<endl<<"Thread (ERODE) terminato con "<<add<<" elaborazioni."<<endl;
_endthread();
}
void contorno (void *param){
Mat temp;
int add=0;
Args* arg = (Args*)param;
while(true){
EnterCriticalSection(&Lock4);
while(arg->in.vuota() && stop == false){
SleepConditionVariableCS(&NonVuoto4,&Lock4,INFINITE);
}
if(stop==TRUE){
LeaveCriticalSection(&Lock4);
break;
}
arg->in.try_pop(temp);
vector<vector<Point>> contorni;
findContours(temp,contorni,CV_RETR_LIST, CV_CHAIN_APPROX_SIMPLE);
Mat dst(temp.size(), CV_8UC3, Scalar(0,0,0));
Scalar colors[3];
colors[0] = Scalar(255,0,0);
colors[1] = Scalar(0,255,0);
colors[2] = Scalar(0,0,255);
for (size_t idx = 0; idx < contorni.size(); idx++){
drawContours(dst,contorni,idx,colors[idx %3]);
}
arg->out->push(dst);
add++;
cout<<endl<<"cont ha fatto: "<<add<<endl;
LeaveCriticalSection(&Lock4);
}
cout<<endl<<"Thread (CONTOUR) terminato con "<<add<<" elaborazioni."<<endl;
_endthread();
}
int main()
{
coda_concorr<cv::Mat> ingresso;
coda_concorr<cv::Mat> uscita;
InitializeConditionVariable(&NonVuoto1);
InitializeConditionVariable(&NonVuoto2);
InitializeConditionVariable(&NonVuoto3);
InitializeConditionVariable(&NonVuoto4);
InitializeCriticalSection(&Lock1);
InitializeCriticalSection(&Lock2);
InitializeCriticalSection(&Lock3);
InitializeCriticalSection(&Lock4);
LARGE_INTEGER count1, count2, freq;
double elapsed;
Mat temp[10];
Mat out;
Args dati0,dati1,dati2,dati3,dati4;
QueryPerformanceFrequency(&freq);
QueryPerformanceCounter (&count1);
for(int i=0;i<10;i++){
temp[i] = imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
ingresso.push(temp[i]);
}
dati0.in=ingresso;
dati0.out=&dati1.in;
dati1.out=&dati2.in;
dati2.out=&dati3.in;
dati3.out=&dati4.in;
dati4.out=&uscita;
HANDLE handle0,handle1,handle2,handle3,handle4;
handle0 = (HANDLE) _beginthread(puts,0,&dati0);
handle1 = (HANDLE) _beginthread(grey,0,&dati1);
handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
handle3 = (HANDLE) _beginthread(finitura,0,&dati3);
handle4 = (HANDLE) _beginthread(contorno,0,&dati4);
cout<<endl<<"..Join dei threads..."<<endl;
WaitForSingleObject(handle0,INFINITE);
WaitForSingleObject(handle1,INFINITE);
WaitForSingleObject(handle2,INFINITE);
WaitForSingleObject(handle3,INFINITE);
WaitForSingleObject(handle4,INFINITE);
QueryPerformanceCounter (&count2);
CloseHandle(handle0);
CloseHandle(handle1);
CloseHandle(handle2);
CloseHandle(handle3);
CloseHandle(handle4);
elapsed = (count2.QuadPart - count1.QuadPart) * 1000.0 / freq.QuadPart;
cout <<endl<<"Tempo di esecuzione approssimativo: " <<elapsed<<" ms."<<endl;
system("PAUSE");
return 0;
}
The concurrency don't work: This program is multiple producer/multiple consumer, because every threads is both a producer and consumer (consumer for a queue, producer for another queue)...in my opinion the problem is not in the queue code, but in the while cicle of the threads...but I don't know how fix this problem...introducing another check? How?
Please help me...I'm desperate...