Click here to Skip to main content
15,886,799 members
Articles / Programming Languages / C#

The Super Pool Framework

Rate me:
Please Sign up or sign in to vote.
4.87/5 (53 votes)
31 Aug 2010 | CPOL | 26 min read | 100.8K | 1.5K | 178
The Super Pool is a framework for decoupled communication and management of components. The Super Pool introduces a natural asynchronous communication environment into your solution that can be fluently spread over different components, threads, processes, or even computers or networks.
// -----
// Copyright 2010 Deyan Timnev
// This file is part of the Matrix Platform (www.matrixplatform.com).
// The Matrix Platform is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, 
// either version 3 of the License, or (at your option) any later version. The Matrix Platform is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; 
// without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
// You should have received a copy of the GNU Lesser General Public License along with the Matrix Platform. If not, see http://www.gnu.org/licenses/lgpl.html
// -----
using System;
using System.Collections.Generic;
using System.Threading;
using System.Diagnostics;
using Matrix.Common.Core;

#if Matrix_Diagnostics
using Matrix.Common.Diagnostics;
#endif

namespace Matrix.Common.Extended.ThreadPools
{
    /// <summary>
    /// A very fast implementation of the thread pool.
    /// *WARNING* This class has been extremely optimized, line by line, so even very small changes can disrupt the fine
    /// threading model of execution. Upon doing any changes make sure to execute the Speed tests multiple times, to be sure
    /// no damage has been done.
    /// </summary>
    public class ThreadPoolFast : IDisposable
    {
        //const string DefaultThreadName = "ThreadPoolEx.Thread";

        /// <summary>
        /// Internal data storage class - for a running thread.
        /// One instance is created per worker thread and is passed to that thread as its start parameter.
        /// </summary>
        class ThreadInfo
        {
            // Managed thread id of the worker; the same value is used as the key in the pool's thread dictionary.
            internal volatile int ThreadId;
            // The worker thread this entry describes.
            internal volatile Thread Thread;
            // Set to true (before Event is signaled) when the worker must exit instead of resuming work.
            internal bool MustDispose = false;
            // Signaled to wake the worker after it has pushed itself onto the sleeping stack and blocked.
            internal AutoResetEvent Event = new AutoResetEvent(false);
        }

        /// <summary>
        /// Describes one queued work item: the fast-invoke delegate to run, the object it is
        /// run against and the argument array handed to it.
        /// </summary>
        public class TargetInfo
        {
            readonly object Target;
            readonly string InvokerName;
            readonly FastInvokeHelper.FastInvokeHandlerDelegate DelegateInstance;
            readonly object[] Args;

            /// <summary>
            /// Constructor.
            /// </summary>
            /// <param name="invokerName">Name stored with the item (only stored, not read by this class).</param>
            /// <param name="target">Object passed as the first parameter of the delegate.</param>
            /// <param name="delegateInstance">Delegate executed by <see cref="Invoke"/>.</param>
            /// <param name="poolAsFirstParameter">
            /// When true the argument array becomes { pool, args }: the pool first, followed by the
            /// original <paramref name="args"/> array as ONE element (it is not flattened).
            /// When false <paramref name="args"/> is used unchanged.
            /// </param>
            /// <param name="pool">Pool placed in front of the arguments when requested.</param>
            /// <param name="args">Arguments for the delegate.</param>
            public TargetInfo(string invokerName, object target, FastInvokeHelper.FastInvokeHandlerDelegate delegateInstance,
                              bool poolAsFirstParameter, ThreadPoolFast pool, params object[] args)
            {
                InvokerName = invokerName;
                Target = target;
                DelegateInstance = delegateInstance;

                Args = poolAsFirstParameter
                    ? new object[] { pool, args }
                    : args;
            }

            /// <summary>
            /// Runs the stored delegate against the stored target and arguments.
            /// </summary>
            /// <returns>Whatever the delegate returns.</returns>
            public object Invoke()
            {
                object result = DelegateInstance(Target, Args);
                return result;
            }
        }

        #region Statitics

