Click here to Skip to main content
Licence Ms-PL
First Posted 29 Jun 2008
Views 34,446
Downloads 564
Bookmarked 75 times

Extended Thread Pool

By | 2 Oct 2009 | Article
Your own extensible and configurable Thread Pool.

DefaultSample

CorePrioritySample.JPG

Introduction

Different kinds of Thread Pools are available: Smart Thread Pool, BlackHen.Threading, but nothing supports extensions. All Thread Pools are hard coded, and it's too difficult to work with any. I've tried to address these issues:

  • A Thread Pool should be extensible and configurable
  • A Thread Pool should be as simple as possible

So, I've created an Extended Thread Pool. This Thread Pool is written using C# 3.0.

Extended Thread Pool Features

  • IoC support (I've used Unity1.2)
  • Extensible queues
  • Extensible Task Items
  • Limit on the maximum number of working threads
  • Dynamic thread workers
  • Task priority support
  • Thread Pool statistic support: TaskItemsQueued, TaskItemsStarted, ConsumersWaiting, MaxThreads
  • Extensible logging

Extended Thread Pool Design

  • ITaskItem represents a task
  • ITaskQueue represents the task queue logic
  • ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
  • WorkThread represents a thread worker
  • ExtendedThreadPool controls work threads

Let's take a look at each class more deeply:

  • The ITaskItem represents some work that should be done.
  • public interface ITaskItem
    {
        void DoWork();
    }
  • TaskItemPriority - WorkThread priority, can be specified for each task.
  • The ITaskQueue is another simple interface that manages the task queue.
  • public interface ITaskQueue
    {
        int Count { get; }
        void Enqueue(ITaskItem item);
        ITaskItem Dequeue();
    }
  • ITaskQueueController provides communication logic between the consumer and the producer.
  • public interface ITaskQueueController : IDisposable
    {
        int ConsumersWaiting { get; }
        void Enqueue(ITaskItem item);
        ITaskItem Dequeue();
    }

I've implemented two task queue controllers derived from ITaskQueueController:

  • DefaultTaskQueueController
  • BoundedTaskQueueController

Default Task Queue Controller

DefaultTaskQueueController is a thread safe wrapper for the ITaskQueue:

public class DefaultTaskQueueController : TaskQueueControllerBase
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }
 
    #region Overrides of TaskQueueControllerBase
 
    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }
 
    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
        }
        return taskItem;
    }
 
    #endregion
}

Bounded Task Queue Controller

BoundedTaskQueueController (thread safe) - if the producer task or tasks create items at a rate faster than the consumer can process them, the system can run into unbounded memory usage. The BoundedTaskQueueController allows you to place a limit on the size that the queue may reach before the producer is forced to block.

public class BoundedTaskQueueController : TaskQueueControllerBase
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;
 
    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
            throw new ArgumentException("MaxTasksCount should be greater 0");
        _maxTasksCount = maxTasksCount;
    }
 
    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            while (_taskQueue.Count == (_maxTasksCount - 1) && !_isDispose)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }
 
    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
            if (_producersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
        return taskItem;
    }
}

Extended Thread Pool

The ExtendedThreadPool manages the ITaskQueueController.

	/// <summary>
        /// Create new Instance of ExtendedThreadPool
        /// </summary>
        /// <param name="minThreads">MinThreads per processor</param>
        /// <param name="maxThreads">MaxThreads per processor</param>
        public ExtendedThreadPool(int minThreads, int maxThreads)
            : this(minThreads, maxThreads, MultiThreadingCapacityType.PerProcessor)
        {
        }
 
        public ExtendedThreadPool(int minThreads, int maxThreads,
            MultiThreadingCapacityType multiThreadingCapacityType)
        {
            SetThreadingRange(minThreads, maxThreads, multiThreadingCapacityType);
            _statisticController = new StatisticController(_maxThreads);
        }

MultiThreadingCapacityTyperepresent threading capacity

    public enum MultiThreadingCapacityType
    {
        /// <summary>
        /// Represent all processors
        /// </summary>
        Global,
        /// <summary>
        /// Represent one processor
        /// </summary>
        PerProcessor
    }

The AddTask method is used to add a task into the task pipeline. If the maximum thread limit is not reached and ConsamersWaitin = 0, a new WorkThread will be created.

	/// <summary>
        /// Add taskItem with default TaskItemPriority.Normal"/>
        /// </summary>
        /// <param name="taskItem"></param>
        public void AddTask(ITaskItem taskItem)
        {
            AddWorkItem(new WorkItem(taskItem));
        }

You can add a task with a TaskItemPriority. Please note, DefaultTaskQueue does not vallidate TaskItemPriority use PriorityTaskQueue for priority tasks

	public void AddTask(ITaskItem taskItem, TaskItemPriority priority)
        {
            AddWorkItem(new WorkItem(taskItem, priority));
        }

Work Thread

