Click here to Skip to main content
15,891,372 members
Articles / Programming Languages / C#

Dealing with Progressive Operations

Rate me:
Please Sign up or sign in to vote.
4.92/5 (26 votes)
10 May 2010 · CPOL · 30 min read · 38.3K · 319 · 60
Through a clean OOP solution to deal with progressive operations, I will implicitly show you how OOP principles can work together to make a full, clean solution.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using Erik.Utilities.Interfaces;

namespace Erik.Utilities.Threading
{
    /// <summary>
    /// Runs queued <see cref="ThreadStart"/> callbacks on dedicated background
    /// threads and lets the caller abort everything at once.
    /// </summary>
    /// <remarks>
    /// Each pool owns a timer that ticks every <c>TimeoutMilliseconds</c> and
    /// starts at most one queued callback per tick, provided the pool is below
    /// its own concurrency limit and the process-wide semaphore has a free
    /// permit. Aborting a pool also disposes it. Cancellation is implemented
    /// with <see cref="Thread.Abort()"/>.
    /// </remarks>
    public sealed class AbortableThreadPool :
        IAbortable, IDisposable
    {
        #region Static part

        /// <summary>
        /// No more than 25 threads will be running simultaneously, and this
        /// value does not depend on the number of pools you create. For example,
        /// if you create three pools, the sum of executing threads in the three
        /// pools will never be greater than MaxSimultaneousThreads value
        /// </summary>
        const int MaxSimultaneousThreads = 25;

        /// <summary>
        /// Used both as the scheduling timer interval and as the maximum time a
        /// timer tick waits for a free semaphore permit.
        /// </summary>
        const int TimeoutMilliseconds = 10;

        // One permit per running worker thread, shared by every pool.
        static readonly Semaphore _semaphore;

        // Every pool that has been created and not yet disposed. Also used as
        // the lock object guarding itself.
        static readonly List<AbortableThreadPool> _instances;

        static readonly TimeSpan _timeout;

        static AbortableThreadPool()
        {
            _semaphore = new Semaphore(MaxSimultaneousThreads, MaxSimultaneousThreads);
            _instances = new List<AbortableThreadPool>();

            _timeout = TimeSpan.FromMilliseconds(TimeoutMilliseconds);
        }

        /// <summary>
        /// Creates a pool allowed to use up to <c>MaxSimultaneousThreads</c>
        /// concurrent threads.
        /// </summary>
        /// <returns>A new, registered and already running pool.</returns>
        public static AbortableThreadPool NewInstance()
        {
            return NewInstance(MaxSimultaneousThreads);
        }

        /// <summary>
        /// Creates a pool that runs at most <paramref name="concurrentThreads"/>
        /// callbacks at the same time.
        /// </summary>
        /// <param name="concurrentThreads">
        /// Per-pool concurrency limit, between 1 and <c>MaxSimultaneousThreads</c>.
        /// </param>
        /// <returns>A new, registered and already running pool.</returns>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="concurrentThreads"/> is outside the allowed range.
        /// (Derives from <see cref="ArgumentException"/>, which earlier
        /// versions threw directly.)
        /// </exception>
        public static AbortableThreadPool NewInstance(int concurrentThreads)
        {
            if (concurrentThreads < 1 || concurrentThreads > MaxSimultaneousThreads)
            {
                throw new ArgumentOutOfRangeException(
                    "concurrentThreads",
                    concurrentThreads,
                    "The number of concurrent threads must be between 1 and "
                        + MaxSimultaneousThreads + ".");
            }

            // The limit is handed to the constructor so it is set before the
            // scheduling timer is started.
            AbortableThreadPool pool = new AbortableThreadPool(concurrentThreads);

            lock (_instances)
            {
                _instances.Add(pool);
            }

            return pool;
        }

        /// <summary>
        /// Aborts every pool that is currently registered.
        /// </summary>
        public static void AbortAll()
        {
            AbortableThreadPool[] snapshot;

            // Size and copy must happen under the same lock; otherwise a pool
            // created or disposed in between makes the copy throw or leaves
            // null slots in the array.
            lock (_instances)
            {
                snapshot = _instances.ToArray();
            }

            foreach (AbortableThreadPool pool in snapshot)
            {
                // A pool may have been disposed since the snapshot was taken;
                // TryAbort just reports that instead of throwing, so the
                // remaining pools are still aborted.
                pool.TryAbort();
            }
        }

        #endregion

        #region Instance part

        // Guards _runningThreads, _waitingCallbacks and _isDisposed.
        readonly object _lock;

        readonly int _maxThreadsRunning;

        // Worker threads currently executing a callback, keyed by ManagedThreadId.
        readonly Dictionary<int, Thread> _runningThreads;

        // Callbacks accepted by AddNewOperation that have not been started yet.
        readonly Queue<ThreadStart> _waitingCallbacks;

        readonly System.Timers.Timer _tmr;
        bool _isDisposed = false;

        private AbortableThreadPool(int maxThreadsRunning)
        {
            _maxThreadsRunning = maxThreadsRunning;

            _runningThreads = new Dictionary<int, Thread>();
            _waitingCallbacks = new Queue<ThreadStart>();

            _lock = new object();

            _tmr = new System.Timers.Timer(_timeout.TotalMilliseconds);
            _tmr.Elapsed += new System.Timers.ElapsedEventHandler(_tmr_Elapsed);

            _tmr.Start();
        }

        ~AbortableThreadPool()
        {
            try
            {
                lock (_lock)
                {
                    if (_runningThreads.Count != 0 || _waitingCallbacks.Count != 0)
                    {
                        TryAbort();
                    }

                    _isDisposed = true;
                    _tmr.Dispose();
                }
            }
            catch (Exception)
            {
                // An exception escaping a finalizer terminates the process;
                // clean-up here is best effort only.
            }
        }

        /// <summary>
        /// Queues a callback; it is started by a later timer tick once a thread
        /// slot is available.
        /// </summary>
        /// <param name="callback">The work to run on a pool thread.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="callback"/> is null.
        /// </exception>
        /// <exception cref="ObjectDisposedException">
        /// The pool has been disposed or aborted.
        /// </exception>
        public void AddNewOperation(ThreadStart callback)
        {
            // Rejected here so the failure surfaces at the caller instead of
            // as a NullReferenceException on a worker thread.
            if (callback == null)
            {
                throw new ArgumentNullException("callback");
            }

            lock (_lock)
            {
                // Checked inside the same lock as the enqueue so a concurrent
                // Abort/Dispose cannot slip in between the two steps.
                CheckDisposed();
                _waitingCallbacks.Enqueue(callback);
            }
        }

        /// <summary>
        /// Timer tick: starts at most one waiting callback on a new background
        /// thread, then re-arms the timer unless the pool has been disposed.
        /// </summary>
        void _tmr_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
        {
            lock (_lock)
            {
                _tmr.Stop();

                // A callback is started only when one is waiting, this pool is
                // below its own limit, and the shared semaphore grants a permit
                // within the timeout. The semaphore is only touched when the
                // first two conditions hold.
                if (_waitingCallbacks.Count != 0
                    && _runningThreads.Count < _maxThreadsRunning
                    && _semaphore.WaitOne(_timeout))
                {
                    ThreadStart callback = _waitingCallbacks.Dequeue();

                    Thread worker = new Thread(
                        new ParameterizedThreadStart(RunThread));
                    worker.IsBackground = true;

                    _runningThreads.Add(worker.ManagedThreadId, worker);

                    worker.Start(callback);
                }

                if (!_isDisposed)
                {
                    _tmr.Start();
                }
            }
        }

        /// <summary>
        /// Worker thread entry point: runs the callback, then unregisters the
        /// thread and returns its semaphore permit.
        /// </summary>
        /// <param name="obj">The <see cref="ThreadStart"/> to execute.</param>
        void RunThread(object obj)
        {
            ThreadStart callback = (ThreadStart)obj;

            // Just make the callback
            callback();

            // NOTE: the clean-up below is intentionally not placed in a finally
            // block. The Thread.Abort documentation warns that aborting a
            // thread which is inside a finally block and needs a lock held by
            // the aborting thread can deadlock, and Abort holds _lock while it
            // aborts the workers.
            bool stillRegistered;
            lock (_lock)
            {
                stillRegistered =
                    _runningThreads.Remove(Thread.CurrentThread.ManagedThreadId);
            }

            // If the entry is already gone, an abort cleared the table and
            // released this thread's permit; releasing again would hand out
            // one permit too many.
            if (stillRegistered)
            {
                _semaphore.Release(1);
            }
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> if the pool has been
        /// disposed.
        /// </summary>
        void CheckDisposed()
        {
            lock (_lock)
            {
                if (_isDisposed)
                    throw new ObjectDisposedException("AbortableThreadPool");
            }
        }

        #region IAbortable members

        /// <summary>
        /// Raised once, after the pool has been aborted and disposed. All
        /// handlers are detached afterwards.
        /// </summary>
        public event EventHandler Aborted;

        /// <summary>
        /// Discards the waiting callbacks, aborts every running thread,
        /// disposes the pool and raises <see cref="Aborted"/>.
        /// </summary>
        /// <exception cref="ObjectDisposedException">
        /// The pool has already been disposed or aborted.
        /// </exception>
        public void Abort()
        {
            if (!TryAbort())
            {
                throw new ObjectDisposedException("AbortableThreadPool");
            }
        }

        /// <summary>
        /// Performs the abort unless the pool is already disposed.
        /// </summary>
        /// <returns>
        /// <c>true</c> if the pool was aborted by this call; <c>false</c> if it
        /// was already disposed.
        /// </returns>
        bool TryAbort()
        {
            EventHandler handlers;

            lock (_lock)
            {
                if (_isDisposed)
                {
                    return false;
                }

                // Clear the waiting callbacks queue
                _waitingCallbacks.Clear();

                // And abort all of the running threads
                foreach (Thread th in _runningThreads.Values)
                {
                    // Abort the thread
                    th.Abort();
                    // And signal the semaphore on its behalf
                    _semaphore.Release(1);
                }

                _runningThreads.Clear();

                // Dispose the pool (keeps the Aborted handlers for now)
                Dispose(false);

                // Detach the handlers while still synchronized...
                handlers = Aborted;
                Aborted = null;
            }

            // ...but invoke them after leaving the lock, so a handler that
            // waits on another thread using this pool cannot deadlock.
            if (handlers != null)
            {
                handlers(this, EventArgs.Empty);
            }

            return true;
        }
        #endregion

        #endregion

        #region IDisposable Members

        /// <summary>
        /// Stops the timer, marks the pool as disposed and unregisters it.
        /// Does nothing if the pool is already disposed.
        /// </summary>
        /// <param name="cleanAbortedEvent">
        /// <c>true</c> to also detach all <see cref="Aborted"/> handlers.
        /// </param>
        /// <exception cref="InvalidOperationException">
        /// Callbacks are still waiting or running.
        /// </exception>
        void Dispose(bool cleanAbortedEvent)
        {
            lock (_lock)
            {
                if (_isDisposed)
                {
                    return;
                }

                if (_waitingCallbacks.Count != 0 || _runningThreads.Count != 0)
                    throw new InvalidOperationException("The thread pool is still running");

                GC.SuppressFinalize(this);

                _tmr.Dispose();
                _isDisposed = true;

                lock (_instances)
                {
                    _instances.Remove(this);
                }

                if (cleanAbortedEvent)
                {
                    Aborted = null;
                }
            }
        }

        /// <summary>
        /// Disposes an idle pool. Use <see cref="Abort"/> to shut down a pool
        /// that still has work.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Callbacks are still waiting or running.
        /// </exception>
        public void Dispose()
        {
            Dispose(true);
        }

        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Technical Lead
Spain
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions