License: The Microsoft Public License (Ms-PL)
Extended Thread Pool
By GSerjo
Your own extensible and configurable Thread Pool.
C# (C# 2.0, C# 3.0), Windows, .NET (.NET 2.0, .NET 3.0, .NET 3.5), Architect, Dev
Several thread pools are available, such as Smart Thread Pool and BlackHen.Threading, but none of them supports extensions. They are all hard coded, and it's too difficult to adapt any of them. I've tried to address these issues.
So, I've created an Extended Thread Pool. This Thread Pool is written using C# 3.0.
ITaskItem represents a task
ITaskQueue represents the task queue logic
ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
WorkThread represents a thread worker
ExtendedThreadPool controls work threads
Let's take a look at each class more deeply:
ITaskItem represents some work that should be done.
public interface ITaskItem
{
ThreadPriority Priority { get; }
void DoWork();
}
ThreadPriority - WorkThread priority, can be specified for each task.
ITaskQueue is another simple interface that manages the task queue.
public interface ITaskQueue
{
int Count { get; }
void Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
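To make the two interfaces concrete, here is a minimal sketch of one possible implementation of each. The names DelegateTaskItem and FifoTaskQueue are hypothetical and are not taken from the article's library; the interfaces are repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public interface ITaskItem
{
    ThreadPriority Priority { get; }
    void DoWork();
}

public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

// Hypothetical task: wraps a delegate so that any method can be queued.
public class DelegateTaskItem : ITaskItem
{
    private readonly Action _action;
    private readonly ThreadPriority _priority;

    public DelegateTaskItem(Action action, ThreadPriority priority)
    {
        if (action == null)
            throw new ArgumentNullException("action");
        _action = action;
        _priority = priority;
    }

    public ThreadPriority Priority { get { return _priority; } }

    public void DoWork() { _action(); }
}

// Hypothetical first-in, first-out queue. It does no locking of its own,
// on the assumption that a task queue controller wraps it.
public class FifoTaskQueue : ITaskQueue
{
    private readonly Queue<ITaskItem> _items = new Queue<ITaskItem>();

    public int Count { get { return _items.Count; } }

    public void Enqueue(ITaskItem item) { _items.Enqueue(item); }

    public ITaskItem Dequeue() { return _items.Dequeue(); }
}
```

The queue stays deliberately simple: ordering is its only job, and synchronization is left to the controller described next.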
ITaskQueueController provides communication logic between the consumer and the producer.
public interface ITaskQueueController : IDisposable
{
int ConsumersWaiting { get; }
void Enqueue(ITaskItem item);
ITaskItem Dequeue();
}
I've implemented two task queue controllers derived from ITaskQueueController:
DefaultTaskQueueController
BoundedTaskQueueController
DefaultTaskQueueController is a thread safe wrapper for the ITaskQueue:
public class DefaultTaskQueueController : TaskQueueControllerBase
{
public DefaultTaskQueueController(ITaskQueue taskQueue)
: base(taskQueue)
{
}
#region Overrides of TaskQueueControllerBase
public override void Enqueue(ITaskItem item)
{
lock (_locker)
{
_taskQueue.Enqueue(item);
if (_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public override ITaskItem Dequeue()
{
ITaskItem taskItem;
lock (_locker)
{
while (_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if (_isDispose)
return null;
taskItem = _taskQueue.Dequeue();
}
return taskItem;
}
#endregion
}
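The listing above relies on members inherited from TaskQueueControllerBase, which is not shown here. The following is only a guess at its shape: the field names (_locker, _taskQueue, _consumersWaiting, _isDispose) come from the listing, while the Dispose body, ListTaskQueue, and SketchController are assumptions added to make the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public interface ITaskItem { ThreadPriority Priority { get; } void DoWork(); }

public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

// ASSUMPTION: the real base class is not listed in the article; only the
// field names are taken from the derived classes.
public abstract class TaskQueueControllerBase : ITaskQueueController
{
    protected readonly object _locker = new object();
    protected readonly ITaskQueue _taskQueue;
    protected int _consumersWaiting;
    protected bool _isDispose;

    protected TaskQueueControllerBase(ITaskQueue taskQueue)
    {
        if (taskQueue == null)
            throw new ArgumentNullException("taskQueue");
        _taskQueue = taskQueue;
    }

    public int ConsumersWaiting
    {
        get { lock (_locker) { return _consumersWaiting; } }
    }

    public abstract void Enqueue(ITaskItem item);
    public abstract ITaskItem Dequeue();

    // Set the flag and wake every waiter so blocked Dequeue calls can return.
    public void Dispose()
    {
        lock (_locker)
        {
            _isDispose = true;
            Monitor.PulseAll(_locker);
        }
    }
}

// Hypothetical helpers, present only to exercise the base class.
public class ListTaskQueue : ITaskQueue
{
    private readonly Queue<ITaskItem> _items = new Queue<ITaskItem>();
    public int Count { get { return _items.Count; } }
    public void Enqueue(ITaskItem item) { _items.Enqueue(item); }
    public ITaskItem Dequeue() { return _items.Dequeue(); }
}

public class SketchController : TaskQueueControllerBase
{
    public SketchController(ITaskQueue taskQueue) : base(taskQueue) { }

    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            Monitor.PulseAll(_locker);
        }
    }

    public override ITaskItem Dequeue()
    {
        lock (_locker)
        {
            // Wait releases the lock, so producers and Dispose can get in.
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            return _isDispose ? null : _taskQueue.Dequeue();
        }
    }
}
```

Pulsing inside Dispose is what lets a worker blocked in Dequeue observe the flag and receive null instead of waiting forever.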
BoundedTaskQueueController (thread safe) - if the producer task or tasks create items at a rate faster than the consumer can process them, the system can run into unbounded memory usage. The BoundedTaskQueueController allows you to place a limit on the size that the queue may reach before the producer is forced to block.
public class BoundedTaskQueueController : TaskQueueControllerBase
{
private readonly int _maxTasksCount;
private int _producersWaiting;
public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
: base(taskQueue)
{
if (maxTasksCount < 1)
throw new ArgumentException("MaxTasksCount should be greater than 0");
_maxTasksCount = maxTasksCount;
}
public override void Enqueue(ITaskItem item)
{
lock (_locker)
{
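// Block only while the queue is full; comparing with (_maxTasksCount - 1)
// would block forever on an empty queue when _maxTasksCount is 1.
while (_taskQueue.Count >= _maxTasksCount && !_isDispose)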
{
_producersWaiting++;
Monitor.Wait(_locker);
_producersWaiting--;
}
_taskQueue.Enqueue(item);
if (_consumersWaiting > 0)
Monitor.PulseAll(_locker);
}
}
public override ITaskItem Dequeue()
{
ITaskItem taskItem;
lock (_locker)
{
while (_taskQueue.Count == 0 && !_isDispose)
{
_consumersWaiting++;
Monitor.Wait(_locker);
_consumersWaiting--;
}
if (_isDispose)
return null;
taskItem = _taskQueue.Dequeue();
if (_producersWaiting > 0)
Monitor.PulseAll(_locker);
}
return taskItem;
}
}
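The blocking rule is easier to follow in isolation. Below is a minimal sketch of the same wait/pulse pattern as a stand-alone generic buffer; BoundedBuffer is a hypothetical class, not part of the article's library, and it leaves out the disposal handling.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-alone buffer that mirrors the wait/pulse pattern of
// BoundedTaskQueueController. Disposal handling is omitted for brevity.
public class BoundedBuffer<T>
{
    private readonly object _locker = new object();
    private readonly Queue<T> _items = new Queue<T>();
    private readonly int _capacity;

    public BoundedBuffer(int capacity)
    {
        if (capacity < 1)
            throw new ArgumentException("capacity should be greater than 0");
        _capacity = capacity;
    }

    public int Count
    {
        get { lock (_locker) { return _items.Count; } }
    }

    // Blocks the producer while the buffer is full.
    public void Add(T item)
    {
        lock (_locker)
        {
            while (_items.Count >= _capacity)
                Monitor.Wait(_locker);
            _items.Enqueue(item);
            Monitor.PulseAll(_locker); // wake consumers waiting for an item
        }
    }

    // Blocks the consumer while the buffer is empty.
    public T Take()
    {
        lock (_locker)
        {
            while (_items.Count == 0)
                Monitor.Wait(_locker);
            T item = _items.Dequeue();
            Monitor.PulseAll(_locker); // wake producers waiting for space
            return item;
        }
    }
}
```

Producers and consumers share one lock object, so each side re-checks its own condition in a while loop after waking; a pulse meant for the other side is then harmless.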
The ExtendedThreadPool manages the ITaskQueueController. The AddTask method adds a task to the task pipeline. If the maximum thread limit has not been reached and ConsumersWaiting = 0, a new WorkThread is created.
public void AddTask(ITaskItem item)
{
if (item.IsNull())
throw new ArgumentNullException("item");
if (!(Enum.IsDefined(typeof (ThreadPriority), item.Priority)))
throw new ArgumentException("priority");
TaskQueueController.Enqueue(item);
if (IsStartNewWorker())
CreateWorkThread();
}
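IsStartNewWorker is not listed in the article. Going by the description of AddTask, its rule might look like the sketch below; WorkerPolicy, ShouldStartNewWorker, and all three parameter names are hypothetical.

```csharp
using System;

// Hypothetical helper that restates the rule described for AddTask.
public static class WorkerPolicy
{
    // Start a worker only when the pool can still grow and no existing
    // worker is idle, that is, waiting for a task.
    public static bool ShouldStartNewWorker(
        int currentWorkers, int maxWorkers, int consumersWaiting)
    {
        if (maxWorkers < 1)
            throw new ArgumentOutOfRangeException("maxWorkers");
        return currentWorkers < maxWorkers && consumersWaiting == 0;
    }
}
```

Checking the number of waiting consumers first avoids creating a thread when an idle worker is about to pick the task up anyway.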
The WorkThread class executes a task item and provides logging.
public void Start()
{
while (_isRun)
{
try
{
ITaskItem item = _taskQueueController.Dequeue();
if (item.IsNull())
continue;
DoWork(item);
}
catch (Exception ex)
{
_logger.Error(ex.Message);
}
}
}
The ExtendedThreadPool supports IoC: its TaskQueueController property is marked with DependencyAttribute.
[Dependency]
public ITaskQueueController TaskQueueController { private get; set; }
If you need a more powerful task queue, implement ITaskQueue. You don't need to worry about thread safety there, because the ITaskQueueController wraps the queue; you can create your own ITaskQueueController as well. I've used Unity to configure the ExtendedThreadPool.
In one of my projects, I had to use a transactional MSMQ queue with multi-threading. The SampleTask implements ITaskItem; please see the CoreDefaultMsmqSample project for more details.
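As an illustration of a custom queue, here is a minimal sketch that hands out tasks by priority. PriorityTaskQueue and NamedTask are hypothetical names; the queue does no locking and assumes, as in AddTask, that each priority is a defined ThreadPriority value.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public interface ITaskItem { ThreadPriority Priority { get; } void DoWork(); }

public interface ITaskQueue
{
    int Count { get; }
    void Enqueue(ITaskItem item);
    ITaskItem Dequeue();
}

// Hypothetical queue: highest ThreadPriority first, FIFO within a priority.
// Not thread safe; it relies on a task queue controller for locking.
public class PriorityTaskQueue : ITaskQueue
{
    // One FIFO bucket per ThreadPriority value (Lowest = 0 ... Highest = 4).
    private readonly Queue<ITaskItem>[] _buckets = new Queue<ITaskItem>[5];
    private int _count;

    public PriorityTaskQueue()
    {
        for (int i = 0; i < _buckets.Length; i++)
            _buckets[i] = new Queue<ITaskItem>();
    }

    public int Count { get { return _count; } }

    public void Enqueue(ITaskItem item)
    {
        if (item == null)
            throw new ArgumentNullException("item");
        _buckets[(int) item.Priority].Enqueue(item);
        _count++;
    }

    public ITaskItem Dequeue()
    {
        // Scan from the highest priority bucket down to the lowest.
        for (int i = _buckets.Length - 1; i >= 0; i--)
        {
            if (_buckets[i].Count > 0)
            {
                _count--;
                return _buckets[i].Dequeue();
            }
        }
        throw new InvalidOperationException("The queue is empty.");
    }
}

// Hypothetical task used to try the queue out.
public class NamedTask : ITaskItem
{
    private readonly string _name;
    private readonly ThreadPriority _priority;

    public NamedTask(string name, ThreadPriority priority)
    {
        _name = name;
        _priority = priority;
    }

    public string Name { get { return _name; } }
    public ThreadPriority Priority { get { return _priority; } }
    public void DoWork() { Console.WriteLine(_name); }
}
```

Because the class only implements ITaskQueue, it could be passed to either controller's constructor in place of the default queue.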
public void DoWork()
{
using (var transaction = new MessageQueueTransaction())
{
try
{
transaction.Begin();
Message msmqMessage =
_queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
if (msmqMessage != null)
{
var message = (ExternalMessage) msmqMessage.Body;
LogManager.GetLogger(GetType()).Info("Task has been " +
"done, info {0}".FormatWith(message.Data));
//Do work
Thread.Sleep(1000);
}
}
catch (Exception ex)
{
transaction.Abort();
LogManager.GetLogger(GetType()).Error(ex.Message);
return;
}
transaction.Commit();
}
}
TransactionalMsmqTaskItem, DefaultTaskQueue.TaskQueueController classes added disposable support.
WorkThread updated stop logic.
Core.Threading.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
CoreDefaultSample.TaskQueue (see the App.config file for more details).
ActionTaskItem.
ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
Last Updated: 9 Jul 2008 Editor: Smitha Vijayan
Copyright 2008 by GSerjo. Everything else Copyright © CodeProject, 1999-2009.