Click here to Skip to main content
15,886,806 members
Please Sign up or sign in to vote.
3.22/5 (2 votes)
Hi,
I have a problem. I have to create a program that manage 4 queues, 3 threads, like in the diagram that I've linked.

THIS IS A DIAGRAM OF WHAT I HAVE TO DO:

http://img839.imageshack.us/img839/3196/schemah.jpg[^]

I have started to work on, but concurrency with "shared" queue works only with 2 threads and three don't.

I post here my code, wait for your response.Watch my code, give me an idea on what's wrong.

I HAVE UPDATE MY CODE NOW..Please help me. Thanks in advance.

C++
#include <opencv\cv.h>
#include <opencv\highgui.h>
#include <stdio.h>
#include <windows.h>
#include <process.h>
#include <queue>
#include <time.h>

using namespace std;
using namespace cv;


/*inizio della parte relativa alla coda thread safe*/

template<typename T>
class coda_concorr
{
private:
	std::queue<T> la_coda;
	HANDLE mutex;
public:
	bool complete;
	coda_concorr()
	{
		mutex = CreateMutex(NULL,FALSE,NULL);
		complete = false;
	}
	void push(T& data)
	{
		WaitForSingleObject(mutex,INFINITE);
		la_coda.push(data);
		ReleaseMutex(mutex);
	}
	bool vuota() const
	{
		bool RetCode;
		WaitForSingleObject(mutex,INFINITE);
		RetCode= la_coda.empty();
		ReleaseMutex(mutex);
		return RetCode;
	}

	bool try_pop(T& popped)
    {
        WaitForSingleObject(mutex,INFINITE);
        while (la_coda.empty())
        {
            if (complete)
            {
                ReleaseMutex(mutex);
                return false;
            }
            else
            {
                ReleaseMutex(mutex);
                Sleep(100); // FUTURE IMPROVEMENT
                WaitForSingleObject(mutex,INFINITE);
            }
        }
 
        popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
        return true;
    }
};

//pacchetto per passaggio ai thread
struct Args
{
	coda_concorr<cv::Mat> in;
	coda_concorr<cv::Mat> *out; //puntatore a coda successiva
};



After declare the 3 functions that I've to launch:

C++
//funzione grey
void grey (void *param){
	Mat temp1,temp2;
	Args* arg = (Args*)param;
	if(!arg->in.vuota()){
	while(arg->in.try_pop(temp1)){
	cvtColor(temp1,temp2,CV_BGR2GRAY);
	arg->out->push(temp2);
	   }
	}
	//flag completato
	arg->out->complete=true;
	_endthread();
	
}

//funzione sogliatura
void soglia(void *param){
	Mat temp1a,temp2a;
	Args* arg = (Args*)param;
		while(arg->in.try_pop(temp1a)){
		threshold(temp1a,temp2a,128,255,THRESH_BINARY);
		arg->out->push(temp2a);
		}
		//flag completato
	 arg->out->complete=true;
	_endthread();
}


//funzione erosione/dilate
void finitura(void *param){
	Mat temp1b,temp2b,temp2c;
	Args* arg = (Args*)param;
		while(arg->in.try_pop(temp1b)){
		erode(temp1b,temp2b,cv::Mat());
		//dilate(temp2b,temp2c,Mat());
		arg->out->push(temp2b);
		}
		//flag completato
	 arg->out->complete=true;
	_endthread();
}



then this is main:

int main()
{
	//dichiarazione delle due code principali
	coda_concorr<cv::Mat> ingresso;
	coda_concorr<cv::Mat> uscita;

	//clock
	clock_t avvio;
	clock_t fine;

	
	//array locali di ingresso e uscita
	Mat inn[10];
	Mat out;

	//assegnazione 10 immagini di prova
	inn[0]=imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
	inn[1]=imread("C:/OPENCV/Test/imgtest/bird2.jpg",1);
	inn[2]=imread("C:/OPENCV/Test/imgtest/bird3.jpg",1);
	inn[3]=imread("C:/OPENCV/Test/imgtest/pig1.jpg",1);
	inn[4]=imread("C:/OPENCV/Test/imgtest/pig2.jpg",1);
	inn[5]=imread("C:/OPENCV/Test/imgtest/pig3.jpg",1);
	inn[6]=imread("C:/OPENCV/Test/imgtest/spider1.jpg",1);
	inn[7]=imread("C:/OPENCV/Test/imgtest/spider2.jpg",1);
	inn[8]=imread("C:/OPENCV/Test/imgtest/spider3.jpg",1);
	inn[9]=imread("C:/OPENCV/Test/imgtest/Nutella.jpg",1);

	
	//dichiarazione code
	Args dati,dati2,dati3,dati4;

	//inizio a contare tempo
	avvio = clock();

	//Popolamento coda ingresso
	for(int i=0;i<=9;i++){
		dati.in.push(inn[i]);
	}
	//punta all'output successivo
	dati.out=&dati2.in;
	dati2.out=&dati3.in;
	dati3.out=&uscita;
	
	//dichiarazione delle maniglie
	HANDLE handle1,handle2,handle3;

	//dichiarazione dei threads
	handle1 = (HANDLE) _beginthread(grey,0,&dati);
	handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
	handle3 = (HANDLE) _beginthread(finitura,0,&dati3);

	//join
	WaitForSingleObject(handle1,INFINITE);
	WaitForSingleObject(handle2,INFINITE);
	WaitForSingleObject(handle3,INFINITE);
	
	//fine registrazione tempo
	fine = clock()-avvio;

	double tempoimp = fine / (double)CLOCKS_PER_SEC;

//output di verifica
	while (dati3.out->try_pop(out)){
		imshow("immagine",out);
		waitKey(100);
	}
	
	cout<<endl<<"Cicli di clock: "<<fine<<endl;
	cout<<"Secondi trascorsi: "<<tempoimp<<endl;

	system("PAUSE");
	return 0;

}


I don't have any output...Can someone help me?

Enviroment is: Visual C++ 2010, OpenCV 2.4.3 on WIndows 7 64bit.
Posted
Updated 6-Jun-13 21:08pm
v12
Comments
Steve44 5-Jun-13 15:25pm    
What is exactly happening in the second thread? Is it immediately terminating and not doing any work? Or is it doing work, but the results are incorrect?
Domus1919 5-Jun-13 17:11pm    
Not execute...program blocked.Watch diagram in the link that I've just posted...that it's what I have to do.
Sergey Alexandrovich Kryukov 5-Jun-13 15:36pm    
This almost all the code dump. You really need to provide detail explanation, starting with the explanation of your goals.
—SA
Domus1919 5-Jun-13 17:11pm    
Wow, nothing less is "dump" ... if I could put an outline of what I should do, I placed it. What did not you understand? I have posted a link with diagram.
Sergey Alexandrovich Kryukov 5-Jun-13 17:28pm    
Thank you for clarification. However, I don't want anything specifically, I only want to help you, if possible. This is you who wants some help, if I understand your post right.
—SA

I think I see that the blocking is caused by some bugs within your queue code:

C++
//queue code
template<typename T>
class coda_concorr
{
private:
    ...
    // incorrect code:
    bool vuota() const
    {
        WaitForSingleObject(mutex,INFINITE);
        return la_coda.empty();
        ReleaseMutex(mutex);  // The mutex is never released
                              // as the function return in the previous line and
                              // ReleaseMutex is never called.
    }
    // fixed code:
    bool vuota() const
    {
        bool RetCode;
        WaitForSingleObject(mutex,INFINITE);
        RetCode = la_coda.empty();
        ReleaseMutex(mutex);
        return RetCode;
    }

 
    // incorrect code:
    bool try_pop(T& popped)
    {
        WaitForSingleObject(mutex,INFINITE);
        if (la_coda.empty()){
            return false; // same here, the mutex is not released
        }
        WaitForSingleObject(mutex,INFINITE); // This takes the mutex a second time
                                             // It would require a second ReleaseMutex()
                                             // for proper operation
        popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
        return true;
    }
    // fixed code:
    bool try_pop(T& popped)
    {
        WaitForSingleObject(mutex,INFINITE);
        if (la_coda.empty()){
            ReleaseMutex(mutex);
            return false;
        }
        popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
        return true;
    }
};


<V2> Another bug:
Here the incorrect code with explanation, see underlined lines:
C++
int main()
{
	coda_concorr<cv::Mat> ingresso;
	coda_concorr<cv::Mat> coda1;
	coda_concorr<cv::Mat> coda2;
	coda_concorr<cv::Mat> uscita;
 
	
//assing second queue	
dati.out=coda1; // This creates a copy of coda1	
...
	//share part that don't WORK
	dati2.in=coda1; // This creates a second copy of coda1,
                         // it does not reference the same queue
	dati2.out=coda2;
...
 
}

