Extended Thread Pool
Your own extensible and configurable Thread Pool.
Introduction
Several thread pools are already available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. Their behavior is hard-coded, which makes them difficult to adapt. I've tried to address these issues:
- A Thread Pool should be extensible and configurable
- A Thread Pool should be as simple as possible
So, I've created a Tiny Thread Pool. This Thread Pool is written using C# 4.0.
Tiny Thread Pool Features
- Really simple
- Extensible queues
- Extensible Task Items
- Limit on the maximum number of working threads
- Dynamic thread workers
- Task priority support
- Extensible logging
Tiny Thread Pool Design
- ITaskItem represents a task
- ITaskQueue represents the task queue logic
- ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
- WorkThread represents a thread worker
- TinyThreadPool controls work threads
Let's take a look at each class more deeply:
- The ITaskItem represents some work that should be done.
public interface ITaskItem
{
    void DoWork();
}
- TaskItemPriority - WorkThread priority can be specified for each task.
- The ITaskQueue is another simple interface that manages the task queue.
/// <summary>
/// Represent the queue.
/// </summary>
public interface ITaskQueue
{
    /// <summary>
    /// Count of work item.
    /// </summary>
    int Count { get; }

    /// <summary>
    /// Dequeue the work item.
    /// </summary>
    /// <returns>The work item.</returns>
    IWorkItem Dequeue();

    /// <summary>
    /// Enqueue the work item.
    /// </summary>
    /// <param name="item">The work item.</param>
    void Enqueue(IWorkItem item);
}
- The ITaskQueueController provides communication logic between the consumer and the producer.
public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}
I've implemented two task queue controllers derived from ITaskQueueController:
- DefaultTaskQueueController
- BoundedTaskQueueController
Default Task Queue Controller
DefaultTaskQueueController is a thread-safe wrapper for the ITaskQueue:
public sealed class DefaultTaskQueueController : TaskQueueController
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }

    protected override IWorkItem DequeueCore()
    {
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            return _taskQueue.Dequeue();
        }
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
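Both controllers derive from a TaskQueueController base class that is not listed in this article. The following is only a sketch of what such a base class might look like, reconstructed from the members the derived classes rely on (_locker, _taskQueue, _consumersWaiting, _isDisposed, DequeueCore, EnqueueCore). The null checks, the Dispose body, and the helper types FifoTaskQueue and WaitingController are assumptions made for illustration, not the library's actual code.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Stand-in declarations so the sketch compiles on its own.
public interface IWorkItem { void DoWork(); }
public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}
public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Assumed shape of the base class. Field names come from the derived
// controllers in the article; everything else is a guess.
public abstract class TaskQueueController : ITaskQueueController
{
    protected readonly object _locker = new object();
    protected readonly ITaskQueue _taskQueue;
    protected int _consumersWaiting;
    protected bool _isDisposed;

    protected TaskQueueController(ITaskQueue taskQueue)
    {
        if (taskQueue == null) throw new ArgumentNullException("taskQueue");
        _taskQueue = taskQueue;
    }

    public int ConsumersWaiting
    {
        get { lock (_locker) { return _consumersWaiting; } }
    }

    public IWorkItem Dequeue() { return DequeueCore(); }

    public void Enqueue(IWorkItem item)
    {
        if (item == null) throw new ArgumentNullException("item");
        EnqueueCore(item);
    }

    public void Dispose()
    {
        lock (_locker)
        {
            _isDisposed = true;
            // Wake every blocked thread so it can see _isDisposed and return.
            Monitor.PulseAll(_locker);
        }
    }

    protected abstract IWorkItem DequeueCore();
    protected abstract void EnqueueCore(IWorkItem item);
}

// Hypothetical helpers, used only to exercise the sketch.
public sealed class FifoTaskQueue : ITaskQueue
{
    private readonly Queue<IWorkItem> _items = new Queue<IWorkItem>();
    public int Count { get { return _items.Count; } }
    public IWorkItem Dequeue() { return _items.Dequeue(); }
    public void Enqueue(IWorkItem item) { _items.Enqueue(item); }
}

public sealed class WaitingController : TaskQueueController
{
    public WaitingController(ITaskQueue queue) : base(queue) { }

    protected override IWorkItem DequeueCore()
    {
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            return _isDisposed ? null : _taskQueue.Dequeue();
        }
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker) { _taskQueue.Enqueue(item); Monitor.PulseAll(_locker); }
    }
}
```

With a base class along these lines, a consumer blocked in Dequeue on an empty queue is released as soon as Dispose is called and receives null, which matches the null check in the WorkThread loop.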
Bounded Task Queue Controller
BoundedTaskQueueController is also thread safe. If the producer task or tasks create items faster than the consumer can process them, the system can run into unbounded memory usage. The BoundedTaskQueueController lets you place a limit on the size the queue may reach before the producer is forced to block.
public sealed class BoundedTaskQueueController : TaskQueueController
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;

    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
        {
            throw new ArgumentException("MaxTasksCount should be greater than 0");
        }
        _maxTasksCount = maxTasksCount;
    }

    protected override IWorkItem DequeueCore()
    {
        IWorkItem taskItem;
        lock (_locker)
        {
            // Consumers wait while the queue is empty.
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            taskItem = _taskQueue.Dequeue();
            // A slot has been freed, so wake any blocked producers.
            if (_producersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
        return taskItem;
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            // Producers wait while the queue already holds _maxTasksCount items.
            while (_taskQueue.Count >= _maxTasksCount && !_isDisposed)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            // A new item is available, so wake any waiting consumers.
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}
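To see the blocking behavior in isolation, here is a minimal sketch of the same Monitor-based bounded buffer without any of the library types. BoundedBuffer<T>, MaxObservedCount and BoundedBufferDemo are hypothetical names introduced only for this illustration; in this sketch the producer blocks once the queue length reaches the capacity.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-alone bounded buffer; not part of the library.
public sealed class BoundedBuffer<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public BoundedBuffer(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    // Largest queue length ever reached; used only by the demo.
    public int MaxObservedCount { get; private set; }

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            // Block the producer while the buffer is full.
            while (_items.Count >= _capacity)
            {
                Monitor.Wait(_locker);
            }
            _items.Enqueue(item);
            MaxObservedCount = Math.Max(MaxObservedCount, _items.Count);
            // Wake consumers waiting for an item.
            Monitor.PulseAll(_locker);
        }
    }

    public T Dequeue()
    {
        lock (_locker)
        {
            // Block the consumer while the buffer is empty.
            while (_items.Count == 0)
            {
                Monitor.Wait(_locker);
            }
            T item = _items.Dequeue();
            // Wake producers waiting for free space.
            Monitor.PulseAll(_locker);
            return item;
        }
    }
}

public static class BoundedBufferDemo
{
    // Pushes `total` integers through a buffer of the given capacity,
    // collects them in `received`, and returns the largest queue length seen.
    public static int Run(int capacity, int total, List<int> received)
    {
        var buffer = new BoundedBuffer<int>(capacity);
        var producer = new Thread(() =>
        {
            for (int i = 0; i < total; i++) buffer.Enqueue(i);
        });
        producer.Start();
        for (int i = 0; i < total; i++)
        {
            received.Add(buffer.Dequeue());
        }
        producer.Join();
        return buffer.MaxObservedCount;
    }
}
```

Calling BoundedBufferDemo.Run(3, 50, list) should deliver all 50 values in order while the queue never grows beyond three items, no matter how far ahead the producer tries to run.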
Tiny Thread Pool
The TinyThreadPool manages the ITaskQueueController. A TinyThreadPool is created through the Create method, for example:
var threadPool = TinyThreadPool.Create(x =>
{
    x.Name = "My ThreadPool";
    x.MinThreads = 2;
    x.MaxThreads = 10;
    x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
});
All TinyThreadPool properties have default values, so the simplest way to get a TinyThreadPool is to use the Default property:
var threadPool = TinyThreadPool.Default;
MultiThreadingCapacityType represents the threading capacity:
public enum MultiThreadingCapacityType
{
    /// <summary>
    /// Represent all processors
    /// </summary>
    Global,

    /// <summary>
    /// Represent one processor
    /// </summary>
    PerProcessor
}
The AddTask method is used to add a task into the task pipeline. If the maximum thread limit is not reached and ConsumersWaiting = 0, a new WorkThread will be created.
You can add a task with a TaskItemPriority. Please note that DefaultTaskQueue does not take TaskItemPriority into account; use PriorityTaskQueue for priority tasks.
/// <summary>
/// Add new task.
/// </summary>
/// <param name="taskItem">Represent task.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
    AddWorkItem(workItem);
}

/// <summary>
/// Add new task as <see cref="Action" />.
/// </summary>
/// <param name="action">Represent task.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromAction(action, priority);
    AddWorkItem(workItem);
}
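The rule for creating a new WorkThread, described before the AddTask listings, can be restated as a small predicate. WorkerSpawnRule and ShouldStartWorker are hypothetical names used only for this sketch; the body of AddWorkItem is not shown in the article, so the real check may be structured differently.

```csharp
using System;

// Hypothetical helper that restates the rule from the text; the real
// TinyThreadPool may implement the check differently.
public static class WorkerSpawnRule
{
    public static bool ShouldStartWorker(int currentThreads, int maxThreads, int consumersWaiting)
    {
        // The pool must still be below its configured thread limit...
        bool belowLimit = currentThreads < maxThreads;
        // ...and no existing worker may be idle, waiting for a task.
        bool nobodyIdle = consumersWaiting == 0;
        return belowLimit && nobodyIdle;
    }
}
```

The second condition is what keeps the pool from growing needlessly: if a worker is already parked in Dequeue, the new task simply wakes it.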
Work Thread
The WorkThread class executes a task item and provides logging.
private void DoWork()
{
    while (_isRun)
    {
        try
        {
            IWorkItem workItem = _taskQueueController.Dequeue();
            if (workItem == null)
            {
                continue;
            }
            ProcessItem(workItem);
        }
        catch (Exception ex)
        {
            _log.Error(ex);
        }
    }
}
Thread Pool Extensibility
If you need a more powerful task queue, implement ITaskQueue. You don't need to worry about thread safety in the queue itself, because the ITaskQueueController takes care of it. You can create your own ITaskQueueController as well.
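As an illustration of a custom queue, here is a minimal sketch of an ITaskQueue that returns higher-priority items first and keeps FIFO order within one priority. It is not the library's PriorityTaskQueue. The interfaces are redeclared as stand-ins so the example compiles on its own; only TaskItemPriority.Normal is named in this article, so the Low and High values, the Priority property on IWorkItem, and the SimplePriorityTaskQueue and NamedWorkItem classes are all assumptions.

```csharp
using System;
using System.Collections.Generic;

// Stand-ins so the sketch compiles alone. Only TaskItemPriority.Normal is
// named in the article; Low, High and the Priority property are assumptions.
public enum TaskItemPriority { Low = 0, Normal = 1, High = 2 }

public interface IWorkItem
{
    TaskItemPriority Priority { get; }
    void DoWork();
}

public interface ITaskQueue
{
    int Count { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

// Hypothetical queue: highest priority first, FIFO within one priority.
// No locking here; the ITaskQueueController is expected to serialize access.
public sealed class SimplePriorityTaskQueue : ITaskQueue
{
    // One FIFO bucket per priority, iterated from the highest key to the lowest.
    private readonly SortedDictionary<int, Queue<IWorkItem>> _buckets =
        new SortedDictionary<int, Queue<IWorkItem>>(
            Comparer<int>.Create((a, b) => b.CompareTo(a)));
    private int _count;

    public int Count { get { return _count; } }

    public void Enqueue(IWorkItem item)
    {
        if (item == null) throw new ArgumentNullException("item");
        Queue<IWorkItem> bucket;
        if (!_buckets.TryGetValue((int)item.Priority, out bucket))
        {
            bucket = new Queue<IWorkItem>();
            _buckets.Add((int)item.Priority, bucket);
        }
        bucket.Enqueue(item);
        _count++;
    }

    public IWorkItem Dequeue()
    {
        // Take from the first non-empty bucket, starting at the highest priority.
        foreach (KeyValuePair<int, Queue<IWorkItem>> pair in _buckets)
        {
            if (pair.Value.Count > 0)
            {
                _count--;
                return pair.Value.Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}

// Tiny work item used only to exercise the queue.
public sealed class NamedWorkItem : IWorkItem
{
    public NamedWorkItem(string name, TaskItemPriority priority)
    {
        Name = name;
        Priority = priority;
    }

    public string Name { get; private set; }
    public TaskItemPriority Priority { get; private set; }
    public void DoWork() { Console.WriteLine(Name); }
}
```

Because the queue contains no synchronization of its own, it is only safe when every call goes through a controller such as DefaultTaskQueueController, which takes its lock around each Count, Enqueue and Dequeue call.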
Example
We create a TinyThreadPool with custom settings. SampleTask implements ITaskItem; see the details below.
using System;
using System.Threading;

internal class Program
{
    private static ITinyThreadPool _threadPool;

    private static void AddTasks()
    {
        for (int taskIndex = 0; taskIndex < 50; taskIndex++)
        {
            _threadPool.AddTask(new SampleTask(taskIndex));
        }
    }

    private static void Main()
    {
        // Use the default instance, or create one through TinyThreadPool.Create.
        // _threadPool = TinyThreadPool.Default;
        _threadPool = TinyThreadPool.Create(x =>
        {
            x.Name = "My ThreadPool";
            x.MinThreads = 2;
            x.MaxThreads = 10;
            x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
        });
        AddTasks();
        // Keep the process alive until the queued tasks have run.
        Console.ReadKey();
    }

    private sealed class SampleTask : ITaskItem
    {
        private readonly int _taskIndex;

        public SampleTask(int taskIndex)
        {
            _taskIndex = taskIndex;
        }

        public void DoWork()
        {
            // Simulate a short unit of work.
            Thread.Sleep(100);
            Console.WriteLine("Task {0} has been finished", _taskIndex);
        }
    }
}
History
- 29 Jun 2008: Initial version.
- 02 Jul 2008
  - + TransactionalMsmqTaskItem, DefaultTaskQueue.
  - * TaskQueueController classes added disposable support.
  - * WorkThread updated stop logic.
  - * Sample projects:
    - CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
    - CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
- 10 Jul 2008
  - Added Mike.Strobel's suggestion.
  - + ActionTaskItem.
  - * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
  - Added more tests
- 02 Oct 2009
  - + PriorityTaskQueue.
  - + StatisticController.
  - * ExtendedThreadPool added support StatisticController, MultiThreadingCapacityType.
  - Added support Unity 1.2
  - Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
  - Added more tests
- 07 Apr 2013
  - ExtendedThreadPool v2
  - Unity has been removed
  - ExtendedThreadPool creation has been simplified
- 22 Apr 2015
  - ExtendedThreadPool -> TinyThreadPool
  - TinyThreadPool the new creating API based on lambda
  - TinyThreadPool is a part of the Nelibur.Sword NuGet package