The WorkThread class executes a task item and provides logging.

	public void Start()
        {
            while (_isRun)
            {
                WorkItem workItem = null;
                try
                {
                    workItem = _taskQueueController.Dequeue();
                    if (workItem.IsNull())
                        continue;
                    _statisticController.WorkItemStarted();
                    DoWork(workItem);
                }
                catch (Exception ex)
                {
                    _log.Error(ex.Message);
                }
                finally
                {
                    if (workItem.IsNotNull())
                        _statisticController.WorkItemCompleted();
                }
            }
        }

Thread Pool Extensibility

The ExtendedThreadPool supports IoC, ITaskQueueController is marked with DependencyAttribute.

[Dependency]
public ITaskQueueController TaskQueueController { private get; set; }

In case you need a more powerful task queue, you need to implement the ITaskQueue, and don't worry about thread safety: you can create your own ITaskQueueController as well. I've used Unity to configure the ExtendedThreadPool.

App.config

Example

In one of my projects, I had to use a transaction MSMQ with multi-threading. The SampleTask is derived by ITaskItem, please see the CoreDefaultMsmqSample project for more details.

public void DoWork()
{
    using (var transaction = new MessageQueueTransaction())
    {
        try
        {
            transaction.Begin();
            Message msmqMessage = 
              _queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
            if (msmqMessage != null)
            {
                var message = (ExternalMessage) msmqMessage.Body;
                LogManager.GetLogger(GetType()).Info("Task has been " + 
                           "done, info {0}".FormatWith(message.Data));
                //Do work
                Thread.Sleep(1000);
            }
        }
        catch (Exception ex)
        {
            transaction.Abort();
            LogManager.GetLogger(GetType()).Error(ex.Message);
            return;
        }
        transaction.Commit();
    }
}

History

  • 29 Jun 2008: Initial version.
  • 02 Jul 2008
    • + TransactionalMsmqTaskItem, DefaultTaskQueue.
    • * TaskQueueController classes added disposable support.
    • * WorkThread updated stop logic.
    • * Sample projects:
      • CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
      • CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
  • 10 Jul 2008
    • Added Mike.Strobel's suggestion.
    • + ActionTaskItem.
    • * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
    • Added more tests
  • 02 Oct 2009
    • + PriorityTaskQueue.
    • + StatisticController.
    • * ExtendedThreadPool added support StatisticController, MultiThreadingCapacityType.
    • Added support Unity 1.2
    • Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
    • Added more tests

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)

About the Author

GSerjo

Architect

Russian Federation Russian Federation

Member

B.Sc. in Computer Science.

Sign Up to vote   Poor Excellent
Add a reason or comment to your vote: x
Votes of 3 or less require a comment

Comments and Discussions

 
You must Sign In to use this message board. (secure sign-in)
 
Search this forum  
 FAQ
    Noise  Layout  Per page   
  Refresh
GeneralSimple genius PinmemberGSFriend4:56 9 Sep '09  
GeneralRe: Simple genius PinmemberGSerjo5:24 9 Sep '09  
GeneralThanks Pinmemberjackrabbits14:21 22 Aug '08  
GeneralRe: Thanks PinmemberGSerjo7:46 23 Aug '08  
GeneralKudos and a Suggestion Pinmembermike.strobel8:50 9 Jul '08  
GeneralRe: Kudos and a Suggestion PinmemberGSerjo9:07 9 Jul '08  
GeneralRe: Kudos and a Suggestion [modified] PinmemberGSerjo9:54 9 Jul '08  
GeneralRe: Kudos and a Suggestion Pinmembermike.strobel12:57 9 Jul '08  
GeneralInterlocks and kudos PinmemberRoss Korsky18:37 2 Jul '08  
GeneralRe: Interlocks and kudos PinmemberGSerjo0:33 3 Jul '08  
GeneralRe: Interlocks and kudos PinmemberRoss Korsky7:39 3 Jul '08  
GeneralRe: Interlocks and kudos PinmemberGSerjo8:56 3 Jul '08  
GeneralNeed clarifications PinmemberPIEBALDconsult6:07 30 Jun '08  
AnswerRe: Need clarifications PinmemberGSerjo8:01 30 Jun '08  
Hi
 
I mean that thread pools: Smart Thread Pool, BlackHen.Threading are hard coded and if I want to add different type of WorkQueue/TaskQueue or change thread communication logic I have to spend too many time for it.
 
IoC - Inversion of control
 
"The Unity Application Block (Unity) is a lightweight, extensible dependency injection container with support for constructor, property, and method call injection. It provides developers with the following advantages:
 
It provides simplified object creation, especially for hierarchical object structures and dependencies, which simplifies application code.
It supports abstraction of requirements; this allows developers to specify dependencies at run time or in configuration and simplify management of crosscutting concerns.
It increases flexibility by deferring component configuration to the container.
It has a service location capability; this allows clients to store or cache the container. This is especially useful in ASP.NET Web applications where the developers can persist the container in the ASP.NET session or application." (c) MSDN
 
On the ExtendedThreadPool you can change all and you should not drop source code
 
P.S. no I did not, I will, Thanks

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

Permalink | Advertise | Privacy | Mobile
Web01 | 2.5.120529.1 | Last Updated 2 Oct 2009
Article Copyright 2008 by GSerjo
Everything else Copyright © CodeProject, 1999-2012
Terms of Use
Layout: fixed | fluid