So both threads are seeing different copies of coda1 while you would like to have them see the same instance, coda1.
Here a proposal to fix the issue, make the out queue a pointer so you can link Args structures:
C++
struct Args
{
	coda_concorr<cv::Mat> in;
	coda_concorr<cv::Mat> *out; // this is a pointer now, make it point to the
                                     // next queue in sequence
};


void grey (void *param){
...
            arg->out->push(temp2);
        }
        arg->out->complete=true;
...
}
//threshold funct
void soglia(void *param){
...
            arg->out->push(temp2a);
        }
    }
    arg->out->complete=true;
    _endthread();
}


int main()
{
...
	coda_concorr<cv::Mat> coda1; // Not needed
...
//assing second queue	
dati.out=&dati2.in; // This stores a pointer to the next queue	
...

	dati2.in=&coda1; // Not needed as the queues are linked, see above
	dati2.out=&coda2;
...
	//output
	while (dati2.out->try_pop(out)){
		imshow("immagine",out);
		waitKey(100);
	}
...
}


Also there is a startup issue: When the second thread starts and its input queue is empty, it immediately exits. The same happens when the input queue becomes empty during execution (i.e. the second thread executes faster than the first one).
You have to change the design of your queuing in way that you can distinguish a "queue empty due to all work done" from a "queue empty due to previous step not yet finished".
One option would be to have a queue state added that is set when the producer is done providing data, so your thread only terminates when the queue is empty and the producer has indicated that it will not provide further data, otherwise the thread waits until the queue becomes not-empty.

<V3> Add the waiting in the queue try_pop method, this should get rid of the immediate termination at startup issue:
C++
bool try_pop(T& popped)
{
    WaitForSingleObject(mutex,INFINITE);
    while (la_coda.empty())
    {
        if (complete)
        {
            ReleaseMutex(mutex);
            return false;
        }
        else
        {
            ReleaseMutex(mutex);
            Sleep(100); // FUTURE IMPROVEMENT: This can be improved by creating
                        // an event signaling that the queue is not empty.
                        // I'll keep it like this for simplicity for now.
                        // I try to avoid mixing functionality re-work with
                        // performance improvements.
            WaitForSingleObject(mutex,INFINITE);
        }
    }

    popped = la_coda.front();
    la_coda.pop();
    ReleaseMutex(mutex);
    return true;
}
 
Share this answer
 
v4
Comments
Domus1919 6-Jun-13 5:57am    
Wonderful explanation ... what do you suggest I use to have that indicator of empty queue waiting for? Because I HAVE UPDATED THE POSTED CODE in this page. The Second thread always see input queue empty. See it, and let me know how can I manage this situation...what's wrong in your opinion?
Steve44 6-Jun-13 15:17pm    
I see, there is another bug this time in the main code, I'll add this fix to the solution.
Domus1919 6-Jun-13 15:25pm    
Wait for your proposal...thank you very much for your time...I'm hopeless, because it's the first time that I work with threads...

Wait for your modify..thanks! :)
Steve44 6-Jun-13 15:45pm    
You should see it now.
Domus1919 6-Jun-13 16:42pm    
Sorry if I take advantage of you, but I have re-update code, I need to introduce a third thread, and with your system (that works for 2) programs blocked.

Can you verify my code, and please tell me where is now the problem?? Thank you very much...for all and for your kindness...
After more try, this is the solution:

/*programma di prova gestione thread e OpenCV con timer di evalutazione tempo 

impiegato v.1.0 */

#include <opencv\cv.h>
#include <opencv\highgui.h>
#include <stdio.h>
#include <windows.h>
#include <process.h>
#include <queue>
#include <time.h>

using namespace std;
using namespace cv;


/*inizio della parte relativa alla coda thread safe*/

template<typename T>
class coda_concorr
{
private:
	std::queue<T> la_coda;
	HANDLE mutex;
public:
	bool complete;
	coda_concorr()
	{
		mutex = CreateMutex(NULL,FALSE,NULL);
		complete = false;
	}
	void push(T& data)
	{
		WaitForSingleObject(mutex,INFINITE);
		la_coda.push(data);
		ReleaseMutex(mutex);
	}
	bool vuota() const
	{
		bool RetCode;
		WaitForSingleObject(mutex,INFINITE);
		RetCode= la_coda.empty();
		ReleaseMutex(mutex);
		return RetCode;
	}
	bool try_pop(T& popped)
    {
        WaitForSingleObject(mutex,INFINITE);
        while (la_coda.empty()){
            //ReleaseMutex(mutex);
            //Sleep(100);
			//WaitForSingleObject(mutex,INFINITE);
			return false;
        }
        popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
        return true;
    }
};

