
Extended Thread Pool

6 Apr 2013, Ms-PL
Your own extensible and configurable Thread Pool.

 

Introduction  

Several thread pools are available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. Their behavior is hard-coded, which makes them difficult to adapt. I've tried to address these issues:

  • A Thread Pool should be extensible and configurable
  • A Thread Pool should be as simple as possible

So, I've created an Extended Thread Pool. This Thread Pool is written using C# 4.0.

Extended Thread Pool Features

  • Really simple 
  • Extensible queues
  • Extensible Task Items 
  • Limit on the maximum number of working threads
  • Dynamic thread workers
  • Task priority support 
  • Extensible logging

Extended Thread Pool Design

  • ITaskItem represents a task
  • ITaskQueue represents the task queue logic 
  • ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
  • WorkThread represents a thread worker
  • ExtendedThreadPool controls work threads

Let's take a look at each class more deeply:

  • The ITaskItem represents some work that should be done.
    public interface ITaskItem
    {
        void DoWork();
    } 
  • TaskItemPriority is the priority the WorkThread applies to a task; it can be specified for each task.
  • The ITaskQueue is another simple interface that manages the task queue.
    /// <summary>
    /// Represents the task queue.
    /// </summary>
    public interface ITaskQueue
    {
        /// <summary>
        /// Number of work items in the queue.
        /// </summary>
        int Count { get; }

        /// <summary>
        /// Dequeue the work item.
        /// </summary>
        /// <returns>The work item.</returns>
        IWorkItem Dequeue();

        /// <summary>
        /// Enqueue the work item.
        /// </summary>
        /// <param name="item">The work item.</param>
        void Enqueue(IWorkItem item);
    }  
  • ITaskQueueController provides communication logic between the consumer and the producer.
    public interface ITaskQueueController : IDisposable
    {
        int ConsumersWaiting { get; }
        IWorkItem Dequeue();
        void Enqueue(IWorkItem item);
    } 

I've written two implementations of ITaskQueueController:

  • DefaultTaskQueueController
  • BoundedTaskQueueController

Default Task Queue Controller

DefaultTaskQueueController is a thread-safe wrapper for the ITaskQueue. Consumers wait while the queue is empty, and a producer wakes them when it adds an item:

    public sealed class DefaultTaskQueueController : TaskQueueController
    {
        public DefaultTaskQueueController(ITaskQueue taskQueue)
            : base(taskQueue)
        {
        }

        protected override IWorkItem DequeueCore()
        {
            lock (_locker)
            {
                while (_taskQueue.Count == 0 && !_isDisposed)
                {
                    _consumersWaiting++;
                    Monitor.Wait(_locker);
                    _consumersWaiting--;
                }
                if (_isDisposed)
                {
                    return null;
                }
                return _taskQueue.Dequeue();
            }
        }

        protected override void EnqueueCore(IWorkItem item)
        {
            lock (_locker)
            {
                _taskQueue.Enqueue(item);
                if (_consumersWaiting > 0)
                {
                    Monitor.PulseAll(_locker);
                }
            }
        }
    } 
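The wait-and-pulse pattern used above can be tried in isolation. The following is a minimal sketch, not the article's class: WaitPulseQueue, Shutdown, and WaitPulseDemo are hypothetical names, and unlike DefaultTaskQueueController this stand-in drains any remaining items after shutdown so that the demo is deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in (not the article's DefaultTaskQueueController):
// a generic queue guarded by Monitor.Wait / Monitor.PulseAll.
public sealed class WaitPulseQueue<T> where T : class
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private int _consumersWaiting;
    private bool _isShutDown;

    public T Dequeue()
    {
        lock (_locker)
        {
            // Re-check the condition in a loop: a woken thread may find
            // that another consumer has already taken the item.
            while (_items.Count == 0 && !_isShutDown)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker); // releases the lock while waiting
                _consumersWaiting--;
            }
            // Assumption of this sketch: remaining items are still handed
            // out after shutdown; null means "shut down and empty".
            return _items.Count == 0 ? null : _items.Dequeue();
        }
    }

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            _items.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }

    public void Shutdown()
    {
        lock (_locker)
        {
            _isShutDown = true;
            Monitor.PulseAll(_locker); // wake every waiter so it can exit
        }
    }
}

public static class WaitPulseDemo
{
    // Starts one consumer, feeds it two items, then shuts the queue down.
    public static List<string> Run()
    {
        var queue = new WaitPulseQueue<string>();
        var received = new List<string>();
        var consumer = new Thread(() =>
        {
            string item;
            while ((item = queue.Dequeue()) != null)
            {
                received.Add(item);
            }
        });
        consumer.Start();
        queue.Enqueue("first");
        queue.Enqueue("second");
        queue.Shutdown();
        consumer.Join(); // after Join it is safe to read the list
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // prints "first, second"
    }
}
```

The while loop around Monitor.Wait matters: PulseAll wakes every waiting consumer, but only one of them will find an item, so the others must re-check the condition and wait again.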

Bounded Task Queue Controller

BoundedTaskQueueController is also thread safe. If producers create items faster than consumers can process them, the queue grows without bound, and so does memory usage. BoundedTaskQueueController places a limit on the queue size; once the limit is reached, the producer blocks until a consumer removes an item.

    public sealed class BoundedTaskQueueController : TaskQueueController
    {
        private readonly int _maxTasksCount;
        private int _producersWaiting;

        public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
            : base(taskQueue)
        {
            if (maxTasksCount < 1)
            {
                throw new ArgumentException("maxTasksCount should be greater than 0");
            }
            _maxTasksCount = maxTasksCount;
        }

        protected override IWorkItem DequeueCore()
        {
            IWorkItem taskItem;
            lock (_locker)
            {
                while (_taskQueue.Count == 0 && !_isDisposed)
                {
                    _consumersWaiting++;
                    Monitor.Wait(_locker);
                    _consumersWaiting--;
                }
                if (_isDisposed)
                {
                    return null;
                }
                taskItem = _taskQueue.Dequeue();
                if (_producersWaiting > 0)
                {
                    Monitor.PulseAll(_locker);
                }
            }
            return taskItem;
        }

        protected override void EnqueueCore(IWorkItem item)
        {
            lock (_locker)
            {
                while (_taskQueue.Count >= _maxTasksCount && !_isDisposed)
                {
                    _producersWaiting++;
                    Monitor.Wait(_locker);
                    _producersWaiting--;
                }
                _taskQueue.Enqueue(item);
                if (_consumersWaiting > 0)
                {
                    Monitor.PulseAll(_locker);
                }
            }
        }
    } 
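The blocking behavior of a bounded queue can be observed with a small stand-alone sketch. BoundedQueue, PeakCount, and BoundedDemo are hypothetical names introduced for illustration; they are not part of the Extended Thread Pool. The sketch assumes a fast producer and a deliberately slow consumer, and records the largest queue size that was ever reached.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in (not the article's BoundedTaskQueueController).
public sealed class BoundedQueue<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public BoundedQueue(int capacity)
    {
        if (capacity < 1)
        {
            throw new ArgumentOutOfRangeException("capacity");
        }
        _capacity = capacity;
    }

    // Largest number of items the queue has held so far.
    public int PeakCount { get; private set; }

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            // The producer blocks while the queue is full.
            while (_items.Count >= _capacity)
            {
                Monitor.Wait(_locker);
            }
            _items.Enqueue(item);
            PeakCount = Math.Max(PeakCount, _items.Count);
            Monitor.PulseAll(_locker); // wake consumers waiting for an item
        }
    }

    public T Dequeue()
    {
        lock (_locker)
        {
            // The consumer blocks while the queue is empty.
            while (_items.Count == 0)
            {
                Monitor.Wait(_locker);
            }
            T item = _items.Dequeue();
            Monitor.PulseAll(_locker); // wake producers waiting for space
            return item;
        }
    }
}

public static class BoundedDemo
{
    // Returns the peak queue size seen while moving itemCount items.
    public static int Run(int capacity, int itemCount)
    {
        var queue = new BoundedQueue<int>(capacity);
        var producer = new Thread(() =>
        {
            for (int i = 0; i < itemCount; i++)
            {
                queue.Enqueue(i);
            }
        });
        producer.Start();
        for (int i = 0; i < itemCount; i++)
        {
            Thread.Sleep(10); // slow consumer, so the producer hits the limit
            queue.Dequeue();
        }
        producer.Join();
        return queue.PeakCount;
    }

    public static void Main()
    {
        Console.WriteLine("Peak queue size: " + Run(2, 5));
    }
}
```

Whatever the thread timing, the reported peak never exceeds the capacity passed to the constructor, because the producer waits inside Enqueue until a consumer has made room.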

Extended Thread Pool

The ExtendedThreadPool manages the ITaskQueueController. The ExtendedThreadPool is created through the Builder pattern, for example:

        var threadPool = new ExtendedThreadPool.Builder
                        {
                            Name = "My ThreadPool",
                            MaxThreads = 10,
                            MultiThreadingCapacityType = MultiThreadingCapacityType.Global
                        }.Build();

All of the ExtendedThreadPool's properties have default values, so the simplest way to create an ExtendedThreadPool is to call the Build method:

        var threadPool = new ExtendedThreadPool.Builder()
                                                .Build(); 

MultiThreadingCapacityType specifies whether the threading capacity applies to all processors or to a single processor:

    public enum MultiThreadingCapacityType
    {
        /// <summary>
        /// Represent all processors
        /// </summary>
        Global,
        /// <summary>
        /// Represent one processor
        /// </summary>
        PerProcessor
    }

The AddTask method adds a task to the task pipeline. If the maximum thread limit has not been reached and ConsumersWaiting = 0, a new WorkThread is created.

You can add a task with a TaskItemPriority. Please note that DefaultTaskQueue does not take TaskItemPriority into account; use PriorityTaskQueue for priority tasks.

        /// <summary>
        /// Add new task.
        /// </summary>
        /// <param name="taskItem">Represent task.</param>
        /// <param name="priority">Task priority.</param>
        /// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
        public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
        {
            IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
            AddWorkItem(workItem);
        }

        /// <summary>
        /// Add new task as <see cref="Action" />.
        /// </summary>
        /// <param name="action">Represent task.</param>
        /// <param name="priority">Task priority.</param>
        /// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
        public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
        {
            IWorkItem workItem = WorkItem.FromAction(action, priority);
            AddWorkItem(workItem);
        } 
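The rule for creating a new WorkThread, stated before the AddTask overloads, can be written as a small predicate. This is only a sketch of that rule: WorkerScalingRule and ShouldStartWorker are hypothetical names, and the real ExtendedThreadPool may evaluate the condition differently.

```csharp
using System;

// Hypothetical helper, not part of the article's source code.
public static class WorkerScalingRule
{
    // A new worker is started only when the pool is below its limit
    // and no existing worker is idle (waiting for a task).
    public static bool ShouldStartWorker(int currentThreads, int maxThreads, int consumersWaiting)
    {
        return currentThreads < maxThreads && consumersWaiting == 0;
    }

    public static void Main()
    {
        // Below the limit, nobody idle: start a worker.
        Console.WriteLine(ShouldStartWorker(3, 10, 0));  // True
        // Below the limit, but two workers are idle: reuse them.
        Console.WriteLine(ShouldStartWorker(3, 10, 2));  // False
        // Limit reached: the task waits in the queue.
        Console.WriteLine(ShouldStartWorker(10, 10, 0)); // False
    }
}
```

Checking for idle consumers first keeps the pool from growing while existing threads are free to take the new task.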

Work Thread

The WorkThread class executes a task item and provides logging.

        private void DoWork()
        {
            while (_isRun)
            {
                try
                {
                    IWorkItem workItem = _taskQueueController.Dequeue();
                    if (workItem == null)
                    {
                        continue;
                    }
                    ProcessItem(workItem);
                }
                catch (Exception ex)
                {
                    _log.Error(ex);
                }
            }
        } 
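The important property of this loop is that an exception thrown by one task is logged and does not stop the worker. The sketch below shows the same idea with stand-ins: SketchWorker and WorkerDemo are hypothetical names, a BlockingCollection of Action delegates replaces the ITaskQueueController, and a list of exceptions replaces the logger.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for the article's WorkThread.
public sealed class SketchWorker
{
    private readonly BlockingCollection<Action> _queue;
    private readonly List<Exception> _errors = new List<Exception>();
    private readonly Thread _thread;

    public SketchWorker(BlockingCollection<Action> queue)
    {
        _queue = queue;
        _thread = new Thread(DoWork);
        _thread.Start();
    }

    // Read this only after Join has returned.
    public IList<Exception> Errors
    {
        get { return _errors; }
    }

    public void Join()
    {
        _thread.Join();
    }

    private void DoWork()
    {
        // GetConsumingEnumerable blocks until an item arrives and ends
        // once CompleteAdding has been called and the queue is empty.
        foreach (Action work in _queue.GetConsumingEnumerable())
        {
            try
            {
                work();
            }
            catch (Exception ex)
            {
                // A failing task is recorded; the loop keeps running.
                _errors.Add(ex);
            }
        }
    }
}

public static class WorkerDemo
{
    // Returns { number of completed tasks, number of recorded errors }.
    public static int[] Run()
    {
        var queue = new BlockingCollection<Action>();
        var worker = new SketchWorker(queue);
        int completed = 0;
        queue.Add(() => completed++);
        queue.Add(() => { throw new InvalidOperationException("boom"); });
        queue.Add(() => completed++);
        queue.CompleteAdding();
        worker.Join();
        return new[] { completed, worker.Errors.Count };
    }

    public static void Main()
    {
        int[] result = Run();
        Console.WriteLine("Completed: {0}, errors: {1}", result[0], result[1]);
        // prints "Completed: 2, errors: 1"
    }
}
```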

Thread Pool Extensibility 

If you need a more powerful task queue, implement ITaskQueue. You do not need to worry about thread safety in the queue itself, because the ITaskQueueController synchronizes access to it. You can also create your own ITaskQueueController.
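As an illustration of a custom queue, here is a minimal sketch of a priority-ordered queue. The article does not show the members of IWorkItem or the values of TaskItemPriority, so the sketch uses hypothetical stand-ins throughout: SketchPriority, ISketchWorkItem (with an assumed Priority property), and ISketchTaskQueue, which mirrors the shape of ITaskQueue. It is not the article's PriorityTaskQueue. The queue is intentionally not thread safe, on the assumption that a controller takes the lock around every call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the article's types.
public enum SketchPriority { Low = 0, Normal = 1, High = 2 }

public interface ISketchWorkItem
{
    SketchPriority Priority { get; }
}

public interface ISketchTaskQueue
{
    int Count { get; }
    ISketchWorkItem Dequeue();
    void Enqueue(ISketchWorkItem item);
}

// One FIFO bucket per priority; Dequeue takes from the highest
// non-empty bucket. No locking: the controller is expected to do it.
public sealed class SketchPriorityQueue : ISketchTaskQueue
{
    private readonly Queue<ISketchWorkItem>[] _buckets;
    private int _count;

    public SketchPriorityQueue()
    {
        _buckets = new Queue<ISketchWorkItem>[3];
        for (int i = 0; i < _buckets.Length; i++)
        {
            _buckets[i] = new Queue<ISketchWorkItem>();
        }
    }

    public int Count
    {
        get { return _count; }
    }

    public void Enqueue(ISketchWorkItem item)
    {
        _buckets[(int)item.Priority].Enqueue(item);
        _count++;
    }

    public ISketchWorkItem Dequeue()
    {
        for (int i = _buckets.Length - 1; i >= 0; i--)
        {
            if (_buckets[i].Count > 0)
            {
                _count--;
                return _buckets[i].Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}

public sealed class NamedItem : ISketchWorkItem
{
    public NamedItem(string name, SketchPriority priority)
    {
        Name = name;
        Priority = priority;
    }

    public string Name { get; private set; }
    public SketchPriority Priority { get; private set; }
}

public static class PriorityDemo
{
    // Enqueues four items and returns their names in dequeue order.
    public static string Run()
    {
        var queue = new SketchPriorityQueue();
        queue.Enqueue(new NamedItem("a", SketchPriority.Normal));
        queue.Enqueue(new NamedItem("b", SketchPriority.High));
        queue.Enqueue(new NamedItem("c", SketchPriority.Low));
        queue.Enqueue(new NamedItem("d", SketchPriority.High));
        var names = new List<string>();
        while (queue.Count > 0)
        {
            names.Add(((NamedItem)queue.Dequeue()).Name);
        }
        return string.Join(",", names);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "b,d,a,c"
    }
}
```

Items with the same priority leave the queue in the order they were added, which keeps the behavior predictable when most tasks use the default priority.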

Example

We create an ExtendedThreadPool with default settings. SampleTask implements ITaskItem; see the details below.

    internal sealed class Program
    {
        private static readonly ILog _log = LogManager.GetLogger(typeof(Program));
        private static IExtendedThreadPool _threadPool;

        private static void Main()
        {
            XmlConfigurator.Configure();
            _threadPool = new ExtendedThreadPool.Builder().Build();
            AddTasks();
            Console.ReadKey();
        }

        private static void AddTasks()
        {
            for (int taskIndex = 0; taskIndex < 100; taskIndex++)
            {
                _threadPool.AddTask(new SampleTask(taskIndex));
            }
        }

        private sealed class SampleTask : ITaskItem
        {
            private readonly int _taskIndex;

            public SampleTask(int taskIndex)
            {
                _taskIndex = taskIndex;
            }

            public void DoWork()
            {
                Thread.Sleep(1000);
                _log.InfoFormat("Task {0} has been finished", _taskIndex);
            }
        }
    } 

History

  • 29 Jun 2008: Initial version.
  • 02 Jul 2008
    • + TransactionalMsmqTaskItem, DefaultTaskQueue.
    • * TaskQueueController classes added disposable support.
    • * WorkThread updated stop logic.
    • * Sample projects: 
      • CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
      • CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
  • 10 Jul 2008
    • Added Mike.Strobel's suggestion.
    • + ActionTaskItem.
    • * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
    • Added more tests
  • 02 Oct 2009
    • + PriorityTaskQueue.
    • + StatisticController.
    • * ExtendedThreadPool added support for StatisticController and MultiThreadingCapacityType.
    • Added support for Unity 1.2.
    • Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
    • Added more tests 
  • 07 Apr 2013
    • ExtendedThreadPool v2
    • Unity has been removed
    • ExtendedThreadPool creation has been simplified

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)


About the Author

Sergey Morenko
Software Developer (Senior)
Russian Federation
B.Sc. in Computer Science.

Article Copyright 2008 by Sergey Morenko