Click here to Skip to main content
15,885,244 members
Please Sign up or sign in to vote.
0.00/5 (No votes)
Hi, I have wrote this code, for processing images with threads and concurrent queues...I have create a thread-safe queue template, with mutex for manage reading/writing of queue.

Now, I have introduced CRITICAL SECTION and CONDITION VARIABLES for signaling work...but the situation is that all thread (except the first for clear reason) doing only 1 operation.

This is the code:

C++
#include <opencv\cv.h>
#include <opencv\highgui.h>
#include <opencv2\highgui\highgui.hpp>
#include <stdio.h>
#include <stdlib.h>
#include <windows.h>
#include <process.h>
#include <queue>

using namespace std;
using namespace cv;

/*thread safe queue*/

template<typename T>
class coda_concorr
{
private:
	std::queue<T> la_coda;
	HANDLE mutex;
	
public:	
	bool elemento;
	coda_concorr()
	{
		mutex = CreateMutex(NULL,FALSE,NULL);
		}
	~coda_concorr()
	{}
	void push(T& data)
	{
		WaitForSingleObject(mutex,INFINITE);
		la_coda.push(data);
		ReleaseMutex(mutex);
		}
	bool vuota() const
	{
		WaitForSingleObject(mutex,INFINITE);
		bool RetCode = la_coda.empty();
		ReleaseMutex(mutex);
		return RetCode;
	}
	bool try_pop(T& popped)
    {
		WaitForSingleObject(mutex,INFINITE);
		while (la_coda.empty()){
            ReleaseMutex(mutex);
			return false;
        }
		WaitForSingleObject(mutex,INFINITE);
		popped = la_coda.front();
        la_coda.pop();
        ReleaseMutex(mutex);
		return true;
    }
};


struct Args
{
	coda_concorr<cv::Mat> in;
	coda_concorr<cv::Mat> *out; //puntatore a coda successiva
};


CONDITION_VARIABLE NonVuoto1;
CONDITION_VARIABLE NonVuoto2;
CONDITION_VARIABLE NonVuoto3;
CONDITION_VARIABLE NonVuoto4;
CRITICAL_SECTION  Lock1;
CRITICAL_SECTION  Lock2;
CRITICAL_SECTION  Lock3;
CRITICAL_SECTION  Lock4;

bool stop;

//initial populating queue
void puts (void* param){
	Args* arg = (Args*)param;
	int i=0;
    Mat image;
	
	while(!arg->in.vuota()){
		arg->in.try_pop(image);
		arg->out->push(image);
		i++;		
		WakeConditionVariable(&NonVuoto1);
		}
	//fine	
	cout<<endl<<"Thread (PUSH) terminato con "<<i<<" elaborazioni."<<endl;
	WakeConditionVariable(&NonVuoto1);
	_endthread();
}

//grey funct
void grey (void *param){
	Mat temp1,temp2;
	int add = 0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock1);
		//se vuoto
		while(arg->in.vuota() && !stop){
		     SleepConditionVariableCS(&NonVuoto1,&Lock1,INFINITE);
			}
			if(stop==true){
			LeaveCriticalSection(&Lock1);
			break;
			}
		arg->in.try_pop(temp1);
		cvtColor(temp1,temp2,CV_BGR2GRAY);
		arg->out->push(temp2);
		add++;
		cout<<endl<<"grey ha fatto: "<<add<<endl;
		LeaveCriticalSection(&Lock1);
		WakeConditionVariable(&NonVuoto2);
		}
	//fine	
	cout<<endl<<"Thread (GREY) terminato con "<<add<<" elaborazioni."<<endl;
	_endthread();
}

//threshold funct
void soglia(void *param){
	Mat temp1a,temp2a;
	int add=0;
	Args* arg = (Args*)param;
	while(true){
		EnterCriticalSection(&Lock2);
		while(arg->in.vuota() && stop == false){
			 SleepConditionVariableCS(&NonVuoto2,&Lock2,INFINITE);
			}
		if(stop==true){
			LeaveCriticalSection(&Lock2);
			break;
			}
		arg->in.try_pop(temp1a);
		threshold(temp1a,temp2a,128,255,THRESH_BINARY);
		arg->out->push(temp2a);
		add++;
		LeaveCriticalSection(&Lock2);
		WakeConditionVariable(&NonVuoto3);
		cout<<endl<<"soglia ha fatto: "<<add<<endl;
		}
		//fine 
	 cout<<endl<<"Thread (SOGLIA) terminato con "<<add<<" elaborazioni."<<endl;
	 _endthread();
}

