
Introduction
Several Thread Pools are available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. Their behavior is hard coded, which makes them difficult to adapt. I've tried to address these issues:
- A Thread Pool should be extensible and configurable
- A Thread Pool should be as simple as possible
So, I've created an Extended Thread Pool. This Thread Pool is written using C# 4.0.
Extended Thread Pool Features
- Really simple
- Extensible queues
- Extensible Task Items
- Limit on the maximum number of working threads
- Dynamic thread workers
- Task priority support
- Extensible logging
Extended Thread Pool Design
ITaskItem represents a task
ITaskQueue represents the task queue logic
ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
WorkThread represents a thread worker
ExtendedThreadPool controls work threads
Let's take a closer look at each type:
- The ITaskItem represents some work that should be done.
public interface ITaskItem
{
    void DoWork();
}
TaskItemPriority is the WorkThread priority; it can be specified for each task.
- The ITaskQueue is another simple interface that manages the task queue.
public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}
The ITaskQueueController provides communication logic between the consumer and the producer.
public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}
I've implemented two task queue controllers derived from ITaskQueueController:
- DefaultTaskQueueController
- BoundedTaskQueueController
Default Task Queue Controller
DefaultTaskQueueController is a thread safe wrapper for the ITaskQueue:
public sealed class DefaultTaskQueueController : TaskQueueController
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }

    protected override IWorkItem DequeueCore()
    {
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            return _taskQueue.Dequeue();
        }
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
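The class above relies on members inherited from TaskQueueController (_locker, _taskQueue, _consumersWaiting, _isDisposed), which this article does not show. Here is a minimal sketch of what such a base class could look like, inferred only from how the derived class uses it. The empty IWorkItem interface, FifoTaskQueue, NonBlockingController and MarkerItem are hypothetical stand-ins, and the real base class presumably implements ITaskQueueController rather than IDisposable directly.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Stand-ins for the library's types; the real definitions may differ.
public interface IWorkItem { }

public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Hypothetical reconstruction of the base class, inferred from the members
// that DefaultTaskQueueController uses.
public abstract class TaskQueueController : IDisposable
{
    protected readonly object _locker = new object();
    protected readonly ITaskQueue _taskQueue;
    protected int _consumersWaiting;
    protected bool _isDisposed;

    protected TaskQueueController(ITaskQueue taskQueue)
    {
        if (taskQueue == null) throw new ArgumentNullException("taskQueue");
        _taskQueue = taskQueue;
    }

    public int ConsumersWaiting
    {
        get { lock (_locker) { return _consumersWaiting; } }
    }

    public IWorkItem Dequeue() { return DequeueCore(); }

    public void Enqueue(IWorkItem item)
    {
        if (item == null) throw new ArgumentNullException("item");
        EnqueueCore(item);
    }

    public void Dispose()
    {
        lock (_locker)
        {
            if (_isDisposed) return;
            _isDisposed = true;
            // Wake all blocked threads so their wait loops can observe _isDisposed.
            Monitor.PulseAll(_locker);
        }
    }

    protected abstract IWorkItem DequeueCore();
    protected abstract void EnqueueCore(IWorkItem item);
}

// Small concrete pieces used only to exercise the sketch.
public sealed class FifoTaskQueue : ITaskQueue
{
    private readonly Queue<IWorkItem> _items = new Queue<IWorkItem>();
    public int Count { get { return _items.Count; } }
    public IWorkItem Dequeue() { return _items.Dequeue(); }
    public void Enqueue(IWorkItem item) { _items.Enqueue(item); }
}

public sealed class NonBlockingController : TaskQueueController
{
    public NonBlockingController(ITaskQueue taskQueue) : base(taskQueue) { }

    // Returns null instead of waiting when nothing is available.
    protected override IWorkItem DequeueCore()
    {
        lock (_locker)
        {
            return (_isDisposed || _taskQueue.Count == 0) ? null : _taskQueue.Dequeue();
        }
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker) { _taskQueue.Enqueue(item); }
    }
}

public sealed class MarkerItem : IWorkItem { }
```

Pulsing inside Dispose is the design choice that matters here: without it, a consumer blocked in Monitor.Wait would never re-check _isDisposed and could not return null.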
Bounded Task Queue Controller
BoundedTaskQueueController is a thread safe controller that limits the queue size. If producers create items faster than consumers can process them, the queue keeps growing and the system can run into unbounded memory usage. The BoundedTaskQueueController lets you place a limit on the size of the queue; once the limit is reached, producers block until a consumer removes an item.
public sealed class BoundedTaskQueueController : TaskQueueController
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;

    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
        {
            throw new ArgumentException("maxTasksCount should be greater than 0");
        }
        _maxTasksCount = maxTasksCount;
    }

    protected override IWorkItem DequeueCore()
    {
        IWorkItem taskItem;
        lock (_locker)
        {
            // Consumers wait while the queue is empty.
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            taskItem = _taskQueue.Dequeue();
            // A slot has been freed, so wake any blocked producers.
            if (_producersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
        return taskItem;
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            // Producers wait while the queue is full. Comparing against
            // (_maxTasksCount - 1) would block forever when maxTasksCount is 1,
            // because the queue is then "full" while it is still empty.
            while (_taskQueue.Count >= _maxTasksCount && !_isDisposed)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
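To see the blocking behavior in isolation, here is a minimal standalone sketch of the same Monitor.Wait / Monitor.PulseAll pattern. BoundedBuffer<T> and BoundedBufferDemo are hypothetical names used only for illustration and are not part of the Extended Thread Pool. The sketch omits the disposal handling and the waiting counters of the real controller.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical standalone buffer; it mirrors the pattern, not the library's API.
public sealed class BoundedBuffer<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;
    private int _peakCount;

    public BoundedBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    // Largest number of items the buffer has ever held at once.
    public int PeakCount
    {
        get { lock (_locker) { return _peakCount; } }
    }

    public void Add(T item)
    {
        lock (_locker)
        {
            // The producer blocks while the buffer is full.
            while (_items.Count >= _capacity)
            {
                Monitor.Wait(_locker);
            }
            _items.Enqueue(item);
            _peakCount = Math.Max(_peakCount, _items.Count);
            Monitor.PulseAll(_locker);
        }
    }

    public T Take()
    {
        lock (_locker)
        {
            // The consumer blocks while the buffer is empty.
            while (_items.Count == 0)
            {
                Monitor.Wait(_locker);
            }
            T item = _items.Dequeue();
            Monitor.PulseAll(_locker);
            return item;
        }
    }
}

public static class BoundedBufferDemo
{
    // Adds 0..count-1 from a producer thread and drains them on the calling thread.
    public static List<int> Run(BoundedBuffer<int> buffer, int count)
    {
        var producer = new Thread(() =>
        {
            for (int i = 0; i < count; i++) buffer.Add(i);
        });
        producer.Start();

        var received = new List<int>();
        for (int i = 0; i < count; i++) received.Add(buffer.Take());
        producer.Join();
        return received;
    }
}
```

Calling BoundedBufferDemo.Run(new BoundedBuffer<int>(2), 10) hands all ten values to the consumer in order, while PeakCount never exceeds the capacity of 2 because the producer waits whenever the buffer is full.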
Extended Thread Pool
The ExtendedThreadPool manages the ITaskQueueController. An ExtendedThreadPool is created through the Builder pattern, for example:
var threadPool = new ExtendedThreadPool.Builder
{
    Name = "My ThreadPool",
    MaxThreads = 10,
    MultiThreadingCapacityType = MultiThreadingCapacityType.Global
}.Build();
All ExtendedThreadPool properties have default values, so the simplest way to create an ExtendedThreadPool is to just call the Build method:
var threadPool = new ExtendedThreadPool.Builder()
    .Build();
MultiThreadingCapacityType represents the threading capacity:
public enum MultiThreadingCapacityType
{
    Global,
    PerProcessor
}
The AddTask method is used to add a task into the task pipeline. If the maximum thread limit is not reached and ConsumersWaiting = 0, a new WorkThread will be created.
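The rule above and the capacity enum can be sketched as two small functions. This is only an illustration: ThreadBudget and both method names are hypothetical, and the interpretation of PerProcessor as "MaxThreads multiplied by the number of processors" is my assumption, not something the library source shown here confirms.

```csharp
using System;

// Mirrors the enum shown in the article.
public enum MultiThreadingCapacityType
{
    Global,
    PerProcessor
}

// Hypothetical helper; not part of the Extended Thread Pool.
public static class ThreadBudget
{
    // Assumption: Global uses maxThreads as is, while PerProcessor
    // multiplies it by the number of logical processors.
    public static int EffectiveMaxThreads(
        MultiThreadingCapacityType type, int maxThreads, int processorCount)
    {
        if (maxThreads < 1) throw new ArgumentOutOfRangeException("maxThreads");
        if (processorCount < 1) throw new ArgumentOutOfRangeException("processorCount");

        return type == MultiThreadingCapacityType.PerProcessor
            ? maxThreads * processorCount
            : maxThreads;
    }

    // The rule from the text: start a new worker only when no consumer
    // is idle and the thread limit has not been reached.
    public static bool ShouldStartWorker(
        int currentThreads, int effectiveMaxThreads, int consumersWaiting)
    {
        return consumersWaiting == 0 && currentThreads < effectiveMaxThreads;
    }
}
```

In real code the processor count would typically come from Environment.ProcessorCount; it is passed in explicitly here so that the logic stays easy to test.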
You can add a task with a TaskItemPriority. Please note that DefaultTaskQueue does not honor TaskItemPriority; use PriorityTaskQueue for prioritized tasks.
public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
    AddWorkItem(workItem);
}

public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromAction(action, priority);
    AddWorkItem(workItem);
}
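The source of PriorityTaskQueue is not shown in this article. As a rough idea of how a priority-aware ITaskQueue could work, here is a minimal sketch that keeps one FIFO bucket per priority. Everything in it is an assumption made for illustration: the TaskItemPriority values other than Normal, the Priority property on IWorkItem, and the names BucketPriorityTaskQueue and NamedWorkItem.

```csharp
using System;
using System.Collections.Generic;

// Assumed values; only Normal appears in the article.
public enum TaskItemPriority { Low = 0, Normal = 1, High = 2 }

// Stand-in for the library's IWorkItem; the Priority property is an assumption.
public interface IWorkItem
{
    TaskItemPriority Priority { get; }
}

public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Hypothetical queue: highest priority first, FIFO within one priority.
// It is not thread safe on its own; the controller provides the locking.
public sealed class BucketPriorityTaskQueue : ITaskQueue
{
    private readonly Queue<IWorkItem>[] _buckets;
    private int _count;

    public BucketPriorityTaskQueue()
    {
        int priorities = Enum.GetValues(typeof(TaskItemPriority)).Length;
        _buckets = new Queue<IWorkItem>[priorities];
        for (int i = 0; i < _buckets.Length; i++)
        {
            _buckets[i] = new Queue<IWorkItem>();
        }
    }

    public int Count { get { return _count; } }

    public void Enqueue(IWorkItem item)
    {
        if (item == null) throw new ArgumentNullException("item");
        _buckets[(int)item.Priority].Enqueue(item);
        _count++;
    }

    public IWorkItem Dequeue()
    {
        // Scan from the highest priority bucket down to the lowest.
        for (int i = _buckets.Length - 1; i >= 0; i--)
        {
            if (_buckets[i].Count > 0)
            {
                _count--;
                return _buckets[i].Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}

// Minimal work item used to exercise the queue.
public sealed class NamedWorkItem : IWorkItem
{
    public NamedWorkItem(string name, TaskItemPriority priority)
    {
        Name = name;
        Priority = priority;
    }

    public string Name { get; private set; }
    public TaskItemPriority Priority { get; private set; }
}
```

Buckets keep both operations cheap and preserve submission order among tasks of equal priority, which a plain sorted list would not guarantee without extra bookkeeping.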
Work Thread
The WorkThread class executes a task item and provides logging.
private void DoWork()
{
    while (_isRun)
    {
        try
        {
            IWorkItem workItem = _taskQueueController.Dequeue();
            if (workItem == null)
            {
                continue;
            }
            ProcessItem(workItem);
        }
        catch (Exception ex)
        {
            _log.Error(ex);
        }
    }
}
Thread Pool Extensibility
If you need a more powerful task queue, implement ITaskQueue. You don't need to worry about thread safety, because the ITaskQueueController wraps the queue in a lock. You can create your own ITaskQueueController as well.
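As an illustration of how little a custom queue needs, here is a minimal sketch of a last-in, first-out ITaskQueue. LifoTaskQueue and LabelledItem are hypothetical names, and the empty IWorkItem interface is a stand-in for the library's type. How a custom queue is handed to the pool is not shown in this article, so the sketch stops at the queue itself.

```csharp
using System;
using System.Collections.Generic;

// Stand-in for the library's IWorkItem; the real interface may have members.
public interface IWorkItem { }

public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Hypothetical custom queue: the most recently added task runs first.
// No locking is needed here because the controller serializes access.
public sealed class LifoTaskQueue : ITaskQueue
{
    private readonly Stack<IWorkItem> _items = new Stack<IWorkItem>();

    public int Count
    {
        get { return _items.Count; }
    }

    public IWorkItem Dequeue()
    {
        // Stack<T>.Pop throws InvalidOperationException when the stack is empty.
        return _items.Pop();
    }

    public void Enqueue(IWorkItem item)
    {
        if (item == null) throw new ArgumentNullException("item");
        _items.Push(item);
    }
}

// Minimal work item used to exercise the queue.
public sealed class LabelledItem : IWorkItem
{
    public LabelledItem(string label)
    {
        Label = label;
    }

    public string Label { get; private set; }
}
```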
Example
We create an ExtendedThreadPool with default settings. SampleTask implements ITaskItem; see the details below.
using System;
using System.Threading;
using log4net;
using log4net.Config;
// A using directive for the ExtendedThreadPool namespace is also required.

internal sealed class Program
{
    private static readonly ILog _log = LogManager.GetLogger(typeof(Program));
    private static IExtendedThreadPool _threadPool;

    private static void Main()
    {
        // Load the log4net configuration from the application config file.
        XmlConfigurator.Configure();
        _threadPool = new ExtendedThreadPool.Builder().Build();
        AddTasks();
        // Keep the process alive while the worker threads run.
        Console.ReadKey();
    }

    private static void AddTasks()
    {
        for (int taskIndex = 0; taskIndex < 100; taskIndex++)
        {
            _threadPool.AddTask(new SampleTask(taskIndex));
        }
    }

    private sealed class SampleTask : ITaskItem
    {
        private readonly int _taskIndex;

        public SampleTask(int taskIndex)
        {
            _taskIndex = taskIndex;
        }

        public void DoWork()
        {
            // Simulate one second of work.
            Thread.Sleep(1000);
            _log.InfoFormat("Task {0} has been finished", _taskIndex);
        }
    }
}
History
- 29 Jun 2008: Initial version.
- 02 Jul 2008
  - + TransactionalMsmqTaskItem, DefaultTaskQueue.
  - * TaskQueueController classes added disposable support.
  - * WorkThread updated stop logic.
  - * Sample projects:
    - CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
    - CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
- 10 Jul 2008
  - Added Mike.Strobel's suggestion.
  - + ActionTaskItem.
  - * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
  - Added more tests
- 02 Oct 2009
  - + PriorityTaskQueue.
  - + StatisticController.
  - * ExtendedThreadPool added support StatisticController, MultiThreadingCapacityType.
  - Added support Unity 1.2
  - Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
  - Added more tests
- 07 Apr 2013
  - ExtendedThreadPool v2 - Unity has been removed
  - ExtendedThreadPool creation has been simplified