License: The Microsoft Public License (Ms-PL)
Extended Thread Pool
By GSerjo
Your own extensible and configurable Thread Pool.
C# (C# 2.0, C# 3.0), Windows, .NET (.NET 2.0, .NET 3.0, .NET 3.5), Architect, Dev
Several thread pools are already available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. They are all hard coded, which makes them difficult to adapt.
I've tried to address these issues, so I've created an Extended Thread Pool, written in C# 3.0.
- TaskItemsQueued, TaskItemsStarted, ConsumersWaiting, MaxThreads
- ITaskItem represents a task
- ITaskQueue represents the task queue logic
- ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
- WorkThread represents a thread worker
- ExtendedThreadPool controls work threads

Let's take a look at each class more deeply:
ITaskItem represents some work that should be done.
public interface ITaskItem
{
    void DoWork();
}
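As a minimal sketch of what an implementation might look like, the block below defines a hypothetical RecordingTaskItem that appends a message to a list when it runs. The class name and the sink parameter are assumptions made for illustration; the interface is copied from above so the snippet compiles on its own.

```csharp
using System.Collections.Generic;

// Copied from the article so the sketch is self-contained.
public interface ITaskItem
{
    void DoWork();
}

// Hypothetical task (not part of the library): records a message when executed.
public sealed class RecordingTaskItem : ITaskItem
{
    private readonly string _message;
    private readonly IList<string> _sink;

    public RecordingTaskItem(string message, IList<string> sink)
    {
        _message = message;
        _sink = sink;
    }

    public void DoWork()
    {
        // A real task would perform I/O or computation here.
        // Note: a plain List<string> is not thread safe; tasks that run
        // concurrently would need a lock around this call.
        _sink.Add(_message);
    }
}
```

A pool worker would only ever call DoWork(), so everything the task needs has to be passed in through the constructor.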
TaskItemPriority - WorkThread priority, can be specified for each task.
ITaskQueue is another simple interface that manages the task queue.
public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}
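To illustrate how little an ITaskQueue has to do, here is a rough sketch of a first-in, first-out queue backed by Queue<T>. FifoTaskQueue and NoOpTaskItem are hypothetical names used only for this example; the library's own DefaultTaskQueue is not shown in this article and may differ. The queue deliberately contains no locking, on the assumption that a controller wraps it.

```csharp
using System.Collections.Generic;

// Interfaces copied from the article so the sketch is self-contained.
public interface ITaskItem
{
    void DoWork();
}

public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

// Hypothetical FIFO queue: no synchronization here, the controller is
// assumed to serialize all access.
public sealed class FifoTaskQueue : ITaskQueue
{
    private readonly Queue<ITaskItem> _items = new Queue<ITaskItem>();

    public int Count
    {
        get { return _items.Count; }
    }

    public void Enqueue(ITaskItem item)
    {
        _items.Enqueue(item);
    }

    public ITaskItem Dequeue()
    {
        // Throws InvalidOperationException when empty; callers are
        // expected to check Count first.
        return _items.Dequeue();
    }
}

// Hypothetical placeholder task used to exercise the queue.
public sealed class NoOpTaskItem : ITaskItem
{
    public void DoWork()
    {
    }
}
```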
ITaskQueueController provides communication logic between the consumer and the producer.
public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}
I've implemented two task queue controllers derived from ITaskQueueController:
- DefaultTaskQueueController
- BoundedTaskQueueController

DefaultTaskQueueController is a thread safe wrapper for the ITaskQueue:
public class DefaultTaskQueueController : TaskQueueControllerBase
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }

    #region Overrides of TaskQueueControllerBase

    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            // Wake waiting consumers only when there are any.
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }

    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            // Wait until an item arrives or the controller is disposed.
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
        }
        return taskItem;
    }

    #endregion
}
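The Wait/PulseAll handshake used above can be tried in isolation. The following is a simplified sketch, not the library's code: MonitorQueue<T>, TryDequeue, Close and MonitorQueueDemo are hypothetical names, and unlike the controller above, this version lets the consumer drain remaining items after Close() instead of returning immediately.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-alone queue showing the Monitor.Wait / PulseAll pattern.
public sealed class MonitorQueue<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private bool _closed;

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            _items.Enqueue(item);
            Monitor.PulseAll(_locker); // wake any waiting consumers
        }
    }

    // Blocks until an item is available; returns false once the queue
    // is closed and empty.
    public bool TryDequeue(out T item)
    {
        lock (_locker)
        {
            while (_items.Count == 0 && !_closed)
                Monitor.Wait(_locker); // releases the lock while waiting
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
    }

    public void Close()
    {
        lock (_locker)
        {
            _closed = true;
            Monitor.PulseAll(_locker); // let blocked consumers observe the flag
        }
    }
}

public static class MonitorQueueDemo
{
    // One consumer thread drains the queue while the caller produces.
    public static List<int> Run()
    {
        var queue = new MonitorQueue<int>();
        var received = new List<int>();
        var consumer = new Thread(() =>
        {
            int value;
            while (queue.TryDequeue(out value))
                received.Add(value);
        });
        consumer.Start();
        for (int i = 1; i <= 3; i++)
            queue.Enqueue(i);
        queue.Close();
        consumer.Join(); // after Join, 'received' is safe to read
        return received;
    }
}
```

The while loop around Monitor.Wait matters: a woken thread must re-check the condition, because another consumer may have taken the item first.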
BoundedTaskQueueController (thread safe): if producers create items faster than the consumers can process them, the queue grows without bound and so does memory usage. The BoundedTaskQueueController lets you limit the size the queue may reach; once the limit is hit, the producer blocks until a consumer removes an item.
public class BoundedTaskQueueController : TaskQueueControllerBase
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;

    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
            throw new ArgumentException("maxTasksCount should be greater than 0");
        _maxTasksCount = maxTasksCount;
    }

    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            // Block the producer while the queue is full. Comparing against
            // _maxTasksCount (not _maxTasksCount - 1) lets the queue hold
            // exactly maxTasksCount items and avoids blocking forever on an
            // empty queue when maxTasksCount is 1.
            while (_taskQueue.Count >= _maxTasksCount && !_isDispose)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }

    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            // Wait until an item arrives or the controller is disposed.
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
            // A slot is free again, so wake any blocked producers.
            if (_producersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
        return taskItem;
    }
}
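To see the back-pressure effect in a small, self-contained form, the sketch below pushes items through a bounded buffer and records the largest size the buffer ever reached. BoundedBuffer<T>, MaxObservedCount and BoundedBufferDemo are hypothetical names invented for this illustration; disposal handling is left out to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical bounded buffer: producers block when it is full,
// consumers block when it is empty.
public sealed class BoundedBuffer<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public BoundedBuffer(int capacity)
    {
        if (capacity < 1)
            throw new ArgumentOutOfRangeException("capacity");
        _capacity = capacity;
    }

    // Largest queue length seen so far (updated under the lock).
    public int MaxObservedCount { get; private set; }

    public void Enqueue(T item)
    {
        lock (_locker)
        {
            while (_items.Count >= _capacity)
                Monitor.Wait(_locker); // full: wait for a consumer
            _items.Enqueue(item);
            if (_items.Count > MaxObservedCount)
                MaxObservedCount = _items.Count;
            Monitor.PulseAll(_locker);
        }
    }

    public T Dequeue()
    {
        lock (_locker)
        {
            while (_items.Count == 0)
                Monitor.Wait(_locker); // empty: wait for a producer
            T item = _items.Dequeue();
            Monitor.PulseAll(_locker); // a slot is free again
            return item;
        }
    }
}

public static class BoundedBufferDemo
{
    // Produces itemCount integers on a background thread, consumes them
    // on the calling thread, and returns the peak buffer size.
    public static int Run(int capacity, int itemCount, List<int> consumed)
    {
        var buffer = new BoundedBuffer<int>(capacity);
        var producer = new Thread(() =>
        {
            for (int i = 0; i < itemCount; i++)
                buffer.Enqueue(i);
        });
        producer.Start();
        for (int i = 0; i < itemCount; i++)
            consumed.Add(buffer.Dequeue());
        producer.Join();
        return buffer.MaxObservedCount;
    }
}
```

Whatever the thread scheduling, the returned peak should never exceed the capacity, which is the property the bounded controller is meant to guarantee.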
The ExtendedThreadPool manages the ITaskQueueController.
/// <summary>
/// Create new Instance of ExtendedThreadPool
/// </summary>
/// <param name="minThreads">MinThreads per processor</param>
/// <param name="maxThreads">MaxThreads per processor</param>
public ExtendedThreadPool(int minThreads, int maxThreads)
    : this(minThreads, maxThreads, MultiThreadingCapacityType.PerProcessor)
{
}

public ExtendedThreadPool(int minThreads, int maxThreads,
    MultiThreadingCapacityType multiThreadingCapacityType)
{
    SetThreadingRange(minThreads, maxThreads, multiThreadingCapacityType);
    _statisticController = new StatisticController(_maxThreads);
}
MultiThreadingCapacityType represents the threading capacity:
public enum MultiThreadingCapacityType
{
    /// <summary>
    /// Represents all processors
    /// </summary>
    Global,
    /// <summary>
    /// Represents one processor
    /// </summary>
    PerProcessor
}
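SetThreadingRange itself is not shown in this article, so the following is only a guess at how the two capacity types could translate into actual thread counts: with PerProcessor, the configured value is multiplied by the number of processors; with Global, it is used as is. ThreadRangeSketch and its methods are hypothetical names, and the real implementation may differ.

```csharp
using System;

// Copied from the article so the sketch is self-contained.
public enum MultiThreadingCapacityType
{
    Global,
    PerProcessor
}

// Hypothetical helper, an assumption about what SetThreadingRange might do.
public static class ThreadRangeSketch
{
    // Pure function so the rule can be checked with any processor count.
    public static int Resolve(int threads,
        MultiThreadingCapacityType capacityType, int processorCount)
    {
        if (threads < 1)
            throw new ArgumentOutOfRangeException("threads");
        if (processorCount < 1)
            throw new ArgumentOutOfRangeException("processorCount");

        return capacityType == MultiThreadingCapacityType.PerProcessor
            ? threads * processorCount // limit applies to each processor
            : threads;                 // limit applies to the whole machine
    }

    // Convenience overload using the current machine's processor count.
    public static int ResolveForThisMachine(int threads,
        MultiThreadingCapacityType capacityType)
    {
        return Resolve(threads, capacityType, Environment.ProcessorCount);
    }
}
```

Under this assumption, a pool created with maxThreads = 2 and PerProcessor on a four-core machine would allow up to eight worker threads.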
The AddTask method adds a task to the task pipeline. If the maximum thread limit has not been reached and ConsumersWaiting = 0, a new WorkThread is created.
/// <summary>
/// Add taskItem with default <see cref="TaskItemPriority.Normal"/>
/// </summary>
/// <param name="taskItem"></param>
public void AddTask(ITaskItem taskItem)
{
    AddWorkItem(new WorkItem(taskItem));
}
You can also add a task with a TaskItemPriority. Please note that DefaultTaskQueue does not take TaskItemPriority into account; use PriorityTaskQueue for prioritized tasks.
public void AddTask(ITaskItem taskItem, TaskItemPriority priority)
{
    AddWorkItem(new WorkItem(taskItem, priority));
}
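The source of PriorityTaskQueue is not listed here, so as a rough sketch of the idea only, the block below keeps one FIFO bucket per priority level and always dequeues from the highest non-empty bucket. SketchPriority and SketchPriorityQueue<T> are hypothetical names; the library's TaskItemPriority values and queue implementation may look different.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical priority levels for this sketch only.
public enum SketchPriority
{
    Low = 0,
    Normal = 1,
    High = 2
}

// Hypothetical priority queue: highest priority first, FIFO within a level.
// Not thread safe; a controller is assumed to serialize access.
public sealed class SketchPriorityQueue<T>
{
    // One bucket per priority, indexed by the enum's integer value.
    private readonly Queue<T>[] _buckets =
    {
        new Queue<T>(), new Queue<T>(), new Queue<T>()
    };

    public int Count { get; private set; }

    public void Enqueue(T item, SketchPriority priority)
    {
        _buckets[(int)priority].Enqueue(item);
        Count++;
    }

    public T Dequeue()
    {
        // Scan from the highest priority bucket down to the lowest.
        for (int i = _buckets.Length - 1; i >= 0; i--)
        {
            if (_buckets[i].Count > 0)
            {
                Count--;
                return _buckets[i].Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}
```

With a plain FIFO queue the priority argument would simply be ignored, which is why the choice of task queue matters when calling the AddTask overload that takes a priority.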
The WorkThread class executes a task item and provides logging.
public void Start()
{
    while (_isRun)
    {
        WorkItem workItem = null;
        try
        {
            workItem = _taskQueueController.Dequeue();
            if (workItem.IsNull())
                continue;
            _statisticController.WorkItemStarted();
            DoWork(workItem);
        }
        catch (Exception ex)
        {
            _log.Error(ex.Message);
        }
        finally
        {
            if (workItem.IsNotNull())
                _statisticController.WorkItemCompleted();
        }
    }
}
The ExtendedThreadPool supports IoC: the TaskQueueController property is marked with DependencyAttribute.
[Dependency]
public ITaskQueueController TaskQueueController { private get; set; }
If you need a more powerful task queue, implement ITaskQueue. You don't have to worry about thread safety there, because the ITaskQueueController takes care of it. You can also create your own ITaskQueueController. I've used Unity to configure the ExtendedThreadPool.
In one of my projects, I had to use a transactional MSMQ queue with multi-threading. The SampleTask implements ITaskItem; please see the CoreDefaultMsmqSample project for more details.
public void DoWork()
{
    using (var transaction = new MessageQueueTransaction())
    {
        try
        {
            transaction.Begin();
            Message msmqMessage =
                _queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
            if (msmqMessage != null)
            {
                var message = (ExternalMessage) msmqMessage.Body;
                LogManager.GetLogger(GetType()).Info("Task has been " +
                    "done, info {0}".FormatWith(message.Data));
                //Do work
                Thread.Sleep(1000);
            }
        }
        catch (Exception ex)
        {
            transaction.Abort();
            LogManager.GetLogger(GetType()).Error(ex.Message);
            return;
        }
        transaction.Commit();
    }
}
- TransactionalMsmqTaskItem, DefaultTaskQueue.TaskQueueController classes added disposable support.
- WorkThread updated stop logic.
- Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
- CoreDefaultSample.TaskQueue (see the App.config file for more details).
- ActionTaskItem.
- ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
- PriorityTaskQueue.
- StatisticController.
- ExtendedThreadPool added support StatisticController, MultiThreadingCapacityType.
- Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
Last Updated: 2 Oct 2009. Editor: Sean Ewington
Copyright 2008 by GSerjo. Everything else Copyright © CodeProject, 1999-2009