Click here to Skip to main content
15,888,803 members
Articles / Programming Languages / C#

A DelegateQueue Class

Rate me:
Please Sign up or sign in to vote.
4.81/5 (44 votes)
13 Mar 2007CPOL14 min read 234.8K   1.8K   219  
An implementation of the ISynchronizeInvoke interface.
#region License

/* Copyright (c) 2006 Leslie Sanford
 * 
 * Permission is hereby granted, free of charge, to any person obtaining a copy 
 * of this software and associated documentation files (the "Software"), to 
 * deal in the Software without restriction, including without limitation the 
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 
 * sell copies of the Software, and to permit persons to whom the Software is 
 * furnished to do so, subject to the following conditions:
 * 
 * The above copyright notice and this permission notice shall be included in 
 * all copies or substantial portions of the Software. 
 * 
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 
 * THE SOFTWARE.
 */

#endregion

#region Contact

/*
 * Leslie Sanford
 * Email: jabberdabber@hotmail.com
 */

#endregion

using System;
using System.Collections;
using System.ComponentModel;
using System.Diagnostics;
using System.Threading;
using Sanford.Collections.Generic;

namespace Sanford.Threading
{
    /// <summary>
    /// Represents an asynchronous queue of delegates.
    /// </summary>
    public partial class DelegateQueue : SynchronizationContext, IComponent, ISynchronizeInvoke
    {
        #region DelegateQueue Members

        #region Fields

        // The thread for processing delegates.
        private Thread delegateThread;

        // The deque for holding delegates.
        private Deque<DelegateQueueAsyncResult> delegateDeque = new Deque<DelegateQueueAsyncResult>();

        // The object to use for locking.
        private readonly object lockObject = new object();

        // The synchronization context in which this DelegateQueue was created.
        private SynchronizationContext context;

        // Inidicates whether the delegate queue has been disposed.
        private volatile bool disposed = false;

        // Thread ID counter for all DelegateQueues.
        private volatile static uint threadID = 0;

        private ISite site = null;

        #endregion

        #region Events

        /// <summary>
        /// Occurs after a method has been invoked as a result of a call to 
        /// the BeginInvoke or BeginInvokePriority methods.
        /// </summary>
        public event EventHandler<InvokeCompletedEventArgs> InvokeCompleted;

        /// <summary>
        /// Occurs after a method has been invoked as a result of a call to
        /// the Post and PostPriority methods.
        /// </summary>
        public event EventHandler<PostCompletedEventArgs> PostCompleted;

        #endregion

        #region Construction

        /// <summary>
        /// Initializes a new instance of the DelegateQueue class.
        /// </summary>
        public DelegateQueue()
        {
            InitializeDelegateQueue();

            if(SynchronizationContext.Current == null)
            {
                context = new SynchronizationContext();
            }
            else
            {
                context = SynchronizationContext.Current;
            }
        }

        /// <summary>
        /// Initializes a new instance of the DelegateQueue class with the specified IContainer object.
        /// </summary>
        /// <param name="container">
        /// The IContainer to which the DelegateQueue will add itself.
        /// </param>
        public DelegateQueue(IContainer container)
        {
            ///
            /// Required for Windows.Forms Class Composition Designer support
            ///
            container.Add(this);

            InitializeDelegateQueue();
        }

        ~DelegateQueue()
        {
            Dispose(false);
        }

        // Initializes the DelegateQueue.
        private void InitializeDelegateQueue()
        {
            // Create thread for processing delegates.
            delegateThread = new Thread(DelegateProcedure);

            lock(lockObject)
            {
                // Increment to next thread ID.
                threadID++;

                // Create name for thread.
                delegateThread.Name = "Delegate Queue Thread: " + threadID.ToString();

                // Start thread.
                delegateThread.Start();

                Debug.WriteLine(delegateThread.Name + " Started.");

                // Wait for signal from thread that it is running.
                Monitor.Wait(lockObject);
            }
        }

        #endregion

        #region Methods

        protected virtual void Dispose(bool disposing)
        {
            if(disposing)
            {
                lock(lockObject)
                {
                    disposed = true;

                    Monitor.Pulse(lockObject);

                    GC.SuppressFinalize(this);
                }
            }
        }

