Click here to Skip to main content
6,306,412 members and growing! (17,574 online)
Email Password   helpLost your password?
General Programming » Threads, Processes & IPC » Thread Pooling     Advanced License: The Microsoft Public License (Ms-PL)

Extended Thread Pool

By GSerjo

Your own extensible and configurable Thread Pool.
C# (C# 2.0, C# 3.0), Windows, .NET (.NET 2.0, .NET 3.0, .NET 3.5), Architect, Dev
Posted:29 Jun 2008
Updated:9 Jul 2008
Views:14,097
Bookmarked:41 times
Announcements
Loading...
 
Search    
Advanced Search
printPrint   Broken Article?Report       add Share
  Discuss Discuss   Recommend Article Email
12 votes for this article.
Popularity: 4.86 Rating: 4.50 out of 5

1
1 vote, 8.3%
2
1 vote, 8.3%
3

4
10 votes, 83.3%
5

DefaultSample

DefaultMsmqSample

Introduction

Different kinds of Thread Pools are available: Smart Thread Pool, BlackHen.Threading, but nothing supports extensions. All Thread Pools are hard coded, and it's too difficult to work with any. I've tried to address these issues:

  • A Thread Pool should be extensible and configurable
  • A Thread Pool should be as simple as possible

So, I've created an Extended Thread Pool. This Thread Pool is written using C# 3.0.

Extended Thread Pool Features

  • IoC support (I've used Unity)
  • Extensible queues
  • Extensible Task Items
  • Limit on the maximum number of working threads
  • Dynamic thread workers
  • Thread priority support
  • Extensible logging

Extended Thread Pool Design

  • ITaskItem represents a task
  • ITaskQueue represents the task queue logic
  • ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
  • WorkThread represents a thread worker
  • ExtendedThreadPool controls work threads

Let's take a look at each class more deeply:

  • The ITaskItem represents some work that should be done.
  • public interface ITaskItem
    {
        ThreadPriority Priority { get; }
        void DoWork();
    }
  • ThreadPriority - WorkThread priority, can be specified for each task.
  • The ITaskQueue is another simple interface that manages the task queue.
  • public interface ITaskQueue
    {
        int Count { get; }
        void Enqueue(ITaskItem item);
        ITaskItem Dequeue();
    }
  • ITaskQueueController provides communication logic between the consumer and the producer.
  • public interface ITaskQueueController : IDisposable
    {
        int ConsumersWaiting { get; }
        void Enqueue(ITaskItem item);
        ITaskItem Dequeue();
    }

I've implemented two task queue controllers derived from ITaskQueueController:

  • DefaultTaskQueueController
  • BoundedTaskQueueController

Default Task Queue Controller

DefaultTaskQueueController is a thread safe wrapper for the ITaskQueue:

public class DefaultTaskQueueController : TaskQueueControllerBase
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }

    #region Overrides of TaskQueueControllerBase

    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }

    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
        }
        return taskItem;
    }

    #endregion
}

Bounded Task Queue Controller

BoundedTaskQueueController (thread safe) - if the producer task or tasks create items at a rate faster than the consumer can process them, the system can run into unbounded memory usage. The BoundedTaskQueueController allows you to place a limit on the size that the queue may reach before the producer is forced to block.

public class BoundedTaskQueueController : TaskQueueControllerBase
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;

    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
            throw new ArgumentException("MaxTasksCount should be greater 0");
        _maxTasksCount = maxTasksCount;
    }

    public override void Enqueue(ITaskItem item)
    {
        lock (_locker)
        {
            while (_taskQueue.Count == (_maxTasksCount - 1) && !_isDispose)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
    }

    public override ITaskItem Dequeue()
    {
        ITaskItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDispose)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDispose)
                return null;
            taskItem = _taskQueue.Dequeue();
            if (_producersWaiting > 0)
                Monitor.PulseAll(_locker);
        }
        return taskItem;
    }
}

Extended Thread Pool

The ExtendedThreadPool manages the ITaskQueueController. The AddTask method is used to add a task into the task pipeline. If the maximum thread limit is not reached and ConsamersWaitin = 0, a new WorkThread will be created.

