Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

ConvergeWait

0.00/5 (No votes)
18 Jan 2010 1  
Wait for multiple threads to complete.

Introduction

This is the second article in my series. This time, I want to demonstrate something that I have found very useful. A thread-synchronization object that is very different than anything .NET provides.

Background

.NET offers all sorts of thread control objects. Semaphores, Mutexes, ReaderWriterLocks, CriticalSections, and other thread control objects have one thing in common: they limit the number of threads that can run at a single time. There is no object that allows you to wait for several threads to finish. About the closest you can get is to use Thread.Join like this:

private void Form1_Load(object sender, EventArgs e)
{
    List listOfThreads = new List();
    for (int i = 0; i < 10; i++)
    {
        Thread t = new Thread(ThreadFunc);
        t.Start();
        listOfThreads.Add(t);
    }
    foreach(Thread t in listOfThreads)
    {
        t.Join();
    }
    //when we get here ALL threads have finished.

}
private void ThreadFunc()
{
    Thread.Sleep(5000);
}

Of course, there are a few things to notice here:

  • This approach will simply not work when using ThreadPool threads since Thread.Join never returns from those threads.
  • It will try to join, even if the thread has exited.
  • It just looks messy.

Of course, you could use a ManualResetEvent, but you now have to keep track of how many threads are supposed to be running, decrement the value in a thread-safe way, and finally Set the event when the count reaches zero.

private const Int32 MaxThreads = 100;
private Int64 _runningThreads = MaxThreads;
private readonly ManualResetEvent _event = new ManualResetEvent(false);
private void Form1_Load(object sender, EventArgs e)
{
    for (int i = 0; i < MaxThreads; i++)
    {
        ThreadPool.QueueUserWorkItem(ThreadFunc, null);
    }
    _event.WaitOne();

}
private void ThreadFunc(object state)
{
    try
    {
        Thread.Sleep(5000);
    }
    finally
    {
        Int64 x = Interlocked.Decrement(ref _runningThreads);
        Debug.WriteLine(x + " threads still running");
        if (x == 0)
        {
            _event.Set();
        }
    }
}

The above code will indeed satisfy the requirements, but I prefer to make things more object oriented. Specifically, I want to:

  1. Encapsulate the values, to make sure the critical values can't be changed when it's not appropriate.
  2. Simplify the code required in the finally block, which means overriding the Set and Reset functions.
  3. Enforce everything I can.

To do this, my initial thought was to inherit the ManualResetEvent. However, it is a sealed class, so I decided to inherit the base class myself, which happens to be System.Threading.WaitHandle. This is the final code for what I call a ConvergeWait:

///
/// Class to wait syncronize a number of threads.
///
public class ConvergeWait : EventWaitHandle
{
    private Int32 _currentWaitingCount = 0;
    private Int32 _totalThreadsToWaitOn = 1;
    private bool _isReset = false;

    public ConvergeWait(Int32 numberOfThreads)
        : base(false, EventResetMode.ManualReset)
    {
        if (numberOfThreads < 1)
        {
            throw new ArgumentOutOfRangeException("numberOfThreads", 
                      "number of threads must be greater than 0");
        }
        _totalThreadsToWaitOn = numberOfThreads;
    }

    public bool IsReset
    {
        get { return _isReset; }
    }

    public bool Reset(Int32 numberOfThreads)
    {
        if (_isReset)
        {
            if (numberOfThreads != _totalThreadsToWaitOn)
            {
                throw new Exception("already in reset state, " + 
                          "you may not call 'Reset' again.");
            }
        }
        _isReset = true;
        _totalThreadsToWaitOn = numberOfThreads;
        Interlocked.Exchange(ref _currentWaitingCount, 0);
        return base.Reset();
    }

    public new bool Reset()
    {
        return Reset(_totalThreadsToWaitOn);
    }

    public new void Set()
    {
        if (!_isReset)
        {
            base.Set();
            _isReset = false;
            Interlocked.Exchange(ref _currentWaitingCount, _totalThreadsToWaitOn);
            return;
        }
        int i = Interlocked.Increment(ref _currentWaitingCount);
        Debug.WriteLine(string.Format("released thread count = {0}", i));
        if (i == _totalThreadsToWaitOn)
        {
            base.Set();
            _isReset = false;
        }
    }
}

Using the Code

This object is safer and cleaner to use. Since this example is so small, it doesn't really show it too well. Here is a working example.

private const Int32 MaxThreads = 100;
       
private void Form1_Load(object sender, EventArgs e)
{
    ConvergeWait convergeWait = new ConvergeWait(MaxThreads);
    
    for (int i = 0; i < MaxThreads; i++)
    {
        ThreadPool.QueueUserWorkItem(ThreadFunc, convergeWait);
    }
    convergeWait.WaitOne();

}
private void ThreadFunc(object state)
{
    ConvergeWait convergeWait = (ConvergeWait) state;
    try
    {
        Thread.Sleep(5000);
    }
    finally
    {
        convergeWait.Set();
    }
}

Points of Interest

One of the nice features about the ConvergeWait is that if the threaded function is somewhere other than the class where you are creating the threads, only the ConvergeWait has to be passed. If you tried to use the code from my first attempt, you would have to pass a lot more data, adding significant complexity.

History

None yet.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here