        /// <summary>
        /// Executes the delegate on the main thread that this object executes on.
        /// </summary>
        /// <param name="method">
        /// A Delegate to a method that takes parameters of the same number and 
        /// type that are contained in args. 
        /// </param>
        /// <param name="args">
        /// An array of type Object to pass as arguments to the given method. 
        /// </param>
        /// <returns>
        /// An IAsyncResult interface that represents the asynchronous operation 
        /// started by calling this method.
        /// </returns>
        /// <remarks>
        /// The delegate is placed at the beginning of the queue. Its invocation
        /// takes priority over delegates already in the queue. 
        /// </remarks>
        public IAsyncResult BeginInvokePriority(Delegate method, params object[] args)
        {
            #region Require

            if(disposed)
            {
                throw new ObjectDisposedException("DelegateQueue");
            }
            else if(method == null)
            {
                throw new ArgumentNullException();
            }

            #endregion

            DelegateQueueAsyncResult result;

            // If BeginInvokePriority was called from a different thread than the one
            // in which the DelegateQueue is running.
            if(InvokeRequired)
            {
                result = new DelegateQueueAsyncResult(this, method, args, false, NotificationType.BeginInvokeCompleted);

                lock(lockObject)
                {
                    // Put the method at the front of the queue.
                    delegateDeque.PushFront(result);

                    Monitor.Pulse(lockObject);
                }
            }
            // Else BeginInvokePriority was called from the same thread in which the 
            // DelegateQueue is running.
            else
            {
                result = new DelegateQueueAsyncResult(this, method, args, true, NotificationType.None);

                // The method is invoked here instead of placing it in the 
                // queue. The reason for this is that if EndInvoke is called 
                // from the same thread in which the DelegateQueue is running and
                // the method has not been invoked, deadlock will occur. 
                result.Invoke();
            }

            return result;
        }

        /// <summary>
        /// Executes the delegate on the main thread that this object executes on.
        /// </summary>
        /// <param name="method">
        /// A Delegate to a method that takes parameters of the same number and 
        /// type that are contained in args. 
        /// </param>
        /// <param name="args">
        /// An array of type Object to pass as arguments to the given method. 
        /// </param>
        /// <returns>
        /// An IAsyncResult interface that represents the asynchronous operation 
        /// started by calling this method.
        /// </returns>
        /// <remarks>
        /// <para>
        /// The delegate is placed at the beginning of the queue. Its invocation
        /// takes priority over delegates already in the queue. 
        /// </para>
        /// <para>
        /// Unlike BeginInvoke, this method operates synchronously, that is, it 
        /// waits until the process completes before returning. Exceptions raised 
        /// during the call are propagated back to the caller.
        /// </para>
        /// </remarks>
        public object InvokePriority(Delegate method, params object[] args)
        {
            #region Require

            if(disposed)
            {
                throw new ObjectDisposedException("DelegateQueue");
            }
            else if(method == null)
            {
                throw new ArgumentNullException();
            }

            #endregion

            object returnValue = null;

            // If InvokePriority was called from a different thread than the one
            // in which the DelegateQueue is running.
            if(InvokeRequired)
            {
                DelegateQueueAsyncResult result = new DelegateQueueAsyncResult(this, method, args, false, NotificationType.None);

                lock(lockObject)
                {
                    // Put the method at the back of the queue.
                    delegateDeque.PushFront(result);

                    Monitor.Pulse(lockObject);
                }

                // Wait for the result of the method invocation.
                returnValue = EndInvoke(result);
            }
            // Else InvokePriority was called from the same thread in which the 
            // DelegateQueue is running.
            else
            {
                // Invoke the method here rather than placing it in the queue.
                returnValue = method.DynamicInvoke(args);
            }

            return returnValue;
        }