public void AddTask(ITaskItem item)
{
    if (item.IsNull())
        throw new ArgumentNullException("item");
    if (!(Enum.IsDefined(typeof (ThreadPriority), item.Priority)))
        throw new ArgumentException("priority");
    TaskQueueController.Enqueue(item);
    if (IsStartNewWorker())
        CreateWorkThread();
}

Work Thread

The WorkThread class executes a task item and provides logging.

public void Start()
{
    while (_isRun)
    {
        try
        {
            ITaskItem item = _taskQueueController.Dequeue();
            if (item.IsNull())
                continue;
            DoWork(item);
        }
        catch (Exception ex)
        {
            _logger.Error(ex.Message);
        }
    }
}

Thread Pool Extensibility

The ExtendedThreadPool supports IoC, ITaskQueueController is marked with DependencyAttribute.

[Dependency]
public ITaskQueueController TaskQueueController { private get; set; }

In case you need a more powerful task queue, you need to implement the ITaskQueue, and don't worry about thread safety: you can create your own ITaskQueueController as well. I've used Unity to configure the ExtendedThreadPool.

App.config

Example

In one of my projects, I had to use a transaction MSMQ with multi-threading. The SampleTask is derived by ITaskItem, please see the CoreDefaultMsmqSample project for more details.

public void DoWork()
{
    using (var transaction = new MessageQueueTransaction())
    {
        try
        {
            transaction.Begin();
            Message msmqMessage = 
              _queue.Receive(TimeSpan.FromMilliseconds(500), transaction);
            if (msmqMessage != null)
            {
                var message = (ExternalMessage) msmqMessage.Body;
                LogManager.GetLogger(GetType()).Info("Task has been " + 
                           "done, info {0}".FormatWith(message.Data));
                //Do work
                Thread.Sleep(1000);
            }
        }
        catch (Exception ex)
        {
            transaction.Abort();
            LogManager.GetLogger(GetType()).Error(ex.Message);
            return;
        }
        transaction.Commit();
    }
}

History

  • 29 Jun 2008: Initial version.
  • 02 Jul 2008
    • + TransactionalMsmqTaskItem, DefaultTaskQueue.
    • * TaskQueueController classes added disposable support.
    • * WorkThread updated stop logic.
    • * Sample projects:
      • CoreDefaultMsmqSample uses Core.Threading.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
      • CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
  • 10 Jul 2008
    • Added Mike.Strobel's suggestion.
    • + ActionTaskItem.
    • * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
    • Added more tests.

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)

About the Author

GSerjo


Member
B.Sc. in Computer Science.
Occupation: Architect
Company: Lyncell
Location: Russian Federation Russian Federation

Other popular Threads, Processes & IPC articles:

Article Top
You must Sign In to use this message board.
FAQ FAQ 
 
Noise Tolerance  Layout  Per page   
 Msgs 1 to 12 of 12 (Total in Forum: 12) (Refresh)FirstPrevNext
GeneralThanks Pinmemberjackrabbits15:21 22 Aug '08  
GeneralRe: Thanks PinmemberGSerjo8:46 23 Aug '08  
GeneralKudos and a Suggestion Pinmembermike.strobel9:50 9 Jul '08  
GeneralRe: Kudos and a Suggestion PinmemberGSerjo10:07 9 Jul '08  
GeneralRe: Kudos and a Suggestion [modified] PinmemberGSerjo10:54 9 Jul '08  
GeneralRe: Kudos and a Suggestion Pinmembermike.strobel13:57 9 Jul '08  
GeneralInterlocks and kudos PinmemberRoss Korsky19:37 2 Jul '08  
GeneralRe: Interlocks and kudos PinmemberGSerjo1:33 3 Jul '08  
GeneralRe: Interlocks and kudos PinmemberRoss Korsky8:39 3 Jul '08  
GeneralRe: Interlocks and kudos PinmemberGSerjo9:56 3 Jul '08  
GeneralNeed clarifications PinmemberPIEBALDconsult7:07 30 Jun '08  
AnswerRe: Need clarifications PinmemberGSerjo9:01 30 Jun '08  

General General    News News    Question Question    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

PermaLink | Privacy | Terms of Use
Last Updated: 9 Jul 2008
Editor: Smitha Vijayan
Copyright 2008 by GSerjo
Everything else Copyright © CodeProject, 1999-2009
Web13 | Advertise on the Code Project