//pacchetto per passaggio ai thread
struct Args
{
	coda_concorr<cv::Mat> in;
	coda_concorr<cv::Mat> *out; //puntatore a coda successiva
};

//funzione grey
void grey (void *param){
	Mat temp1,temp2;
	int add = 0;
	Args* arg = (Args*)param;
	while(arg->in.try_pop(temp1)){
	cvtColor(temp1,temp2,CV_BGR2GRAY);
	arg->out->push(temp2);
	add++;
	   }
	//flag completato
	arg->out->complete=true;
	cout<<"Thread 1 terminato con "<<add<<" elaborazioni."<<endl;
	_endthread();
	
}

//funzione sogliatura
void soglia(void *param){
	Mat temp1a,temp2a;
	int add=0;
	Args* arg = (Args*)param;
	while(arg->in.vuota()){
	Sleep(50);
	}
	while(arg->in.try_pop(temp1a)){
		threshold(temp1a,temp2a,128,255,THRESH_BINARY);
		arg->out->push(temp2a);
		add++;
		}
		//flag completato
	 arg->out->complete=true;
	 cout<<"Thread 2 terminato con "<<add<<" elaborazioni."<<endl;
	_endthread();
}


//funzione erosione/dilate
void finitura(void *param){
	Mat temp1b,temp2b,temp2c;
	int add = 0;
	Args* arg = (Args*)param;
	while(arg->in.vuota()){
	Sleep(40);
	}
	while(arg->in.try_pop(temp1b)){
		erode(temp1b,temp2b,cv::Mat());
		dilate(temp2b,temp2c,Mat());
		arg->out->push(temp2c);
		add++;
		}
		//flag completato
	 arg->out->complete=true;
	 cout<<"Thread 3 terminato con "<<add<<" elaborazioni."<<endl;
	_endthread();
}



//main
int main()
{
	//dichiarazione delle due code principali
	coda_concorr<cv::Mat> ingresso;
	coda_concorr<cv::Mat> uscita;

	//clock
	clock_t avvio;
	clock_t fine;

	
	//array locali di ingresso e uscita
	Mat inn[10];
	Mat out;
	
	//inizio a contare tempo
	avvio = clock();

	//assegnazione 10 immagini di prova
	inn[0]=imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
	inn[1]=imread("C:/OPENCV/Test/imgtest/bird2.jpg",1);
	inn[2]=imread("C:/OPENCV/Test/imgtest/bird3.jpg",1);
	inn[3]=imread("C:/OPENCV/Test/imgtest/pig1.jpg",1);
	inn[4]=imread("C:/OPENCV/Test/imgtest/pig2.jpg",1);
	inn[5]=imread("C:/OPENCV/Test/imgtest/pig3.jpg",1);
	inn[6]=imread("C:/OPENCV/Test/imgtest/spider1.jpg",1);
	inn[7]=imread("C:/OPENCV/Test/imgtest/spider2.jpg",1);
	inn[8]=imread("C:/OPENCV/Test/imgtest/spider3.jpg",1);
	inn[9]=imread("C:/OPENCV/Test/imgtest/Nutella.jpg",1);

	
	//dichiarazione code
	Args dati,dati2,dati3;

	

	//Popolamento coda ingresso
	for(int i=0;i<=9;i++){
		dati.in.push(inn[i]);
	}
	//punta all'output successivo
	dati.out=&dati2.in;
	dati2.out=&dati3.in;
	dati3.out=&uscita;
	//dichiarazione delle maniglie
	HANDLE handle1,handle2,handle3;

	//dichiarazione dei threads
	handle1 = (HANDLE) _beginthread(grey,0,&dati);
	handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
	handle3 = (HANDLE) _beginthread(finitura,0,&dati3);

	//join
	WaitForSingleObject(handle1,INFINITE);
	WaitForSingleObject(handle2,INFINITE);
	WaitForSingleObject(handle3,INFINITE);
	
	
	//fine registrazione tempo
	fine = clock()-avvio;

	double tempoimp = fine / (double)CLOCKS_PER_SEC;

//output di verifica
	/*
	while(dati3.out->try_pop(out)){
		imshow("immagine",out);
		waitKey(100);
	}*/
	
	cout<<endl<<"Cicli di clock: "<<fine<<endl;
	cout<<"Secondi trascorsi: "<<tempoimp<<endl;
	system("PAUSE");
	
	
	return 0;

}


This manage 3 threads that works on shared elements, in 4 queues.
 
Share this answer
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)



CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900