Hello,
I have written C# code that calls EmguCV methods to extract feature points from images.
To speed the program up, I made it multithreaded.
I use a constant named NUMBER_OF_CONCURRENT_THREADS to hold the number of worker threads that may run simultaneously. For example, on a system whose CPU has 4 cores, I set it to 3 (one main controller thread plus 3 workers). I also use a semaphore to synchronize these threads.

My problem is a deadlock that occurs non-deterministically when I run the program.

My code is as follows:

C#
const int NUMBER_OF_CONCURRENT_THREADS = 3;//other than main thread

Semaphore findDesctriptorSemaphore = new Semaphore(0, 10000);//empty at first

public void FindDesctriptors(int imageCount/*1-based images*/)
{
   if (imageCount <= 0)
      return;


   ParameterizedThreadStart pts = new ParameterizedThreadStart(FindDesctriptor);

   int q = 0;
   for (; q < imageCount; q++)
   {
      if (q >= (NUMBER_OF_CONCURRENT_THREADS))
         findDesctriptorSemaphore.WaitOne();

      //Parallel computing
      Thread t = new Thread(pts);
      t.Name = "desc " + (q + 1).ToString();
      t.Priority = ThreadPriority.Highest;
      t.Start(q + 1);
   }


   if (imageCount > NUMBER_OF_CONCURRENT_THREADS)
      //wait for all thread to complete
      for (int i = 0; i < NUMBER_OF_CONCURRENT_THREADS; i++)
         findDesctriptorSemaphore.WaitOne();
   else
      //wait for all thread to complete
      for (int i = 0; i < q; i++)
         findDesctriptorSemaphore.WaitOne();
}


void FindDesctriptor(object param/*1-based*/)
{
   int id = (int)param;

   Image<Gray, Byte> img = new Image<Gray, Byte>(id.ToString() + ".jpg");

   SURFFeature[] features = Extract_SURF_Features(img)/*a function that only calls emgucv methods to extract feature points*/;

   descs[id - 1] = features;

   findDesctriptorSemaphore.Release();//once

}


Can anyone help me, please?

I would use two ResetEvents; it seems the semaphore is what is causing your problems. The thread switching is causing the deadlock, and there is nothing you can do about it unless you add some signals to the logic.

An easier approach than adding signals to fix this is to use two ResetEvents to provide the statefulness you require.
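
As a rough illustration of the two-event idea (a sketch under assumptions, not the code referred to in this answer), the example below uses an AutoResetEvent to announce that a worker slot has been freed and a ManualResetEvent to announce that every worker has finished. The names TwoEventSketch, Run and Process are hypothetical, and Process is only a stand-in for the real per-image work.

```csharp
using System;
using System.Threading;

static class TwoEventSketch
{
    // Hypothetical stand-in for the real per-image work.
    static void Process(int id, int[] results) { results[id - 1] = id * id; }

    public static int[] Run(int itemCount, int maxConcurrent)
    {
        if (maxConcurrent < 1) throw new ArgumentOutOfRangeException("maxConcurrent");
        var results = new int[Math.Max(itemCount, 0)];
        if (itemCount <= 0) return results;

        int running = 0;            // workers currently in flight
        int remaining = itemCount;  // workers that have not finished yet
        var slotFreed = new AutoResetEvent(false);  // set each time a worker finishes
        var allDone = new ManualResetEvent(false);  // set once by the last worker

        for (int id = 1; id <= itemCount; id++)
        {
            // Re-check the counter after every wake-up: an AutoResetEvent can
            // merge several Set calls into one signal, so the counter, not the
            // event, is the source of truth.
            while (Interlocked.CompareExchange(ref running, 0, 0) >= maxConcurrent)
                slotFreed.WaitOne();

            Interlocked.Increment(ref running);
            int captured = id; // copy so each thread sees its own value
            new Thread(() =>
            {
                try { Process(captured, results); }
                finally
                {
                    // Signal in finally so the controller is never left waiting.
                    Interlocked.Decrement(ref running);
                    slotFreed.Set();
                    if (Interlocked.Decrement(ref remaining) == 0) allDone.Set();
                }
            }).Start();
        }

        allDone.WaitOne(); // blocks until the last worker has signalled
        return results;
    }
}
```

Calling TwoEventSketch.Run(10, 3) would process ten items with at most three workers at a time. The events are left to the finalizer here to keep the sketch short.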

I have also successfully implemented code that does not use ResetEvents but instead uses the ThreadPool and locking, and that also seems to work...

Take my WebServer class for example.

Requests are received on a single ListenHandler:
ListenHandler.aspx[View Source]

The ListenHandler then brokers the requests either using the ThreadPool or the current thread intelligently.

The requests are brokered to the WebServer class:
WebServer.cs[View Source]

At that point an HttpContextAdapter class is used to make the various types of HttpRequests uniform across the implementation.
HttpContextAdapter.cs[View Source]

Once there is an adapter created a WebClient is created to represent the client and the request on the WebServer.

WebClient.cs[View Source]

Clients basically interact with the browser and the following JavaScript API is utilized to allow the actions to be sent to the server and thus back to other clients if required.

WebServerAPI.js[View Source]

WebMouse is one of my favorites because it allows multiple WebClients (users) to share mouse interactions on a single WebPage or even an application across the web using the WebServer.

WebMouse.cs[View Source]

I even have WebChats and WebShares, which hold WebObjects. The WebMouse class is derived from WebObject and is usually stored in a WebShare.

If you need any further assistance, please clarify your question and we will do our best to help!
 
 
Comments
Sandeep Mewara 28-Apr-12 11:53am    
My 5!
I had nothing to do so I wrote a quick example to help you utilize the AsyncResult pattern as well as locking.

C#
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Collections;

namespace CodeProjectHelpProject
{
    //An example program
    class Program
    {
        /// <summary>
        /// A small class derived from Exception to keep exception filtering manageable.
        /// </summary>
        class OperatingException : Exception
        {
            public OperatingException(string ex) : base(ex) { }
            public OperatingException(string ex, Exception innerEx) : base(ex, innerEx) { }
        }

        /// <summary>
        /// A small class to utilize during a program which will assist in multi threading
        /// </summary>
        class OperationManager
        {
            /// <summary>
            /// The history of operation results sorted by date and time
            /// </summary>
            public readonly SortedDictionary<DateTime, OperationResult> ResultHistory = new SortedDictionary<DateTime, OperationResult>();

            /// <summary>
            /// The currently executing operation
            /// </summary>
            public OperationResult CurrentOperation { get; private set; }

            /// <summary>
            /// Begins the process of executing a new operation on this instance of the OperationManager
            /// </summary>
            /// <param name="newOperation">The class which is passed for state observation and added to the ResultHistory upon any action</param>
            /// <param name="cancelExisting">A signal which indicates if there is a operation already executing to cancel the operation</param>
            /// <param name="resumePoint">A WaitCallback which may be used to give signal to external code</param>
            public void Begin(OperationResult newOperation, bool cancelExisting = false, WaitCallback resumePoint = null)
            {
                //If there is a CurrentOperation and there is no operation which is expected to be canceled indicate so by throwing an OperatingException.
                if (CurrentOperation != null && !cancelExisting) throw new OperatingException("The Operation Manager does not support multiple operations and there is currently an operation pending");
                else
                {
                    //Obtain a lock around this OperationManager
                    lock (this)
                    {
                        //Cancel the CurrentOperation if one is assigned and it is not completed.
                        //lock (null) throws ArgumentNullException, so check for null before locking.
                        OperationResult previous = CurrentOperation;
                        if (previous != null)
                        {
                            lock (previous)
                            {
                                if (!previous.Signal) previous.Cancel();
                            }
                        }

                        //Add the result to history
                        AddHistoryResult(CurrentOperation);

                        //Assign the newOperation to the CurrentOperation
                        CurrentOperation = newOperation;

                        //Create a scope to enclose the exposed logic
                        try
                        {
                            //Begin the CurrentOperation
                            CurrentOperation.Begin();
                        }
                        catch
                        {
                            //On exception, if the resumePoint is not null
                            if (resumePoint != null)
                            {
                                //Add the history result
                                AddHistoryResult(newOperation);

                                //Invoke it passing the CurrentOperation
                                resumePoint.Invoke(CurrentOperation);

                                //Set the CurrentOperation to nil.
                                CurrentOperation = null;
                            }
                        }
                    }
                }
            }

            private void AddHistoryResult(OperationResult what, DateTime? when = null)
            {
                //If there is no what to add return
                if (what == null) return;
                try
                {
                    //Default when to DateTime.UtcNow if no value was supplied
                    when = when ?? DateTime.UtcNow;
                    //Add the result to the history statefully
                    IDictionary securePointer = ResultHistory;
                    lock (securePointer.SyncRoot)
                    {
                        //Attempt to add what at when
                        ResultHistory.Add(when.Value, what);
                    }
                    //Remove the reference
                    securePointer = null;
                }
                catch
                {
                    //Catch and propagate the exception
                    throw;
                }
            }

        }

        /// <summary>
        /// A small class which is useful for storing operating results on.
        /// Currently it only contains the Signal Member which is used in this example to display completeness of the operation in question.
        /// </summary>
        sealed class OperationResult : IAsyncResult
        {
            /// <summary>
            /// The manager in which this result was created for
            /// </summary>
            private OperationManager manager;

            public OperationResult(OperationManager manager)
            {
                // TODO: Complete member initialization
                this.manager = manager;
            }
            public bool Signal { get; set; }

            /// <summary>
            /// Do something here if desired
            /// </summary>
            internal void Cancel()
            {
                lock (this)
                {
                    this.Signal = true;
                }
            }

            /// <summary>
            /// Called when the Operation Begins
            /// </summary>
            internal void Begin()
            {
                lock (this)
                {
                    if (this.Signal) return;
                    else
                    {
                        try
                        {
                            //Call another method or override in a derived class etc.
                            new Thread(() =>
                            {
                                //Do work in here and add to this object statefully
                                this.Signal = true;
                                return;
                            }).Start();
                        }
                        catch(Exception ex)
                        {
                            //Pass to the propagation mechanism
                            PropagateException(ex);
                        }
                    }
                }
            }

            /// <summary>
            /// Does the needful thing
            /// </summary>
            /// <param name="ex">The exception</param>
            private void PropagateException(Exception ex)
            {
                throw new OperatingException("An exception occurred", ex);
            }

            public object AsyncState
            {
                get { throw new NotImplementedException(); }
            }

            public WaitHandle AsyncWaitHandle
            {
                get { throw new NotImplementedException(); }
            }

            public bool CompletedSynchronously
            {
                get { throw new NotImplementedException(); }
            }

            public bool IsCompleted
            {
                //Signal is set to true when the operation finishes or is canceled
                get { return Signal; }
            }
        }

        /// <summary>
        /// Logic for cancelation will occur in this function.
        /// This function may be called from inside any function and the invocation of this function should indicate the current action is being canceled.
        /// </summary>
        /// <param name="parameter"></param>
        static void OnCancel(object parameter)
        {
            //Whatever you need
        }

        static void AfterProcessing(IAsyncResult result)
        {
            //Perform any post-processing checks here
        }

        static void Main(string[] args)
        {
            //Create an OperationManager for this static execution
            OperationManager manager = new OperationManager();

            //We create a callback which can be invoked from inside the logic, the logic points to the OnCancel Method.
            WaitCallback resumePoint = new WaitCallback(OnCancel);
            
            //We create a result to indicate when the operation has completed
            var result = new OperationResult(manager);

            //Begin the operation
            manager.Begin(result, false, resumePoint);

            //Save the old thread priority
            ThreadPriority oldPriority = Thread.CurrentThread.Priority; 

            //While there is no signal (Signal becomes true once the operation has completed)
            while (!result.Signal)
            {
                //Reduce priority
                Thread.CurrentThread.Priority = ThreadPriority.Lowest;
                //Sleep briefly rather than spinning for a count derived from a truncated tick value
                Thread.Sleep(10);
            }

            //Set the oldPriority to the currentThread
            Thread.CurrentThread.Priority = oldPriority;

            //Do work after processing
            AfterProcessing(result);

            //An example of this could work without the OperationManager
            //resumePoint.BeginInvoke(resumePoint, new AsyncCallback(AfterProcessing), result);

            //It would utilize the ThreadPool
            //ThreadPool.QueueUserWorkItem(resumePoint);

        }
    }
}


Let me know how you like it!

It is not 100% complete but it should provide you a suitable base to construct a properly synchronized design pattern.

For instance, one thing I would do is use the ThreadPool rather than dedicated Threads, because creating threads carries an intrinsic memory overhead. However, your external code may require thread-local storage, and I can see how dedicated threads may be beneficial there, given how older code using fast calls is executed.
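
A minimal sketch of the ThreadPool variant, assuming .NET 4 or later for CountdownEvent: each work item is queued to the pool and the caller blocks until all of them have signalled. ThreadPoolSketch and SquareAll are hypothetical names, and squaring an index merely stands in for real work.

```csharp
using System;
using System.Threading;

static class ThreadPoolSketch
{
    public static int[] SquareAll(int count)
    {
        var results = new int[Math.Max(count, 0)];
        if (count <= 0) return results;

        // One signal is expected per queued work item.
        using (var pending = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                int index = i; // copy so each work item sees its own value
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { results[index] = index * index; } // placeholder work
                    finally { pending.Signal(); }           // always count down
                });
            }

            pending.Wait(); // returns once every work item has signalled
        }
        return results;
    }
}
```

The pool decides how many items run at once, so this form gives up explicit control over the degree of parallelism in exchange for not creating a thread per item.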

Another would be to move the resumePoint to the OperationResult and then invoke it from Begin or any other method when required. Right now the two are separate, which I have done on purpose because I want you to learn a thing or two as you work with the code.

There is a lot being shown in the example and if you have further questions you know exactly where to ask them!

Sincerely,
V//
 
Comments
Member 8299125 28-Apr-12 14:18pm    
Thank you for your attention!


But I want to know the cause of my problem.
I use the semaphore to store release signals, so I do not think this code structure is error-prone.

Any further answers are appreciated.

-----------------------------------------------
Note: Excuse me if my English writing is so bad. :(
The problem I can immediately see is that you are allowing multiple threads to invoke the same library. While I am not familiar with the library in question, I would say that when you release the semaphore, the unmanaged code may still be executing in other threads. If the library uses shared memory internally, this is possibly what is causing your problems.

Another thing I see is that when the semaphore is released, as previously stated, the other threads are still executing within their scope (managed and unmanaged code performing cleanup). If not watched carefully, this can cause memory leaks and, on top of that, can cause the runtime to mangle objects that are being marshaled across application boundaries, due to destructor invocation.
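
One way to rule the concern above in or out, sketched here under assumptions rather than offered as a diagnosis, is to release the semaphore in a finally block and to wait on the threads themselves with Thread.Join, so the controller only continues after each worker has fully exited. BoundedWorkers, Run and ExtractFeatures are hypothetical names, and ExtractFeatures is only a stand-in for the EmguCV call in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class BoundedWorkers
{
    // Hypothetical stand-in for the feature extraction in the question.
    static int ExtractFeatures(int id) { return id * 10; }

    public static int[] Run(int imageCount, int maxConcurrent)
    {
        if (maxConcurrent < 1) throw new ArgumentOutOfRangeException("maxConcurrent");
        var descs = new int[Math.Max(imageCount, 0)];
        var workers = new List<Thread>();

        // The semaphore starts full: each WaitOne takes a slot, each Release returns one.
        using (var slots = new Semaphore(maxConcurrent, maxConcurrent))
        {
            for (int id = 1; id <= imageCount; id++)
            {
                slots.WaitOne(); // blocks while maxConcurrent workers hold slots
                int captured = id;
                var worker = new Thread(() =>
                {
                    try { descs[captured - 1] = ExtractFeatures(captured); }
                    finally { slots.Release(); } // the slot is returned even on failure
                });
                worker.Name = "desc " + captured;
                workers.Add(worker);
                worker.Start();
            }

            // Join waits for each thread to exit, not merely for its Release call.
            foreach (Thread started in workers) started.Join();
        }
        return descs;
    }
}
```

With this shape the number of WaitOne and Release calls cannot drift apart, which removes one possible source of a controller that waits forever.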

I would use a class and pass the instance to the worker. Because classes are reference types, the legacy code can make changes to the object much as it would in native execution; "ref" is only needed if the callee must replace the instance itself.

I would implement IDisposable and I would utilize IAsyncResult as given in the example.

Without you giving us more information relevant to the errors you are receiving and possibly giving us more code to run and see the results all I can do at this time is vaguely speculate.

One immediate solution I can propose, which would definitely help you, is to break the file down into several more manageable chunks.

Use one thread per chunk, and collect the leftover data from each chunk into a new chunk that can be run before post-processing.

E.g.

Create Byte[] From File.

Split Byte[] into chunks.

Let Remainder = new List<byte[]>();

Setup storage(e.g. Dictionary or List)

Invoke library.

Check Remainder slots for extra Byte[] data left over from each fragment, sort and run process again.

Finally check storage for results.

Users can then actually peek into storage during operation and not interfere with the search operation.
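
The splitting step in the outline above could look roughly like the following. This is only a sketch: ChunkSketch and Split are hypothetical names, and whether the image data can be processed meaningfully in independent chunks depends on the library, which is not established here.

```csharp
using System;
using System.Collections.Generic;

static class ChunkSketch
{
    // Splits data into consecutive chunks of at most chunkSize bytes.
    // The final chunk holds whatever is left over and may be shorter.
    public static List<byte[]> Split(byte[] data, int chunkSize)
    {
        if (data == null) throw new ArgumentNullException("data");
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");

        var chunks = new List<byte[]>();
        for (int offset = 0; offset < data.Length; offset += chunkSize)
        {
            int length = Math.Min(chunkSize, data.Length - offset);
            var chunk = new byte[length];
            Buffer.BlockCopy(data, offset, chunk, 0, length);
            chunks.Add(chunk);
        }
        return chunks;
    }
}
```

Each returned chunk is a separate copy, so a worker can process one chunk without touching the memory another worker is reading.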

v//
 

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