//erode/dilate funct
void finitura(void *param){
	Mat temp1b,temp2b,temp2c;
	int add = 0;
	Args* arg = (Args*)param;
	//come consumatore
	while(true){
		EnterCriticalSection(&Lock3);
		while(arg->in.vuota() && stop == false){
			 SleepConditionVariableCS(&NonVuoto3,&Lock3,INFINITE);
			}
		if(stop==TRUE){
			LeaveCriticalSection(&Lock3);
			break;
			}	
		arg->in.try_pop(temp1b);
		erode(temp1b,temp2b,cv::Mat());
		dilate(temp2b,temp2c,Mat());
		arg->out->push(temp2c);
		add++;
		LeaveCriticalSection(&Lock3);
		WakeConditionVariable(&NonVuoto4);
		cout<<endl<<"erode ha fatto: "<<add<<endl;
		}
	 //fine	
	 cout<<endl<<"Thread (ERODE) terminato con "<<add<<" elaborazioni."<<endl;
	_endthread();
}

//contour funct
void contorno (void *param){
	Mat temp;
	int add=0;
	Args* arg = (Args*)param;
	//come consumatore
	while(true){
		EnterCriticalSection(&Lock4);
		while(arg->in.vuota() && stop == false){
			 SleepConditionVariableCS(&NonVuoto4,&Lock4,INFINITE);
			}
		if(stop==TRUE){
			LeaveCriticalSection(&Lock4);
			break;
			}	
	//esegue pop
	arg->in.try_pop(temp);
	//trova i contorni
	vector<vector<Point>> contorni;
	findContours(temp,contorni,CV_RETR_LIST, CV_CHAIN_APPROX_SIMPLE);
	//disegna i contoni in un'immagine
	Mat dst(temp.size(), CV_8UC3, Scalar(0,0,0));
	Scalar colors[3];
	colors[0] = Scalar(255,0,0);
	colors[1] = Scalar(0,255,0);
	colors[2] = Scalar(0,0,255);
	for (size_t idx = 0; idx < contorni.size(); idx++){
		drawContours(dst,contorni,idx,colors[idx %3]);
		}

	//come produttore
	arg->out->push(dst);
	add++;
	cout<<endl<<"cont ha fatto: "<<add<<endl;
	LeaveCriticalSection(&Lock4);
	}
    cout<<endl<<"Thread (CONTOUR) terminato con "<<add<<" elaborazioni."<<endl;
   _endthread();
}

//main
int main()
{
	
	coda_concorr<cv::Mat> ingresso;
	coda_concorr<cv::Mat> uscita;

	InitializeConditionVariable(&NonVuoto1);
	InitializeConditionVariable(&NonVuoto2);
	InitializeConditionVariable(&NonVuoto3);
	InitializeConditionVariable(&NonVuoto4);
	InitializeCriticalSection(&Lock1);
	InitializeCriticalSection(&Lock2);
	InitializeCriticalSection(&Lock3);
	InitializeCriticalSection(&Lock4);
	

	LARGE_INTEGER count1, count2, freq;
	double elapsed;
	

	Mat temp[10];
	Mat out;
	
	//dichiarazione code
	Args dati0,dati1,dati2,dati3,dati4;
	
	
	//avvio contatori
	QueryPerformanceFrequency(&freq);	
	QueryPerformanceCounter (&count1);
		
	for(int i=0;i<10;i++){
		temp[i] = imread("C:/OPENCV/Test/imgtest/bird1.jpg",1);
		ingresso.push(temp[i]);
	}

	//next queue pointer
	dati0.in=ingresso;
	dati0.out=&dati1.in;
	dati1.out=&dati2.in;
	dati2.out=&dati3.in;
	dati3.out=&dati4.in;
	dati4.out=&uscita;

	

	//handle
	HANDLE handle0,handle1,handle2,handle3,handle4;

	//start threads
	handle0 = (HANDLE) _beginthread(puts,0,&dati0);
	handle1 = (HANDLE) _beginthread(grey,0,&dati1);
	handle2 = (HANDLE) _beginthread(soglia,0,&dati2);
	handle3 = (HANDLE) _beginthread(finitura,0,&dati3);
	handle4 = (HANDLE) _beginthread(contorno,0,&dati4);

	cout<<endl<<"..Join dei threads..."<<endl;

	//join
	WaitForSingleObject(handle0,INFINITE);
	WaitForSingleObject(handle1,INFINITE);
	WaitForSingleObject(handle2,INFINITE);
	WaitForSingleObject(handle3,INFINITE);
	WaitForSingleObject(handle4,INFINITE);



	//chiusura contatori
	QueryPerformanceCounter (&count2);

	CloseHandle(handle0);
	CloseHandle(handle1);
	CloseHandle(handle2);
	CloseHandle(handle3);
	CloseHandle(handle4);

	elapsed = (count2.QuadPart - count1.QuadPart) * 1000.0 / freq.QuadPart;
	
	
	cout <<endl<<"Tempo di esecuzione approssimativo: " <<elapsed<<" ms."<<endl;
    	system("PAUSE");
	return 0;
        }



