Click here to Skip to main content
15,896,359 members
Articles / Programming Languages / C#

A Controllable Notifying Thread Queue with Generics

Rate me:
Please Sign up or sign in to vote.
4.53/5 (10 votes)
3 Aug 2006CPOL8 min read 61.5K   602   44  
An alternative to the System.Threading.ThreadPool. A controllable queue to execute operations asynchronously.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace NotifyingThreadQueueDemo
{
    /// <summary>
    /// The lifecycle states reported by a <see cref="NotifyingThreadQueue{T}"/>.
    /// </summary>
    /// <remarks>
    /// Idle: nothing is queued and no operation is running.
    /// Running: operations are being started as items are enqueued.
    /// Pauseing: a pause was requested while operations were still running; no new
    /// operations are started and the state becomes Paused once the running ones finish.
    /// (The spelling is part of the public API and is kept as is.)
    /// Paused: enqueued items are only stored, not executed.
    /// Stopping: a stop was requested; pending items are rejected through the error event
    /// and the state returns to Idle once the running operations finish.
    /// </remarks>
    public enum QueueState { Idle, Running, Pauseing, Paused, Stopping };
    /// <summary>
    /// Handles a change of the queue state.
    /// </summary>
    /// <param name="qs">the state the queue has just entered</param>
    public delegate void QueueStateChangedHandler(QueueState qs);
    /// <summary>
    /// Handles the successful completion of an operation.
    /// </summary>
    /// <param name="finisheditem">the item whose operation returned without throwing</param>
    public delegate void ThreadFinishedHandler<T>(T finisheditem);
    /// <summary>
    /// Handles an item that failed or was never processed.
    /// </summary>
    /// <param name="unfinisheditem">the affected item; default(T) when the error is not tied to a specific item</param>
    /// <param name="ex">the exception describing what went wrong</param>
    public delegate void ThreadErrorHandler<T>(T unfinisheditem, Exception ex);
    /// <summary>
    /// The operation the queue executes on a worker thread for an enqueued item.
    /// </summary>
    /// <param name="item">the item to process</param>
    public delegate void QueueOperationHandler<T>(T item);

    public class NotifyingThreadQueue<T>
    {
        #region my events
        /// <summary>
        /// signals when the state of the queue changes. the handler receives the new state.
        /// </summary>
        /// <remarks>
        /// raised while the queue's internal lock is held, on whichever thread caused the
        /// transition (a caller of the public methods or a worker thread).
        /// </remarks>
        public event QueueStateChangedHandler QueueStateChanged;
        /// <summary>
        /// signals when an operation has finished processing its item without throwing
        /// </summary>
        /// <remarks>
        /// raised on the worker thread that executed the operation.
        /// </remarks>
        public event ThreadFinishedHandler<T> ThreadFinished;
        /// <summary>
        /// signals when the current operation had an error OR when an 
        /// enqueued item was removed from the queue prior to processing
        /// </summary>
        /// <remarks>
        /// also raised with default(T) as the item when a control operation is
        /// rejected because the queue is stopping.
        /// </remarks>
        public event ThreadErrorHandler<T> ThreadError;
        #endregion

        #region my vars
        // Lock guarding qs, currentthreads and queue. readonly so the lock
        // identity can never change after construction.
        private readonly object syncobj;
        // Upper bound on simultaneously running operations; fixed at construction.
        private readonly int maxthreads;
        // Number of worker threads that have been started and have not yet finished.
        private int currentthreads;
        // Current lifecycle state of the queue.
        private QueueState qs;
        // Items waiting for a worker, each paired with the operation to run on it (FIFO).
        private readonly Queue<KeyValuePair<T,QueueOperationHandler<T>>> queue;
        // Operation used by EnQueue(T) when the caller does not supply one.
        private readonly QueueOperationHandler<T> defaultop;
        #endregion

        #region constructors
        /// <summary>
        /// constructs the NotifyingThreadQueue with int.MaxValue as the maximum number of
        /// simultaneous operations. sets the state to QueueState.Idle
        /// </summary>
        /// <param name="defaultoperation">the default operation to perform on an enqueued item</param>
        /// <exception cref="System.ArgumentNullException">defaultoperation is null</exception>
        public NotifyingThreadQueue(QueueOperationHandler<T> defaultoperation)
            : this(int.MaxValue, defaultoperation)
        {
        }
        /// <summary>
        /// constructs the NotifyingThreadQueue. sets the state to QueueState.Idle
        /// </summary>
        /// <param name="maxthreads">sets the maximum number of simultaneous operations</param>
        /// <param name="defaultoperation">the default operation to perform on an enqueued item</param>
        /// <exception cref="System.ArgumentException">maxthreads is less than or equal to 0</exception>
        /// <exception cref="System.ArgumentNullException">defaultoperation is null</exception>
        public NotifyingThreadQueue(int maxthreads, QueueOperationHandler<T> defaultoperation)
        {
            // ArgumentException takes (message, paramName) ...
            if (maxthreads <= 0)
                throw new ArgumentException("maxthreads can not be <= 0", "maxthreads");
            // ... while ArgumentNullException takes (paramName, message). The single-string
            // overload treats its argument as the parameter name, not as the message.
            if (defaultoperation == null)
                throw new ArgumentNullException("defaultoperation", "defaultoperation can not be null");

            this.qs = QueueState.Idle;
            this.syncobj = new object();
            this.currentthreads = 0;
            this.maxthreads = maxthreads;
            this.queue = new Queue<KeyValuePair<T, QueueOperationHandler<T>>>();
            this.defaultop = defaultoperation;
        }
        #endregion

        #region control ops
        /// <summary>
        /// requests that no further operations be started. operations that are already
        /// running are left to complete; items enqueued meanwhile are kept for later.
        /// </summary>
        public void Pause()
        {
            lock (syncobj)
            {
                switch (qs)
                {
                    case QueueState.Idle:
                        // Judgment call: with nothing running there is nothing to wait
                        // for, so go straight to Paused. The caller evidently wants to
                        // keep queuing until told otherwise.
                        qs = QueueState.Paused;
                        QueueStateChangedInternal(qs);
                        break;

                    case QueueState.Running:
                        // Active threads exist, so Paused can't be reached right away;
                        // the last thread to finish completes the transition.
                        qs = QueueState.Pauseing;
                        QueueStateChangedInternal(qs);
                        break;

                    case QueueState.Stopping:
                        ThreadErrorInternal(default(T), new ThreadStateException("Once the queue is stopping  its done processing"));
                        break;

                    default:
                        // Already pausing or paused: nothing to do.
                        break;
                }
            }
        }
        /// <summary>
        /// stops the execution of future operations. clears out all pending operations
        /// (each one is reported through the ThreadError event). no further operations
        /// are allowed to be enqueued while stopping. the current operations are 
        /// allowed to finish.
        /// </summary>
        public void Stop()
        {
            lock (syncobj)
            {
                // Idle has nothing to stop and Stopping is already working on it.
                if ((qs == QueueState.Idle) || (qs == QueueState.Stopping))
                    return;

                bool waspaused = (qs == QueueState.Paused);

                qs = QueueState.Stopping;
                QueueStateChangedInternal(qs);

                // When workers are still running, the last one to finish drains the
                // queue and signals Idle (see TryNewThread). When nothing is running
                // nobody else will do it, so the pending items must be dropped here.
                // Previously this was only done for the Paused state, so a stop that
                // arrived with no active worker went Idle with items still queued.
                if (waspaused || (currentthreads == 0))
                {
                    while (queue.Count != 0)
                        ThreadErrorInternal(queue.Dequeue().Key, new ThreadStateException("the Queue is stopping . no processing done"));

                    // ensure proper event flow: (previous state) -> Stopping -> Idle
                    qs = QueueState.Idle;
                    QueueStateChangedInternal(qs);
                }
            }
        }
        /// <summary>
        /// continues the execution of enqueued operations after a pause. has no effect
        /// when the queue is idle or already running; reports an error through the
        /// ThreadError event when the queue is stopping.
        /// </summary>
        public void Cotinue()
        {
            lock (syncobj)
            {
                if ((qs == QueueState.Pauseing) || (qs == QueueState.Paused))
                {
                    qs = QueueState.Running;
                    QueueStateChangedInternal(qs);

                    // Start pending items until the thread limit is reached or nothing
                    // is left to start. The first call is unconditional so that an empty
                    // queue with no active workers falls back to Idle.
                    //
                    // The loop must also stop when the queue runs dry: looping only on
                    // "currentthreads < maxthreads" never terminates once there is
                    // nothing left to dequeue, and it spins while holding the lock that
                    // the workers need in order to finish.
                    do
                    {
                        TryNewThread();
                    }
                    while ((qs == QueueState.Running) && (queue.Count != 0) && (currentthreads < maxthreads));
                }
                else if (qs == QueueState.Stopping)
                {
                    ThreadErrorInternal(default(T), new ThreadStateException("Once the queue is stopping  its done processing"));
                }
                // Idle / Running: continuing is meaningless, ignore the command.
            }
        }
        #endregion

        #region data accessors
        /// <summary>
        /// the state the queue is in at the time of the call, read under the internal lock
        /// </summary>
        public QueueState QueueState
        {
            get
            {
                QueueState snapshot;
                lock (syncobj)
                {
                    snapshot = qs;
                }
                return snapshot;
            }
        }
        /// <summary>
        /// the upper limit on simultaneously executing operations, as given at construction
        /// </summary>
        public int MaxThreads
        {
            get { return this.maxthreads; }
        }
        /// <summary>
        /// the number of operations in progress at the time of the call, read under
        /// the internal lock
        /// </summary>
        public int CurrentRunningThreads
        {
            get
            {
                int running;
                lock (syncobj)
                {
                    running = currentthreads;
                }
                return running;
            }
        }
        #endregion

        #region enqueue ops
        /// <summary>
        /// Hands the item to the queue for asynchronous processing with the default
        /// operation supplied at construction.
        /// </summary>
        /// <param name="item">the item to enqueue</param>
        public void EnQueue(T item)
        {
            this.EnQueue(item, this.defaultop);
        }
        /// <summary>
        /// Adds the item to the queue to process asynchronously and 
        /// uses the given operation instead of the default.
        /// </summary>
        /// <remarks>
        /// What happens depends on the current state: when idle, the queue switches to
        /// running and a worker is started for the item right away; when running, the item
        /// is queued and a worker is started if the thread limit allows; when pausing or
        /// paused, the item is only queued; when stopping, the item is rejected through
        /// the ThreadError event.
        /// </remarks>
        /// <param name="item">the item to enqueue</param>
        /// <param name="opp">the new operation that overrides the default</param>
        /// <exception cref="System.ArgumentNullException">opp is null</exception>
        public void EnQueue(T item, QueueOperationHandler<T> opp)
        {
            // ArgumentNullException(string) interprets its argument as the parameter
            // name, so the name and the message have to be passed separately.
            if (opp == null)
                throw new ArgumentNullException("opp", "operation can not be null");

            KeyValuePair<T, QueueOperationHandler<T>> work = new KeyValuePair<T, QueueOperationHandler<T>>(item, opp);

            lock (syncobj)
            {
                switch (qs)
                {
                    case QueueState.Idle:
                        qs = QueueState.Running;
                        QueueStateChangedInternal(qs);

                        // Thread only offers ParameterizedThreadStart(object), so the
                        // strongly typed pair travels to the worker as a boxed object.
                        currentthreads++;
                        new Thread(new ParameterizedThreadStart(RunOpp)).Start(work);
                        break;

                    case QueueState.Paused:
                    case QueueState.Pauseing:
                        // Paused or about to be: remember the item, don't run anything.
                        queue.Enqueue(work);
                        break;

                    case QueueState.Running:
                        // Enqueue first, then try to start the OLDEST pending item; this
                        // keeps first-in first-out order (a queue, not a stack).
                        queue.Enqueue(work);
                        TryNewThread();
                        break;

                    case QueueState.Stopping:
                        // A stopping queue discards everything pending once its workers
                        // finish, so accepting the item would be pointless. The caller is
                        // told through the existing error event rather than by throwing.
                        ThreadErrorInternal(item, new ThreadStateException("the Queue is stopping . no processing done"));
                        break;
                }
            }
        }
        
        #region tools
        /// <summary>
        /// Worker thread entry point: runs one operation on its item, reports the outcome
        /// through ThreadFinished or ThreadError, then releases its thread slot and lets
        /// the queue decide what to do next.
        /// </summary>
        /// <param name="o">a boxed KeyValuePair holding the item (Key) and its operation (Value)</param>
        private void RunOpp(object o)
        {
            KeyValuePair<T, QueueOperationHandler<T>> kvp = (KeyValuePair<T, QueueOperationHandler<T>>)o;

            try
            {
                kvp.Value(kvp.Key);
                ThreadFinishedInternal(kvp.Key);
            }
            catch (Exception ex)
            {
                ThreadErrorInternal(kvp.Key, new ThreadStateException("error processing. partial processing done.", ex));
            }
            finally
            {
                // Release the slot and pick the next action in ONE critical section.
                // With the decrement and TryNewThread in separate lock scopes another
                // thread could observe "Running, zero workers" in between and change the
                // state, after which TryNewThread would report a bogus internal error.
                // The lock is reentrant, so TryNewThread taking it again is fine.
                lock (syncobj)
                {
                    currentthreads--;
                    TryNewThread();
                }
            }
        }
        /// <summary>
        /// Advances the queue by one step according to its state: starts at most one
        /// pending item while running, discards pending items while stopping, and
        /// completes the transition to Idle or Paused once no worker is left.
        /// </summary>
        private void TryNewThread()
        {
            lock (syncobj)
            {
                switch (qs)
                {
                    case QueueState.Running:
                        if (queue.Count == 0)
                        {
                            // Nothing pending; once the last worker is gone we are idle.
                            if (currentthreads == 0)
                            {
                                qs = QueueState.Idle;
                                QueueStateChangedInternal(qs);
                            }
                        }
                        else if (currentthreads < maxthreads)
                        {
                            // A slot is free: hand the oldest pending item to a new worker.
                            currentthreads++;
                            new Thread(new ParameterizedThreadStart(RunOpp)).Start(queue.Dequeue());
                        }
                        break;

                    case QueueState.Stopping:
                        // Pending items could simply be cleared, but rejected items are
                        // reported through the error event everywhere else, so the
                        // leftovers are disposed of the same way.
                        while (queue.Count != 0)
                            ThreadErrorInternal(queue.Dequeue().Key, new ThreadStateException("the Queue is stopping . no processing done"));

                        // Every finishing worker passes through here, so this is the
                        // place to signal the change from Stopping to Idle.
                        if (currentthreads == 0)
                        {
                            qs = QueueState.Idle;
                            QueueStateChangedInternal(qs);
                        }
                        break;

                    case QueueState.Pauseing:
                        // The pause completes when the last worker has finished.
                        if (currentthreads == 0)
                        {
                            qs = QueueState.Paused;
                            QueueStateChangedInternal(qs);
                        }
                        break;

                    default:
                        // Idle / Paused: this method is not expected to be called in
                        // these states; report it as an internal error.
                        ThreadErrorInternal(default(T), new Exception("internal state bad"));
                        break;
                }
            }
        }
        #endregion
        #endregion

        #region event forwarders
        // Note on all event forwarders: per the original author's testing, handlers are
        // likely to update UI controls, and doing so from the wrong thread raises an
        // InvalidOperationException ("Cross-thread operation not valid..."). The
        // forwarders deliberately swallow exactly that error and write it to the console.
        // This is a double edged sword: if things are not appearing in your controls,
        // this might be the culprit.

        /// <summary>
        /// Raises QueueStateChanged with the given state, if anyone is subscribed.
        /// </summary>
        /// <param name="qs">the state to report</param>
        private void QueueStateChangedInternal(QueueState qs)
        {
            // Take a snapshot: checking the event for null and then invoking it reads
            // the field twice, and an unsubscribe in between would cause a
            // NullReferenceException.
            QueueStateChangedHandler handler = QueueStateChanged;
            if (handler == null)
                return;

            try
            {
                handler(qs);
            }
            catch (InvalidOperationException ex)
            {
                // Anything but the cross-thread error is rethrown with "throw;" so the
                // original stack trace is preserved ("throw ex;" would reset it).
                if (!ex.Message.StartsWith("Cross-thread operation not valid", StringComparison.Ordinal))
                    throw;

                Console.WriteLine(ex.Message);
            }
        }
        /// <summary>
        /// Raises ThreadFinished for the given item, if anyone is subscribed. An
        /// InvalidOperationException whose message starts with "Cross-thread operation
        /// not valid" is written to the console instead of being propagated.
        /// </summary>
        /// <param name="finisheditem">the item whose operation completed</param>
        private void ThreadFinishedInternal(T finisheditem)
        {
            // Take a snapshot: checking the event for null and then invoking it reads
            // the field twice, and an unsubscribe in between would cause a
            // NullReferenceException.
            ThreadFinishedHandler<T> handler = ThreadFinished;
            if (handler == null)
                return;

            try
            {
                handler(finisheditem);
            }
            catch (InvalidOperationException ex)
            {
                // Anything but the cross-thread error is rethrown with "throw;" so the
                // original stack trace is preserved ("throw ex;" would reset it).
                if (!ex.Message.StartsWith("Cross-thread operation not valid", StringComparison.Ordinal))
                    throw;

                Console.WriteLine(ex.Message);
            }
        }
        /// <summary>
        /// Raises ThreadError for the given item and exception, if anyone is subscribed.
        /// An InvalidOperationException thrown by a handler whose message starts with
        /// "Cross-thread operation not valid" is written to the console instead of
        /// being propagated.
        /// </summary>
        /// <param name="unfinisheditem">the item that failed or was not processed; default(T) when no item is involved</param>
        /// <param name="ex">the error to report to the subscribers</param>
        private void ThreadErrorInternal(T unfinisheditem, Exception ex)
        {
            // Take a snapshot: checking the event for null and then invoking it reads
            // the field twice, and an unsubscribe in between would cause a
            // NullReferenceException.
            ThreadErrorHandler<T> handler = ThreadError;
            if (handler == null)
                return;

            try
            {
                handler(unfinisheditem, ex);
            }
            catch (InvalidOperationException handlerex)
            {
                // Inspect the exception thrown BY THE HANDLER (handlerex), not the error
                // being reported (ex); testing ex here would never recognise the
                // cross-thread case. Everything else is rethrown with "throw;" so the
                // original stack trace is preserved.
                if (!handlerex.Message.StartsWith("Cross-thread operation not valid", StringComparison.Ordinal))
                    throw;

                Console.WriteLine(handlerex.Message);
            }
        }
        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Web Developer
United States United States
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions