

Introduction
Different kinds of Thread Pools are available: Smart Thread Pool, BlackHen.Threading, but nothing supports extensions. All Thread Pools are hard coded, and it's too difficult to work with any. I've tried to address these issues:
- A Thread Pool should be extensible and configurable
- A Thread Pool should be as simple as possible
So, I've created an Extended Thread Pool. This Thread Pool is written using C# 3.0.
Extended Thread Pool Features
- IoC support (I've used Unity1.2)
- Extensible queues
- Extensible Task Items
- Limit on the maximum number of working threads
- Dynamic thread workers
- Task priority support
- Thread Pool statistic support:
TaskItemsQueued, TaskItemsStarted, ConsumersWaiting, MaxThreads
- Extensible logging
Extended Thread Pool Design
ITaskItem represents a task
ITaskQueue represents the task queue logic
ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
WorkThread represents a thread worker
ExtendedThreadPool controls work threads
Let's take a look at each class more deeply:
- The
ITaskItem represents some work that should be done.
public interface ITaskItem
{
void DoWork();
}
TaskItemPriority - WorkThread priority, can be specified for each task.
The ITaskQueue is another simple interface that manages the task queue.
public interface ITaskQueue
{
int Count { get; }
void Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
ITaskQueueController provides communication logic between the consumer and the producer.
public interface ITaskQueueController : IDisposable
{
int ConsumersWaiting { get; }
void Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
I've implemented two task queue controllers derived from ITaskQueueController:
DefaultTaskQueueController
BoundedTaskQueueController
Default Task Queue Controller
DefaultTaskQueueController is a thread safe wrapper for the ITaskQueue:
public class DefaultTaskQueueController : TaskQueueControllerBase
{
public DefaultTaskQueueController(ITaskQueue taskQueue)
: base(taskQueue)
{
}
#region Overrides of TaskQueueControllerBase
public override void Enqueue(ITaskItem item)
{
lock (_locker)
{
_taskQueue.Enqueue(item);
if (_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public override ITaskItem Dequeue()
{
ITaskItem taskItem;
lock (_locker)
{
while (_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if (_isDispose)
return null;
taskItem = _taskQueue.Dequeue();
}
return taskItem;
}
#endregion
}
Bounded Task Queue Controller
BoundedTaskQueueController (thread safe) - if the producer task or tasks create items at a rate faster than the consumer can process them, the system can run into unbounded memory usage. The BoundedTaskQueueController allows you to place a limit on the size that the queue may reach before the producer is forced to block.
public class BoundedTaskQueueController : TaskQueueControllerBase
{
private readonly int _maxTasksCount;
private int _producersWaiting;
public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
: base(taskQueue)
{
if (maxTasksCount < 1)
throw new ArgumentException("MaxTasksCount should be greater 0");
_maxTasksCount = maxTasksCount;
}
public override void Enqueue(ITaskItem item)
{
lock (_locker)
{
while (_taskQueue.Count == (_maxTasksCount - 1) && !_isDispose)
{
_producersWaiting++;
Monitor.Wait(_locker);
_producersWaiting--;
}
_taskQueue.Enqueue(item);
if (_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public override ITaskItem Dequeue()
{
ITaskItem taskItem;
lock (_locker)
{
while (_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if (_isDispose)
return null;
taskItem = _taskQueue.Dequeue();
if (_producersWaiting > 0)
Monitor.PulseAll(_locker);
}
return taskItem;
}
}
Extended Thread Pool
The ExtendedThreadPool manages the ITaskQueueController.
public ExtendedThreadPool(int minThreads, int maxThreads)
: this(minThreads, maxThreads, MultiThreadingCapacityType.PerProcessor)
{
}
public ExtendedThreadPool(int minThreads, int maxThreads,
MultiThreadingCapacityType multiThreadingCapacityType)
{
SetThreadingRange(minThreads, maxThreads, multiThreadingCapacityType);
_statisticController = new StatisticController(_maxThreads);
}
MultiThreadingCapacityTyperepresent threading capacity
public enum MultiThreadingCapacityType
{
Global,
PerProcessor
}
The AddTask method is used to add a task into the task pipeline. If the maximum thread limit is not reached and ConsamersWaitin = 0, a new WorkThread will be created.
public void AddTask(ITaskItem taskItem)
{
AddWorkItem(new WorkItem(taskItem));
}
You can add a task with a TaskItemPriority. Please note, DefaultTaskQueue does not vallidate TaskItemPriority use PriorityTaskQueue for priority tasks
public void AddTask(ITaskItem taskItem, TaskItemPriority priority)
{
AddWorkItem(new WorkItem(taskItem, priority));
}
Work Thread
The WorkThread class executes a task item and provides logging.
public void Start()
{
while (_isRun)
{
WorkItem workItem = null;
try
{
workItem = _taskQueueController.Dequeue();
if (workItem.IsNull())
continue;
_statisticController.WorkItemStarted();
DoWork(workItem);
}
catch (Exception ex)
{
_log.Error(ex.Message);
}
finally
{
if (workItem.IsNotNull())
_statisticController.WorkItemCompleted();
}
}
}
Thread Pool Extensibility
The ExtendedThreadPool supports IoC, ITaskQueueController is marked with DependencyAttribute.
[Dependency]
public ITaskQueueController TaskQueueController { private get; set; }
In case you need a more powerful task queue, you need to implement the ITaskQueue, and don't worry about thread safety: you can create your own ITaskQueueController as well. I've used Unity to configure the ExtendedThreadPool.

Example
In one of my projects, I had to use a transaction MSMQ with multi-threading. The SampleTask is derived by ITaskItem, please see the CoreDefaultMsmqSample project for more details.
public void DoWork()
{
using (var transaction = new MessageQueueTransaction())
{
try
{
transaction.Begin();
Message msmqMessage =
_queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
if (msmqMessage != null)
{
var message = (ExternalMessage) msmqMessage.Body;
LogManager.GetLogger(GetType()).Info("Task has been " +
"done, info {0}".FormatWith(message.Data));
Thread.Sleep(1000);
}
}
catch (Exception ex)
{
transaction.Abort();
LogManager.GetLogger(GetType()).Error(ex.Message);
return;
}
transaction.Commit();
}
}
History
- 29 Jun 2008: Initial version.
- 02 Jul 2008
- +
TransactionalMsmqTaskItem, DefaultTaskQueue.
- *
TaskQueueController classes added disposable support.
- *
WorkThread updated stop logic.
- * Sample projects:
- CoreDefaultMsmqSample uses
Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
- CoreDefaultSample uses
CoreDefaultSample.TaskQueue (see the App.config file for more details).
- 10 Jul 2008
- Added Mike.Strobel's suggestion.
- +
ActionTaskItem.
- *
ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
- Added more tests
- 02 Oct 2009
- +
PriorityTaskQueue.
- +
StatisticController.
- *
ExtendedThreadPool added support StatisticController, MultiThreadingCapacityType.
- Added support Unity 1.2
- Added CorePrioritySample. The project uses
Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
- Added more tests