        int _totalThreadsAwakens = 0;
        /// <summary>
        /// Counter incremented every time AwakeSleepingThread wakes a sleeping worker thread.
        /// The setter overwrites the counter with the supplied value.
        /// </summary>
        public int TotalThreadsAwakens
        {
            get
            {
                return _totalThreadsAwakens;
            }

            set
            {
                _totalThreadsAwakens = value;
            }
        }

        int _totalThreadsStarted = 0;
        /// <summary>
        /// Counter incremented every time CreateThread starts a new worker thread.
        /// </summary>
        public int TotalThreadsStarted
        {
            get
            {
                return _totalThreadsStarted;
            }
        }

        #endregion

#if Matrix_Diagnostics
        InstanceMonitor _monitor;
        /// <summary>
        /// Diagnostics monitor created for this pool instance in the constructor
        /// (only compiled when Matrix_Diagnostics is defined).
        /// </summary>
        public InstanceMonitor Monitor
        {
            get
            {
                return _monitor;
            }
        }
#endif

        volatile bool _running = true;
        /// <summary>
        /// True while the pool has not been disposed and the application is not closing.
        /// </summary>
        protected bool IsRunning
        {
            get
            {
                if (_running == false)
                {// Dispose has already been requested for this pool.
                    return false;
                }

                return ApplicationLifetimeHelper.ApplicationClosing == false;
            }
        }

        TimeSpan _threadIdle = TimeSpan.FromSeconds(25);
        /// <summary>
        /// Time without any queue item being picked up after which the queue processor starts
        /// retiring sleeping worker threads (as long as more threads exist than MinimumThreadsCount).
        /// Defaults to 25 seconds.
        /// </summary>
        public TimeSpan ThreadIdle
        {
            get
            {
                return _threadIdle;
            }

            set
            {
                _threadIdle = value;
            }
        }

        volatile string _name = string.Empty;
        /// <summary>
        /// Name of this thread pool, as supplied to the constructor.
        /// </summary>
        public string Name
        {
            get
            {
                return _name;
            }
        }

        volatile int _maximumTotalThreadsAllowed = 20;
        /// <summary>
        /// Upper limit for the total number of worker threads (running, sleeping, suspended, etc.).
        /// No new worker thread is created once this many are registered. Defaults to 20.
        /// </summary>
        public int MaximumThreadsCount
        {
            get
            {
                return _maximumTotalThreadsAllowed;
            }

            set
            {
                _maximumTotalThreadsAllowed = value;
            }
        }

        volatile int _minimumThreadsCount = 1;
        /// <summary>
        /// How many threads should always be kept ready and alive,
        /// waiting for pending tasks to come.
        /// This will increase the thread count in general, but
        /// provides a faster response when a task comes in.
        /// Typical speed values (as quoted by the original author):
        /// - new thread = 0.2ms
        /// - existing thread = 0.05ms
        /// </summary>
        public int MinimumThreadsCount
        {
            get
            {
                return _minimumThreadsCount;
            }

            set
            {
                _minimumThreadsCount = value;
            }
        }

        volatile ApartmentState _threadsApartmentState = ApartmentState.STA;
        /// <summary>
        /// The ApartmentState applied to every worker thread before it is started.
        /// Defaults to STA.
        /// </summary>
        public ApartmentState ThreadsApartmentState
        {
            get
            {
                return _threadsApartmentState;
            }

            set
            {
                _threadsApartmentState = value;
            }
        }

        /// <summary>
        /// Number of registered worker threads that are not currently on the sleeping stack
        /// (registered count minus sleeping count, never less than zero).
        /// The two counts are read without locking, so the value is a snapshot only.
        /// </summary>
        public int ActiveRunningThreadsCount
        {
            get
            {
                int registered = _threadsHotSwap.Count;
                int sleeping = _sleepingThreads.Count;
                return Math.Max(0, registered - sleeping);
            }
        }

        /// <summary>
        /// Number of worker threads currently parked on the sleeping stack.
        /// </summary>
        public int SleepingThreadsCount
        {
            get { return _sleepingThreads.Count; }
        }

        /// <summary>
        /// Number of thread slots available: the maximum allowed thread count minus
        /// <see cref="ActiveRunningThreadsCount"/>, never less than zero.
        /// </summary>
        public int FreeThreadsCount
        {
            get
            {
                int freeSlots = _maximumTotalThreadsAllowed - ActiveRunningThreadsCount;
                return Math.Max(0, freeSlots);
            }
        }