The concurrency don't work: This program is multiple producer/multiple consumer, because every threads is both a producer and consumer (consumer for a queue, producer for another queue)...in my opinion the problem is not in the queue code, but in the while cicle of the threads...but I don't know how fix this problem...introducing another check? How?

Please help me...I'm desperate...
Posted
Updated 3-Jul-13 6:56am
v6
Comments
Sergey Alexandrovich Kryukov 18-Jun-13 12:09pm    
I did not scan all your code. Instead of mutex, use lightweight Critical Section.

You did not provide enough information on your real problem though? What did you expect, exactly? what's "different"?...

—SA
Domus1919 18-Jun-13 16:43pm    
As I wrote at the end of the post, if I launch multiple times the program, sometimes every thread do all works (10 elaborations) and sometimes don't (first thread do all, second and the rest don't)...in my opinion it's a problem releated to the "while" cicle in the threads, but how can I fix it???
joshrduncan2012 18-Jun-13 16:23pm    
Please reply to SA's comment instead of creating a new thread, otherwise he won't see it. Click on "reply" next to his name instead of "Have a Question or Comment?" down below.

Thanks!
Domus1919 19-Jun-13 10:16am    
Nobody can help me??

1 solution

I'm having a hard time both understanding your code as well as the problem description. I think you are unhappy that you have a multithreaded app and many times when you run it, all the work is being done by only one thread.

Keep in mind that thread creation is a resource intensive activity, and if the work to be done is fairly brief, all of the work can be completed by one of the threads before the remaining threads are created. In other words, all of the test code in your program can be effectively completed following the first beginthread() call above.

To benchmark this kind of activity, you should:
- create all threads first (before starting the work in any of them)
- make the activity/activities longer in duration
- run the loop(s) multiple times and discard (or discount the value of) the first several runs.
 
Share this answer
 
Comments
Domus1919 20-Jun-13 3:30am    
You're right about the "simple" code, but this is just the little edition of a program that elaborates a video streaming in this mode. Now, I'll understand the rules about threads, but I discover that this problem it's related to "while" cycles in the threads...the problem born there. I need an help about how can I do that situation: every thread have to check queue and pop if it's possibile to processing image, or in case queue is empty, wait and re-check..in the real situation I don't know the exact number of images (because it's a streaming) and then I need to manage that "check" cycle to obtain the situation of an assembly line, where every thread it's consumer for a queue, producer for other queue...You can help me? Show me how?
H.Brydon 20-Jun-13 23:15pm    
There are a lot of possibilities but one thing that is glaring to me is that you have consecutive beginthread() calls in the main control code. It is very reasonable on a 1 or 2 processor machine for example for the first beginthread() call to kick off a thread, which then instantly becomes computable and does all the work before the second beginthread() call is processed. Remember that beginthread() is very resource intensive and a lot of work is done before the API returns.
Domus1919 21-Jun-13 3:13am    
And how can resolve? Not consecutive beginthread()? Sorry for my insistence, but I can not understand how to do it and then comes the desperation ..thank you for the time you're devoting to help me ... and I hope to solve ... can you tell me how to set the thing in your opinion?
H.Brydon 21-Jun-13 18:15pm    
Rewrite the thread logic so that all threads start, with nothing to do. After the last thread is created, make the "task list" available.
Domus1919 3-Jul-13 12:34pm    
How make task list available?

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

  Print Answers RSS
Top Experts
Last 24hrsThis month


CodeProject, 20 Bay Street, 11th Floor Toronto, Ontario, Canada M5J 2N8 +1 (416) 849-8900