        /// <summary>
        /// Dispatches an asynchronous message to this synchronization context. 
        /// </summary>
        /// <param name="d">
        /// The SendOrPostCallback delegate to call.
        /// </param>
        /// <param name="state">
        /// The object passed to the delegate.
        /// </param>
        /// <remarks>
        /// The Post method starts an asynchronous request to post a message. 
        /// </remarks>
        public void PostPriority(SendOrPostCallback d, object state)
        {
            #region Require

            if(disposed)
            {
                throw new ObjectDisposedException("DelegateQueue");
            }
            else if(d == null)
            {
                throw new ArgumentNullException();
            }

            #endregion

            lock(lockObject)
            {
                DelegateQueueAsyncResult result = new DelegateQueueAsyncResult(this, d, new object[] { state }, false, NotificationType.PostCompleted);

                // Put the method at the front of the queue.
                delegateDeque.PushFront(result);

                Monitor.Pulse(lockObject);
            }
        }

        /// <summary>
        /// Dispatches an synchronous message to this synchronization context. 
        /// </summary>
        /// <param name="d">
        /// The SendOrPostCallback delegate to call.
        /// </param>
        /// <param name="state">
        /// The object passed to the delegate.
        /// </param>
        public void SendPriority(SendOrPostCallback d, object state)
        {
            InvokePriority(d, state);
        }       

        // Processes and invokes delegates.
        private void DelegateProcedure()
        {
            lock(lockObject)
            {
                // Signal the constructor that the thread is now running.
                Monitor.Pulse(lockObject);
            }

            // Set this DelegateQueue as the SynchronizationContext for this thread.
            SynchronizationContext.SetSynchronizationContext(this);

            // Placeholder for DelegateQueueAsyncResult objects.
            DelegateQueueAsyncResult result = null;

            // While the DelegateQueue has not been disposed.
            while(true)
            {
                // Critical section.
                lock(lockObject)
                {
                    // If the DelegateQueue has been disposed, break out of loop; we're done.
                    if(disposed)
                    {
                        break;
                    }

                    // If there are delegates waiting to be invoked.
                    if(delegateDeque.Count > 0)
                    {
                        result = delegateDeque.PopFront();
                    }
                    // Else there are no delegates waiting to be invoked.
                    else
                    {
                        // Wait for next delegate.
                        Monitor.Wait(lockObject);

                        // If the DelegateQueue has been disposed, break out of loop; we're done.
                        if(disposed)
                        {
                            break;
                        }

                        Debug.Assert(delegateDeque.Count > 0);

                        result = delegateDeque.PopFront();
                    }
                }

                Debug.Assert(result != null);

                // Invoke the delegate.
                result.Invoke();

                if(result.NotificationType == NotificationType.BeginInvokeCompleted)
                {
                    InvokeCompletedEventArgs e = new InvokeCompletedEventArgs(
                        result.Method,
                        result.GetArgs(),
                        result.ReturnValue,
                        result.Error);

                    OnInvokeCompleted(e);
                }
                else if(result.NotificationType == NotificationType.PostCompleted)
                {
                    object[] args = result.GetArgs();

                    Debug.Assert(args.Length == 1);
                    Debug.Assert(result.Method is SendOrPostCallback);

                    PostCompletedEventArgs e = new PostCompletedEventArgs(
                        (SendOrPostCallback)result.Method,
                        args[0],
                        result.Error);

                    OnPostCompleted(e);
                }
                else
                {
                    Debug.Assert(result.NotificationType == NotificationType.None);
                }
            }

            Debug.WriteLine(delegateThread.Name + " Finished");
        }
        
        // Raises the InvokeCompleted event.
        protected virtual void OnInvokeCompleted(InvokeCompletedEventArgs e)
        {
            EventHandler<InvokeCompletedEventArgs> handler = InvokeCompleted;

            if(handler != null)
            {
                context.Post(delegate(object state)
                {
                    handler(this, e);
                }, null);
            }
        }

        // Raises the PostCompleted event.
        protected virtual void OnPostCompleted(PostCompletedEventArgs e)
        {
            EventHandler<PostCompletedEventArgs> handler = PostCompleted;

            if(handler != null)
            {
                context.Post(delegate(object state)
                {
                    handler(this, e);
                }, null);
            }
        }

        // Raises the Disposed event.
        protected virtual void OnDisposed(EventArgs e)
        {
            EventHandler handler = Disposed;

            if(handler != null)
            {
                context.Post(delegate(object state)
                {
                    handler(this, e);
                }, null);
            }
        }

        #endregion        