        /// <summary>
        /// Number of items currently waiting in the execution queue (read without locking).
        /// </summary>
        public int QueuedItemsCount
        {
            get
            {
                return _queue.Count;
            }
        }

        // Time budget, in milliseconds, that Dispose(bool) compares against its stopwatch before
        // forcibly stopping worker threads that are still busy.
        int _finalDisposeTimeoutMilliseconds = 15000;

        /// <summary>
        /// Creating a new thread takes about 100-200ms, so this helps to protect the system from creation overload.
        /// CreateThread refuses to create a thread when less than this many milliseconds passed since the previous one.
        /// </summary>
        long _minimumThreadCreationIntervalMilliseconds = 500;

        /// <summary>
        /// ManagedThreadNumber vs ThreadInfo.
        /// The dictionary instance is never modified in place: writers copy it under lock (this),
        /// change the copy and then assign the copy to this field.
        /// </summary>
        volatile Dictionary<int, ThreadInfo> _threadsHotSwap = new Dictionary<int, ThreadInfo>();

        // Worker threads that found the queue empty and are blocked on their event; modified under lock (_sleepingThreads).
        Stack<ThreadInfo> _sleepingThreads = new Stack<ThreadInfo>();

        // Application stopwatch value recorded by a worker whenever it finds the queue non-empty;
        // ProcessThreads uses it to detect the idle timeout.
        long _lastQueueItemProcessedMillisecond = 0;

        // Application stopwatch value recorded by CreateThread when it last went on to create a worker thread.
        long _lastThreadCreatedMillisecond = 0;

        /// <summary>
        /// *Reserving* space for the list here, makes it VERY VERY MUCH *SLOWER*, since all the reserved items are moved, each
        /// time we do an insert or remove, SO DO NOT DO IT.
        /// Enqueue and dequeue are done under lock (_queue).
        /// </summary>
        protected Queue<TargetInfo> _queue = new Queue<TargetInfo>();

        // Signaled by QueueTargetInfo to prompt the queue processor thread.
        protected AutoResetEvent _queueProcessEvent = new AutoResetEvent(false);

        // Thread running QueueProcessor; created and started in the constructor.
        Thread _queueProcessorThread;

        #region Instance Control

        /// <summary>
        /// Creates the pool, subscribes it to the application closing notification and
        /// immediately starts the queue processor thread.
        /// </summary>
        /// <param name="name">Name of the pool; also used as the prefix of the queue processor thread name.</param>
        public ThreadPoolFast(string name)
        {
#if Matrix_Diagnostics
            _monitor = new InstanceMonitor(this);
#endif

            _name = name;

            GeneralHelper.DefaultDelegate closingHandler = new GeneralHelper.DefaultDelegate(GeneralHelper_ApplicationClosingEvent);
            ApplicationLifetimeHelper.ApplicationClosingEvent += closingHandler;

            ThreadStart processorEntry = new ThreadStart(QueueProcessor);
            _queueProcessorThread = new Thread(processorEntry);
            _queueProcessorThread.Name = name + ".QueueProcessor";
            _queueProcessorThread.Start();
        }

        /// <summary>
        /// Helper, performs common actions on stopping a still running thread: optionally waits,
        /// interrupts the thread and, if it is still running or waiting after a second optional wait, aborts it.
        /// Nothing (more) is done for a null thread or as soon as the thread is found neither running nor waiting.
        /// </summary>
        /// <param name="thread">Thread to stop; may be null.</param>
        /// <param name="systemMonitorReport">When true (and Matrix_Diagnostics is defined) a warning is reported before the interrupt and before the abort.</param>
        /// <param name="preInterruptTimeout">Milliseconds to sleep before interrupting; skipped when not positive.</param>
        /// <param name="preAbortTimeout">Milliseconds to sleep between the interrupt and the abort; skipped when not positive.</param>
        public static void StopThread(Thread thread, bool systemMonitorReport, int preInterruptTimeout, int preAbortTimeout)
        {
            if (thread == null)
            {
                return;
            }

            if (IsThreadRunningOrWaiting(thread) == false)
            {
                return;
            }

            if (preInterruptTimeout > 0)
            {// Give the thread a chance to finish on its own.
                Thread.Sleep(preInterruptTimeout);
            }

            if (IsThreadRunningOrWaiting(thread) == false)
            {
                return;
            }

            if (systemMonitorReport)
            {
#if Matrix_Diagnostics
                SystemMonitor.OperationWarning(string.Format("Interrupting  thread [{0}, {1}].", thread.ManagedThreadId, thread.Name));
#endif
            }

            // Will awaken, if asleep, or cause exception if goes to sleep.
            thread.Interrupt();
            if (preAbortTimeout > 0)
            {
                Thread.Sleep(preAbortTimeout);
            }

            if (IsThreadRunningOrWaiting(thread) == false)
            {
                return;
            }

            if (systemMonitorReport)
            {
#if Matrix_Diagnostics
                SystemMonitor.OperationWarning(string.Format("Aborting thread [{0}, {1}].", thread.ManagedThreadId, thread.Name));
#endif
            }

            thread.Abort();
        }

