Licence Ms-PL
First Posted 29 Jun 2008

Extended Thread Pool

By GSerjo | 2 Oct 2009
Your own extensible and configurable Thread Pool.

[Screenshot: DefaultSample]

[Screenshot: CorePrioritySample]

Introduction

Several Thread Pool implementations are available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. Their behavior is hard coded, which makes them difficult to adapt. I've tried to address these issues:

  • A Thread Pool should be extensible and configurable
  • A Thread Pool should be as simple as possible

So, I've created an Extended Thread Pool. This Thread Pool is written using C# 3.0.

Extended Thread Pool Features

  • IoC support (I've used Unity 1.2)
  • Extensible queues
  • Extensible Task Items
  • Limit on the maximum number of working threads
  • Dynamic thread workers
  • Task priority support
  • Thread Pool statistics support: TaskItemsQueued, TaskItemsStarted, ConsumersWaiting, MaxThreads
  • Extensible logging

Extended Thread Pool Design

  • ITaskItem represents a task
  • ITaskQueue represents the task queue logic
  • ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
  • WorkThread represents a thread worker
  • ExtendedThreadPool controls work threads

Let's take a closer look at each type.

The ITaskItem represents some work that should be done.

public interface ITaskItem
{
    void DoWork();
}

TaskItemPriority is the priority of the WorkThread that runs the task; it can be specified for each task.

The ITaskQueue is another simple interface that manages the task queue.

public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

ITaskQueueController provides communication logic between the consumer and the producer.

public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}
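
To make ITaskItem and ITaskQueue more concrete, here is a minimal sketch of one possible implementation of each. DelegateTaskItem and FifoTaskQueue are hypothetical names chosen for illustration; they are not types from the library, and the queue is deliberately not thread safe because that is the controller's job.

```csharp
using System;
using System.Collections.Generic;

public interface ITaskItem
{
    void DoWork();
}

public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

// Hypothetical task item: runs a delegate supplied by the caller.
public sealed class DelegateTaskItem : ITaskItem
{
    private readonly Action _action;

    public DelegateTaskItem(Action action)
    {
        if (action == null)
            throw new ArgumentNullException("action");
        _action = action;
    }

    public void DoWork()
    {
        _action();
    }
}

// Hypothetical first-in, first-out queue. It does no locking of its own;
// a task queue controller is expected to serialize access to it.
public sealed class FifoTaskQueue : ITaskQueue
{
    private readonly Queue<ITaskItem> _items = new Queue<ITaskItem>();

    public int Count
    {
        get { return _items.Count; }
    }

    public void Enqueue(ITaskItem item)
    {
        if (item == null)
            throw new ArgumentNullException("item");
        _items.Enqueue(item);
    }

    public ITaskItem Dequeue()
    {
        // Throws InvalidOperationException when the queue is empty.
        return _items.Dequeue();
    }
}
```

Tasks leave such a queue in the order they were added, so it suits workloads where no task is more urgent than another.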

I've implemented two task queue controllers. Both implement ITaskQueueController through the TaskQueueControllerBase base class:

  • DefaultTaskQueueController
  • BoundedTaskQueueController

Default Task Queue Controller

DefaultTaskQueueController is a thread safe wrapper for the ITaskQueue:

public class DefaultTaskQueueController : TaskQueueControllerBase
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }
 
    #region Overrides of TaskQueueControllerBase
 
    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }
 
    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
        }
        return taskItem;
    }
 
    #endregion
}
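
The class above relies on members such as _locker, _taskQueue, _consumersWaiting and _isDispose, which live in TaskQueueControllerBase, a class that is not listed in this article. The following is only a guess at its shape, reconstructed from how the fields are used. SimpleTaskQueue and SketchTaskQueueController are hypothetical helpers added so that the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public interface ITaskItem { void DoWork(); }

public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

// Assumed shape of the base class. The field names come from the derived
// classes in the article; everything else is a guess.
public abstract class TaskQueueControllerBase : ITaskQueueController
{
    protected readonly object _locker = new object();
    protected readonly ITaskQueue _taskQueue;
    protected int _consumersWaiting;
    protected bool _isDispose;

    protected TaskQueueControllerBase(ITaskQueue taskQueue)
    {
        if (taskQueue == null)
            throw new ArgumentNullException("taskQueue");
        _taskQueue = taskQueue;
    }

    public int ConsumersWaiting
    {
        get { lock (_locker) { return _consumersWaiting; } }
    }

    public abstract void Enqueue(ITaskItem item);
    public abstract ITaskItem Dequeue();

    public void Dispose()
    {
        lock (_locker)
        {
            _isDispose = true;
            // Wake every blocked thread so it can see _isDispose and return.
            Monitor.PulseAll(_locker);
        }
    }
}

// Hypothetical non-thread-safe queue used only to exercise the sketch.
public sealed class SimpleTaskQueue : ITaskQueue
{
    private readonly Queue<ITaskItem> _items = new Queue<ITaskItem>();
    public int Count { get { return _items.Count; } }
    public void Enqueue(ITaskItem item) { _items.Enqueue(item); }
    public ITaskItem Dequeue() { return _items.Dequeue(); }
}

// Same logic as the article's DefaultTaskQueueController, repeated here
// so that the sketch compiles on its own.
public sealed class SketchTaskQueueController : TaskQueueControllerBase
{
    public SketchTaskQueueController(ITaskQueue taskQueue) : base(taskQueue) { }

    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }

    public override ITaskItem Dequeue()
    {
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            return _isDispose ? null : _taskQueue.Dequeue();
        }
    }
}
```

Under these assumptions, Dispose is what lets a worker blocked in Dequeue return null and leave its loop instead of waiting forever on an empty queue.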

Bounded Task Queue Controller

BoundedTaskQueueController is also thread safe. If the producers create items faster than the consumers can process them, the queue grows without bound and so does memory usage. The BoundedTaskQueueController lets you limit the size the queue may reach; once that limit is reached, the producer blocks until a consumer removes an item.

public class BoundedTaskQueueController : TaskQueueControllerBase
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;
 
    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
            throw new ArgumentException("MaxTasksCount should be greater than 0");
        _maxTasksCount = maxTasksCount;
    }
 
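    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            // Block the producer while the queue is full. Comparing against
            // _maxTasksCount (not _maxTasksCount - 1) lets the queue reach its
            // limit and avoids blocking forever when the limit is 1.
            while (_taskQueue.Count >= _maxTasksCount && !_isDispose)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }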
 
    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
            if (_producersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
        return taskItem;
    }
}

Extended Thread Pool

The ExtendedThreadPool manages the ITaskQueueController.

        /// <summary>
        /// Create new Instance of ExtendedThreadPool
        /// </summary>
        /// <param name="minThreads">MinThreads per processor</param>
        /// <param name="maxThreads">MaxThreads per processor</param>
        public ExtendedThreadPool(int minThreads, int maxThreads)
            : this(minThreads, maxThreads, MultiThreadingCapacityType.PerProcessor)
        {
        }
 
        public ExtendedThreadPool(int minThreads, int maxThreads,
            MultiThreadingCapacityType multiThreadingCapacityType)
        {
            SetThreadingRange(minThreads, maxThreads, multiThreadingCapacityType);
            _statisticController = new StatisticController(_maxThreads);
        }

MultiThreadingCapacityType represents the threading capacity, that is, whether the thread limits apply to all processors together or to each processor:

    public enum MultiThreadingCapacityType
    {
        /// <summary>
        /// Represent all processors
        /// </summary>
        Global,
        /// <summary>
        /// Represent one processor
        /// </summary>
        PerProcessor
    }
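
SetThreadingRange is not listed in this article. As a rough sketch of what the two capacity types could mean in practice, the hypothetical helper below scales the limits by the processor count for PerProcessor and leaves them unchanged for Global. The ThreadingRange class, the Resolve method and the processorCount parameter are assumptions made for illustration only.

```csharp
using System;

public enum MultiThreadingCapacityType
{
    // Limits describe all processors together.
    Global,
    // Limits describe a single processor.
    PerProcessor
}

// Hypothetical helper; the library's real SetThreadingRange may differ.
public static class ThreadingRange
{
    // Returns the effective { min, max } thread counts for the whole process.
    public static int[] Resolve(int minThreads, int maxThreads,
        MultiThreadingCapacityType capacityType, int processorCount)
    {
        if (minThreads < 0)
            throw new ArgumentOutOfRangeException("minThreads");
        if (maxThreads < 1 || maxThreads < minThreads)
            throw new ArgumentOutOfRangeException("maxThreads");
        if (processorCount < 1)
            throw new ArgumentOutOfRangeException("processorCount");

        // PerProcessor: the limits apply to each processor, so scale them.
        // Global: the limits already describe the whole machine.
        int factor = capacityType == MultiThreadingCapacityType.PerProcessor
            ? processorCount
            : 1;

        return new[] { minThreads * factor, maxThreads * factor };
    }
}
```

A caller would typically pass Environment.ProcessorCount as the last argument; taking it as a parameter keeps the sketch easy to check on any machine.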

The AddTask method adds a task to the task pipeline. If the maximum thread limit has not been reached and ConsumersWaiting = 0, a new WorkThread is created.

        /// <summary>
        /// Add taskItem with default <see cref="TaskItemPriority.Normal"/>
        /// </summary>
        /// <param name="taskItem"></param>
        public void AddTask(ITaskItem taskItem)
        {
            AddWorkItem(new WorkItem(taskItem));
        }
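
The rule for starting a new WorkThread can be isolated into a small predicate. This is only a sketch of the rule as described in the text: WorkerGrowthPolicy and ShouldStartWorker are hypothetical names, and the real AddWorkItem method may apply additional checks.

```csharp
using System;

// Hypothetical helper that isolates the growth rule described in the text.
public static class WorkerGrowthPolicy
{
    public static bool ShouldStartWorker(int runningThreads, int maxThreads,
        int consumersWaiting)
    {
        if (maxThreads < 1)
            throw new ArgumentOutOfRangeException("maxThreads");

        // An idle worker is already waiting on the queue and will take the task.
        if (consumersWaiting > 0)
            return false;

        // Every worker is busy: grow only while below the limit.
        return runningThreads < maxThreads;
    }
}
```

Checking ConsumersWaiting first keeps the pool from creating threads while existing workers are idle, which is what makes the number of workers dynamic.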

You can also add a task with a TaskItemPriority. Please note that DefaultTaskQueue does not take TaskItemPriority into account; use PriorityTaskQueue for prioritized tasks.

        public void AddTask(ITaskItem taskItem, TaskItemPriority priority)
        {
            AddWorkItem(new WorkItem(taskItem, priority));
        }
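
PriorityTaskQueue itself is not listed in this article. To illustrate the general idea of a priority-aware queue, the sketch below serves higher priorities first and keeps first-in, first-out order within one priority. SketchPriority, SketchPriorityQueue and NamedTask are hypothetical; the library's TaskItemPriority values and queue implementation may look quite different.

```csharp
using System;
using System.Collections.Generic;

public interface ITaskItem { void DoWork(); }

// Hypothetical priority levels; the library's TaskItemPriority values
// are not listed in the article.
public enum SketchPriority { Low = 0, Normal = 1, High = 2 }

// Hypothetical task used only to make the ordering visible.
public sealed class NamedTask : ITaskItem
{
    public NamedTask(string name) { Name = name; }
    public string Name { get; private set; }
    public void DoWork() { Console.WriteLine("Running " + Name); }
}

// Hypothetical queue: highest priority first, FIFO within one priority.
// Like the other queues, it relies on a controller for thread safety.
public sealed class SketchPriorityQueue
{
    private readonly Queue<ITaskItem>[] _buckets;
    private int _count;

    public SketchPriorityQueue()
    {
        // One bucket per SketchPriority value.
        _buckets = new Queue<ITaskItem>[3];
        for (int i = 0; i < _buckets.Length; i++)
            _buckets[i] = new Queue<ITaskItem>();
    }

    public int Count { get { return _count; } }

    public void Enqueue(ITaskItem item, SketchPriority priority)
    {
        if (item == null)
            throw new ArgumentNullException("item");
        _buckets[(int)priority].Enqueue(item);
        _count++;
    }

    public ITaskItem Dequeue()
    {
        // Scan from the highest priority bucket down to the lowest.
        for (int i = _buckets.Length - 1; i >= 0; i--)
        {
            if (_buckets[i].Count > 0)
            {
                _count--;
                return _buckets[i].Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}
```

One trade-off of this simple scheme is that a steady stream of high-priority tasks can starve the lower buckets.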

Work Thread

The WorkThread class executes a task item and provides logging.

        public void Start()
        {
            while (_isRun)
            {
                WorkItem workItem = null;
                try
                {
                    workItem = _taskQueueController.Dequeue();
                    if (workItem.IsNull())
                        continue;
                    _statisticController.WorkItemStarted();
                    DoWork(workItem);
                }
                catch (Exception ex)
                {
                    _log.Error(ex.Message);
                }
                finally
                {
                    if (workItem.IsNotNull())
                        _statisticController.WorkItemCompleted();
                }
            }
        }

Thread Pool Extensibility

The ExtendedThreadPool supports IoC: the TaskQueueController property is marked with DependencyAttribute.

[Dependency]
public ITaskQueueController TaskQueueController { private get; set; }

If you need a more powerful task queue, implement ITaskQueue. You don't need to worry about thread safety there, because the ITaskQueueController wraps the queue and synchronizes access to it. You can also create your own ITaskQueueController. I've used Unity to configure the ExtendedThreadPool.

App.config

Example

In one of my projects, I had to process a transactional MSMQ queue from multiple threads. The SampleTask implements ITaskItem; please see the CoreDefaultMsmqSample project for more details.

public void DoWork()
{
    using (var transaction = new MessageQueueTransaction())
    {
        try
        {
            transaction.Begin();
            Message msmqMessage = 
              _queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
            if (msmqMessage != null)
            {
                var message = (ExternalMessage) msmqMessage.Body;
                LogManager.GetLogger(GetType()).Info("Task has been " + 
                           "done, info {0}".FormatWith(message.Data));
                //Do work
                Thread.Sleep(1000);
            }
        }
        catch (Exception ex)
        {
            transaction.Abort();
            LogManager.GetLogger(GetType()).Error(ex.Message);
            return;
        }
        transaction.Commit();
    }
}

History

  • 29 Jun 2008: Initial version.
  • 02 Jul 2008
    • + TransactionalMsmqTaskItem, DefaultTaskQueue.
    • * TaskQueueController classes added disposable support.
    • * WorkThread updated stop logic.
    • * Sample projects:
      • CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
      • CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
  • 10 Jul 2008
    • Added Mike.Strobel's suggestion.
    • + ActionTaskItem.
    • * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
    • Added more tests
  • 02 Oct 2009
    • + PriorityTaskQueue.
    • + StatisticController.
    • * ExtendedThreadPool added support for StatisticController and MultiThreadingCapacityType.
    • Added support for Unity 1.2
    • Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
    • Added more tests

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)

About the Author

GSerjo

Architect

Russian Federation
B.Sc. in Computer Science.

Article Copyright 2008 by GSerjo