        #endregion

        #region SynchronizationContext Overrides

        /// <summary>
        /// Dispatches a synchronous message to this synchronization context. 
        /// </summary>
        /// <param name="d">
        /// The SendOrPostCallback delegate to call.
        /// </param>
        /// <param name="state">
        /// The object passed to the delegate.
        /// </param>
        /// <remarks>
        /// The Send method starts an synchronous request to send a message. 
        /// </remarks>
        public override void Send(SendOrPostCallback d, object state)
        {
            Invoke(d, state);
        }

        /// <summary>
        /// Dispatches an asynchronous message to this synchronization context. 
        /// </summary>
        /// <param name="d">
        /// The SendOrPostCallback delegate to call.
        /// </param>
        /// <param name="state">
        /// The object passed to the delegate.
        /// </param>
        /// <remarks>
        /// The Post method starts an asynchronous request to post a message. 
        /// </remarks>
        public override void Post(SendOrPostCallback d, object state)
        {
            #region Require

            if(disposed)
            {
                throw new ObjectDisposedException("DelegateQueue");
            }
            else if(d == null)
            {
                throw new ArgumentNullException();
            }

            #endregion

            lock(lockObject)
            {
                delegateDeque.PushBack(new DelegateQueueAsyncResult(this, d, new object[] { state }, false, NotificationType.PostCompleted));

                Monitor.Pulse(lockObject);
            }
        }

        #endregion

        #region IComponent Members

        /// <summary>
        /// Represents the method that handles the Disposed delegate of a DelegateQueue.
        /// </summary>
        public event System.EventHandler Disposed;

        /// <summary>
        /// Gets or sets the ISite associated with the DelegateQueue.
        /// </summary>
        public ISite Site
        {
            get
            {
                return site;
            }
            set
            {
                site = value;
            }
        }

        #endregion

        #region ISynchronizeInvoke Members

        /// <summary>
        /// Executes the delegate on the main thread that this DelegateQueue executes on.
        /// </summary>
        /// <param name="method">
        /// A Delegate to a method that takes parameters of the same number and type that 
        /// are contained in args. 
        /// </param>
        /// <param name="args">
        /// An array of type Object to pass as arguments to the given method. This can be 
        /// a null reference (Nothing in Visual Basic) if no arguments are needed. 
        /// </param>
        /// <returns>
        /// An IAsyncResult interface that represents the asynchronous operation started 
        /// by calling this method.
        /// </returns>
        /// <remarks>
        /// <para>The delegate is called asynchronously, and this method returns immediately. 
        /// You can call this method from any thread. If you need the return value from a process 
        /// started with this method, call EndInvoke to get the value.</para>
        /// <para>If you need to call the delegate synchronously, use the Invoke method instead.</para>
        /// </remarks>
        public IAsyncResult BeginInvoke(Delegate method, params object[] args)
        {
            #region Require

            if(disposed)
            {
                throw new ObjectDisposedException("DelegateQueue");
            }
            else if(method == null)
            {
                throw new ArgumentNullException();
            }

            #endregion

            DelegateQueueAsyncResult result;

            if(InvokeRequired)
            {
                result = new DelegateQueueAsyncResult(this, method, args, false, NotificationType.BeginInvokeCompleted);

                lock(lockObject)
                {
                    delegateDeque.PushBack(result);

                    Monitor.Pulse(lockObject);
                }
            }
            else
            {
                result = new DelegateQueueAsyncResult(this, method, args, false, NotificationType.None);

                result.Invoke();
            }

            return result;
        }

        /// <summary>
        /// Waits until the process started by calling BeginInvoke completes, and then returns 
        /// the value generated by the process.
        /// </summary>
        /// <param name="result">
        /// An IAsyncResult interface that represents the asynchronous operation started 
        /// by calling BeginInvoke. 
        /// </param>
        /// <returns>
        /// An Object that represents the return value generated by the asynchronous operation.
        /// </returns>
        /// <remarks>
        /// This method gets the return value of the asynchronous operation represented by the 
        /// IAsyncResult passed by this interface. If the asynchronous operation has not completed, this method will wait until the result is available.
        /// </remarks>
        public object EndInvoke(IAsyncResult result)
        {
            #region Require