        /// <summary>
        /// Helper, tells whether the thread is currently running or blocked in a wait/sleep/join.
        /// ThreadState is a flags enumeration and a background thread always carries the Background
        /// bit, so that bit is masked out before comparing; a plain equality test would never match
        /// a background thread.
        /// </summary>
        static bool IsThreadRunningOrWaiting(Thread thread)
        {
            System.Threading.ThreadState state = thread.ThreadState & ~System.Threading.ThreadState.Background;
            return state == System.Threading.ThreadState.Running
                || state == System.Threading.ThreadState.WaitSleepJoin;
        }
        
        /// <summary>
        /// Unsubscribes from the application closing notification and shuts the pool down
        /// through Dispose(false), i.e. without forcibly stopping the queue processor thread.
        /// </summary>
        public virtual void Dispose()
        {
            GeneralHelper.DefaultDelegate closingHandler = new GeneralHelper.DefaultDelegate(GeneralHelper_ApplicationClosingEvent);
            ApplicationLifetimeHelper.ApplicationClosingEvent -= closingHandler;

            const bool disposeQueueProcessor = false;
            Dispose(disposeQueueProcessor);
        }

        /// <summary>
        /// Free all threads, both asleep, and those that do not wish to end peacefully.
        /// Marks the pool as not running, wakes every sleeping worker with the dispose flag set, then
        /// waits (up to the final dispose timeout, measured from the start of this wait) for busy workers
        /// before stopping them with <see cref="StopThread"/>.
        /// Also stop the queue processor only in case it is still running.
        /// </summary>
        /// <param name="disposeQueueProcessor">
        /// When true the queue processor thread is stopped as well. Note this only happens inside the
        /// branch taken when at least one worker thread is registered.
        /// </param>
        protected void Dispose(bool disposeQueueProcessor)
        {
            _running = false;

            if (_sleepingThreads.Count > 0)
            {
                lock (_sleepingThreads)
                {
                    while (_sleepingThreads.Count > 0)
                    {// Wake up all sleeping threads and kill them.
                        ThreadInfo info = _sleepingThreads.Pop();
                        info.MustDispose = true;
                        info.Event.Set();
                    }
                }
            }

            // The watch must actually be running: a Stopwatch created with "new" is stopped and reports
            // 0 elapsed forever, which would make the timeout below never expire and the loop never end
            // while a worker is still busy.
            Stopwatch disposeWatch = Stopwatch.StartNew();

            if (_threadsHotSwap.Count > 0)
            {
                lock (this)
                {
                    // Work on a private copy; it is swapped in once all threads have been handled.
                    Dictionary<int, ThreadInfo> threadsHotSwap = new Dictionary<int, ThreadInfo>(_threadsHotSwap);
                    while (threadsHotSwap.Count > 0)
                    {
                        // Only the first remaining entry is looked at in each pass.
                        using (Dictionary<int, ThreadInfo>.ValueCollection.Enumerator enumerator = threadsHotSwap.Values.GetEnumerator())
                        {
                            if (enumerator.MoveNext() == false)
                            {
                                return;
                            }

                            Thread thread = enumerator.Current.Thread;
                            if (thread.ThreadState == System.Threading.ThreadState.Running
                                || thread.ThreadState == System.Threading.ThreadState.WaitSleepJoin)
                            {
                                // Some thread is still working, see if we can wait any further.
                                if (_finalDisposeTimeoutMilliseconds > disposeWatch.ElapsedMilliseconds)
                                {// Continue waiting for some more time.
                                    Thread.Sleep(500);
                                    continue;
                                }
                            }

                            // Either the thread has ended already (StopThread then does nothing), or the
                            // time budget is used up and it gets interrupted / aborted.
                            StopThread(thread, false, 0, 500);
                            threadsHotSwap.Remove(thread.ManagedThreadId);
                        }
                    }

                    _threadsHotSwap = threadsHotSwap;

                    // Execute under the lock.
                    if (disposeQueueProcessor && _queueProcessorThread != null)
                    {
                        // Finally stop the queue processor in case it is still running.
                        StopThread(_queueProcessorThread, false, 500, 500);
                        _queueProcessorThread = null;
                    }
                }
            }
        }

        /// <summary>
        /// Handler subscribed to the application closing notification; disposes the pool.
        /// </summary>
        void GeneralHelper_ApplicationClosingEvent()
        {
            this.Dispose();
        }

        #endregion

        #region Input

        /// <summary>
        /// Enqueue a target and Fast Invoke delegate instance for execution; the arguments are
        /// passed to the delegate as given (the pool is not added in front of them).
        /// *IMPORTANT* make sure to store the delegateInstance and reuse it over multiple calls!
        /// </summary>
        /// <param name="target">Object the delegate is invoked against.</param>
        /// <param name="delegateInstance">Fast Invoke delegate to execute.</param>
        /// <param name="args">Arguments for the delegate.</param>
        public void QueueFastDelegate(object target, FastInvokeHelper.FastInvokeHandlerDelegate delegateInstance,
                                      params object[] args)
        {
            const bool poolAsFirstParameter = false;
            QueueFastDelegate(target, poolAsFirstParameter, delegateInstance, args);
        }

        /// <summary>
        /// Enqueue a target and Fast Invoke delegate instance for execution, optionally placing this
        /// pool in front of the arguments. The work item is created with an empty invoker name.
        /// *IMPORTANT* make sure to store the delegateInstance and reuse it over multiple calls!
        /// </summary>
        /// <param name="target">Object the delegate is invoked against.</param>
        /// <param name="poolAsFirstParameter">When true this pool is passed as the first argument of the delegate.</param>
        /// <param name="delegateInstance">Fast Invoke delegate to execute.</param>
        /// <param name="args">Arguments for the delegate.</param>
        public void QueueFastDelegate(object target, bool poolAsFirstParameter, FastInvokeHelper.FastInvokeHandlerDelegate delegateInstance, 
                                      params object[] args)
        {
            QueueTargetInfo(new ThreadPoolFastEx.TargetInfo(string.Empty, target, delegateInstance,
                                                            poolAsFirstParameter, this, args));
        }

        /// <summary>
        /// Enqueue a fully assigned target info item for execution.
        /// The item is silently dropped when the pool is no longer running. After enqueuing, the queue
        /// processor is signaled when no worker is active or fewer are active than MinimumThreadsCount.
        /// </summary>
        /// <param name="info">Work item to enqueue.</param>
        protected void QueueTargetInfo(TargetInfo info)
        {
            if (!IsRunning)
            {
                return;
            }

            lock (_queue)
            {
                _queue.Enqueue(info);
            }

            int active = ActiveRunningThreadsCount;
            int required = MinimumThreadsCount;

            bool enoughWorkersBusy = active != 0 && active >= required;
            if (enoughWorkersBusy)
            {// Nothing to signal.
                return;
            }

            _queueProcessEvent.Set();
        }

        #endregion