            if(disposed)
            {
                throw new ObjectDisposedException("DelegateQueue");
            }
            else if(!(result is DelegateQueueAsyncResult))
            {
                throw new ArgumentException();
            }
            else if(((DelegateQueueAsyncResult)result).Owner != this)
            {
                throw new ArgumentException();
            }

            #endregion

            result.AsyncWaitHandle.WaitOne();

            DelegateQueueAsyncResult r = (DelegateQueueAsyncResult)result;

            if(r.Error != null)
            {
                throw r.Error;
            }

            return r.ReturnValue;
        }

        /// <summary>
        /// Executes the delegate on the main thread that this DelegateQueue executes on.
        /// </summary>
        /// <param name="method">
        /// A Delegate that contains a method to call, in the context of the thread for the DelegateQueue.
        /// </param>
        /// <param name="args">
        /// An array of type Object that represents the arguments to pass to the given method.
        /// </param>
        /// <returns>
        /// An Object that represents the return value from the delegate being invoked, or a 
        /// null reference (Nothing in Visual Basic) if the delegate has no return value.
        /// </returns>
        /// <remarks>
        /// <para>Unlike BeginInvoke, this method operates synchronously, that is, it waits until 
        /// the process completes before returning. Exceptions raised during the call are propagated 
        /// back to the caller.</para>
        /// <para>Use this method when calling a method from a different thread to marshal the call 
        /// to the proper thread.</para>
        /// </remarks>
        public object Invoke(Delegate method, params object[] args)
        {
            #region Require

            if(disposed)
            {
                throw new ObjectDisposedException("DelegateQueue");
            }
            else if(method == null)
            {
                throw new ArgumentNullException();
            }

            #endregion

            object returnValue = null;

            if(InvokeRequired)
            {
                DelegateQueueAsyncResult result = new DelegateQueueAsyncResult(this, method, args, false, NotificationType.None);

                lock(lockObject)
                {
                    delegateDeque.PushBack(result);

                    Monitor.Pulse(lockObject);
                }

                returnValue = EndInvoke(result);
            }
            else
            {
                // Invoke the method here rather than placing it in the queue.
                returnValue = method.DynamicInvoke(args);
            }

            return returnValue;
        }

        /// <summary>
        /// Gets a value indicating whether the caller must call Invoke.
        /// </summary>
        /// <value>
        /// <b>true</b> if the caller must call Invoke; otherwise, <b>false</b>.
        /// </value>
        /// <remarks>
        /// This property determines whether the caller must call Invoke when making 
        /// method calls to this DelegateQueue. If you are calling a method from a different 
        /// thread, you must use the Invoke method to marshal the call to the proper thread.
        /// </remarks>
        public bool InvokeRequired
        {
            get
            {
                return Thread.CurrentThread.ManagedThreadId != delegateThread.ManagedThreadId;
            }
        }        

        #endregion

        #region IDisposable Members

        /// <summary>
        /// Disposes of the DelegateQueue.
        /// </summary>
        public void Dispose()
        {
            #region Guards

            if(disposed)
            {
                return;
            }

            #endregion

            Dispose(true);

            OnDisposed(EventArgs.Empty);
        }

        #endregion                
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United States United States
Aside from dabbling in BASIC on his old Atari 1040ST years ago, Leslie's programming experience didn't really begin until he discovered the Internet in the late 90s. There he found a treasure trove of information about two of his favorite interests: MIDI and sound synthesis.

After spending a good deal of time calculating formulas he found on the Internet for creating new sounds by hand, he decided that an easier way would be to program the computer to do the work for him. This led him to learn C. He discovered that beyond using programming as a tool for synthesizing sound, he loved programming in and of itself.

Eventually he taught himself C++ and C#, and along the way he immersed himself in the ideas of object oriented programming. Like many of us, he gotten bitten by the design patterns bug and a copy of GOF is never far from his hands.

Now his primary interest is in creating a complete MIDI toolkit using the C# language. He hopes to create something that will become an indispensable tool for those wanting to write MIDI applications for the .NET framework.

Besides programming, his other interests are photography and playing his Les Paul guitars.

Comments and Discussions