        /// <summary>
        /// Routine running the queue processor thread.
        /// Loops while the pool is running: waits up to 1 millisecond on the queue process event and then
        /// calls ProcessThreads. When the loop ends (normally or by exception) Dispose(false) is executed.
        /// Any exception, including one thrown by that Dispose call, is caught and discarded.
        /// </summary>
        void QueueProcessor()
        {
            try
            {
                try
                {
                    while (IsRunning)
                    {
                        // Wake on the event or after 1ms at the latest, so ProcessThreads also runs when nothing is queued.
                        _queueProcessEvent.WaitOne(1);
                        ProcessThreads();
                    }
                }
                finally
                {
                    //lock (this)
                    //{// Make self null, since otherwise the dispose will try to shut us down
                    //    // while we are executing on it (shut ourselves).
                    //    _queueProcessorThread = null;
                    //}

                    // Dispose, since in cases where the ApplicationClosingEvent is not raised
                    // the pools threads will remain active.
                    Dispose(false);
                }

            }
            catch (Exception ex)
            {// Not much we can do here.
                // The two locals below are not used afterwards; NOTE(review): presumably kept for inspection in a debugger - confirm.
                string exMessage = GeneralHelper.GetExceptionMessage(ex);
                string t = Thread.CurrentThread.Name;
            }

        }

        /// <summary>
        /// Helper, process the items gathered in the execution queue.
        /// With items pending: wakes up to one sleeping worker per pending item and, when none could be
        /// woken, asks for a new worker thread. With nothing pending: once the idle timeout has passed and
        /// more threads exist than the required minimum, retires a single sleeping worker.
        /// </summary>
        void ProcessThreads()
        {
            if (_queue.Count == 0)
            {
                // Thread sleep timeout (execute only on timeout, and when combined threads count above minimum required).
                bool retireOneSleeper = SleepingThreadsCount > 0
                    && _threadIdle.TotalMilliseconds < ApplicationLifetimeHelper.ApplicationStopwatchMilliseconds - _lastQueueItemProcessedMillisecond
                    && _threadsHotSwap.Count > MinimumThreadsCount;

                if (retireOneSleeper == false)
                {
                    return;
                }

                ThreadInfo retired = null;
                lock (_sleepingThreads)
                {
                    if (_sleepingThreads.Count > 0)
                    {
                        retired = _sleepingThreads.Pop();
                        retired.MustDispose = true;
                        retired.Event.Set();
                    }
                }

                if (retired == null)
                {// Somebody else emptied the stack in the meantime.
                    return;
                }

                RemoveThread(retired.Thread);

                // The event is signaled a second time here, after the thread has been unregistered.
                retired.Event.Set();
                return;
            }

            int pendingItems = _queue.Count;
            int wokenThreads = 0;

            // Stops as soon as there are no more sleeping threads.
            while (wokenThreads < pendingItems && AwakeSleepingThread() != null)
            {
                wokenThreads++;
            }

            if (wokenThreads == 0)
            {// Nobody could be woken, so try to run a new thread (CreateThread applies the limits).
                CreateThread();
            }
        }

        /// <summary>
        /// Takes one worker off the sleeping stack, signals its event so it resumes work and
        /// counts the awakening.
        /// </summary>
        /// <returns>The awakened worker's info, or null when no worker was sleeping.</returns>
        ThreadInfo AwakeSleepingThread()
        {
            ThreadInfo sleeper = null;

            if (_sleepingThreads.Count != 0)
            {// Unlocked pre-check first, the count is verified again under the lock.
                lock (_sleepingThreads)
                {
                    if (_sleepingThreads.Count > 0)
                    {
                        sleeper = _sleepingThreads.Pop();
                    }
                }
            }

            if (sleeper != null)
            {
                // Wake up the thread so it can do some work.
                sleeper.Event.Set();

                Interlocked.Increment(ref _totalThreadsAwakens);
            }

            return sleeper;
        }

        /// <summary>
        /// Creates, registers and starts a new worker thread running ThreadExecute.
        /// Nothing is created when the minimum interval since the previous creation has not passed,
        /// when the pool is not running, or when the maximum thread count has been reached.
        /// </summary>
        /// <returns>Info of the started worker, or null when no thread was created.</returns>
        ThreadInfo CreateThread()
        {
            long sinceLastCreation = ApplicationLifetimeHelper.ApplicationStopwatchMilliseconds - _lastThreadCreatedMillisecond;
            if (sinceLastCreation < _minimumThreadCreationIntervalMilliseconds)
            {// Minimum inter thread creation time not met.
                return null;
            }

            if (!IsRunning || _threadsHotSwap.Count >= MaximumThreadsCount)
            {
                return null;
            }

            _lastThreadCreatedMillisecond = ApplicationLifetimeHelper.ApplicationStopwatchMilliseconds;

            Thread worker = new Thread(new ParameterizedThreadStart(ThreadExecute));
            worker.SetApartmentState(_threadsApartmentState);
            worker.Name = this._name + ".WorkerThread";

            ThreadInfo workerInfo = new ThreadInfo();
            workerInfo.Thread = worker;
            workerInfo.ThreadId = worker.ManagedThreadId;

            lock (this)
            {
                // Hot Swap: extend a copy and publish it, the current dictionary is never modified.
                Dictionary<int, ThreadInfo> extended = new Dictionary<int, ThreadInfo>(_threadsHotSwap);
                extended.Add(worker.ManagedThreadId, workerInfo);
                _threadsHotSwap = extended;
            }

            Interlocked.Increment(ref _totalThreadsStarted);

            // The worker receives its own info as the start parameter.
            worker.Start(workerInfo);

            return workerInfo;
        }

        /// <summary>
        /// Unregisters a worker thread: removes its entry from a copy of the threads dictionary and,
        /// when an entry was actually removed, publishes that copy.
        /// </summary>
        /// <param name="thread">Thread to unregister (looked up by its managed thread id).</param>
        /// <returns>True when the thread was registered and has been removed.</returns>
        bool RemoveThread(Thread thread)
        {
            lock (this)
            {
                Dictionary<int, ThreadInfo> remaining = new Dictionary<int, ThreadInfo>(_threadsHotSwap);
                if (remaining.Remove(thread.ManagedThreadId) == false)
                {// Not registered, keep the current dictionary.
                    return false;
                }

                // Hot Swap.
                _threadsHotSwap = remaining;
                return true;
            }
        }

        /// <summary>
        /// Routine running each worker thread.
        /// While the pool is running: takes an item from the queue and invokes it; when no item could be
        /// taken, parks itself on the sleeping stack and blocks on its event until signaled. Returns when
        /// the pool stops running or when it is awakened with the MustDispose flag set.
        /// </summary>
        /// <param name="threadInfoParam">The ThreadInfo of this worker, as passed to Thread.Start.</param>
        void ThreadExecute(object threadInfoParam)
        {
            ThreadInfo threadInfo = threadInfoParam as ThreadInfo;

            while (IsRunning)
            {
                TargetInfo targetInfo = null;
                // Unlocked pre-check; the count is verified again under the lock.
                if (_queue.Count != 0)
                {
                    lock (_queue)
                    {
                        if (_queue.Count > 0)
                        {
                            targetInfo = _queue.Dequeue();
                        }
                    }

                    // Recorded whenever the queue was seen non-empty, even if another thread took the item first.
                    Interlocked.Exchange(ref _lastQueueItemProcessedMillisecond, ApplicationLifetimeHelper.ApplicationStopwatchMilliseconds);
                }

                if (targetInfo == null)
                {
                    lock (_sleepingThreads)
                    {
                        // Keep this locked.
                        if (IsRunning == false)
                        {// Do not enter sleeping mode, if we are already stopped.
                            return;
                        }
                        _sleepingThreads.Push(threadInfo);
                    }

                    // Block (without timeout) until somebody pops this entry and signals the event.
                    threadInfo.Event.WaitOne();

                    if (threadInfo.MustDispose)
                    {// Instructed to dispose.
                        return;
                    }
                }
                else
                {
                    try
                    {
                        // The result of the invocation is not used.
                        object invokeResult = targetInfo.Invoke();
                    }
                    catch (Exception ex)
                    {
                        // Exceptions of the work item never end the worker; they are only reported when diagnostics are compiled in.
#if Matrix_Diagnostics
                        Monitor.OperationError(GeneralHelper.ProcessExceptionMessage("[" + _name + "] Thread executed caused an exception ", ex));
#endif
                    }
                }
            }
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Product Manager Ingenious Ltd, Bulgaria
Bulgaria Bulgaria
I worked for a few years as a C++/Win32 developer and software architect, and then moved on to the .NET environment where I was able to discover the beauty of managed programming.

I am currently involved in the development and management of Open Forex Platform (www.openforexplatform.com) and the Matrix Platform (www.matrixplatform.com).

Comments and Discussions