Click here to Skip to main content
15,881,424 members
Articles / Programming Languages / C# 4.0

Extended Thread Pool

Rate me:
Please Sign up or sign in to vote.
4.98/5 (25 votes)
6 Apr 2013Ms-PL3 min read 81.6K   1.8K   119   12
Your own extensible and configurable Thread Pool.

Image 1  

Introduction

Different kinds of Thread Pools are available: Smart Thread Pool, BlackHen.Threading, but nothing supports extensions. All Thread Pools are hard coded, and it's too difficult to work with any. I've tried to address these issues:

  • A Thread Pool should be extensible and configurable
  • A Thread Pool should be as simple as possible

So, I've created a Tiny Thread Pool. This Thread Pool is written using C# 4.0.

Tiny Thread Pool Features

  • Really simple 
  • Extensible queues
  • Extensible Task Items 
  • Limit on the maximum number of working threads
  • Dynamic thread workers
  • Task priority support 
  • Extensible logging

Tiny Thread Pool Design

  • ITaskItem represents a task
  • ITaskQueue represents the task queue logic 
  • ITaskQueueController represents the communication logic between the consumer and the producer (thread safe)
  • WorkThread represents a thread worker
  • TinyThreadPool controls work threads

Let's take a look at each class more deeply:

  • The ITaskItem represents some work that should be done.
C#
public interface ITaskItem
{
    void DoWork();
}
  • TaskItemPriority - WorkThread priority can be specified for each task.
  • The ITaskQueue is another simple interface that manages the task queue.
C#
/// <summary>
/// Represent the queue.
/// </summary>
public interface ITaskQueue
{
    /// <summary>
    /// Count of work item.
    /// </summary>
    int Count { get; }

    /// <summary>
    /// Dequeue the work item.
    /// </summary>
    /// <returns>The work item.</returns>
    IWorkItem Dequeue();

    /// <summary>
    /// Enqueue the work item.
    /// </summary>
    /// <param name="item">The work item.</param>
    void Enqueue(IWorkItem item);
}
  • ITaskQueueController provides communication logic between the consumer and the producer.
C#
public interface ITaskQueueController : IDisposable
{
    int ConsumersWaiting { get; }
    IWorkItem Dequeue();
    void Enqueue(IWorkItem item);
}

I've implemented two task queue controllers derived from ITaskQueueController:

  • DefaultTaskQueueController
  • BoundedTaskQueueController

Default Task Queue Controller

DefaultTaskQueueController is a thread-safe wrapper for the ITaskQueue:

C#
public sealed class DefaultTaskQueueController : TaskQueueController
{
    public DefaultTaskQueueController(ITaskQueue taskQueue)
        : base(taskQueue)
    {
    }

    protected override IWorkItem DequeueCore()
    {
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            return _taskQueue.Dequeue();
        }
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}

Bounded Task Queue Controller

BoundedTaskQueueController (thread safe) - if the producer task or tasks create items at a rate faster than the consumer can process them, the system can run into unbounded memory usage. The BoundedTaskQueueController allows you to place a limit on the size that the queue may reach before the producer is forced to block.

C#
public sealed class BoundedTaskQueueController : TaskQueueController
{
    private readonly int _maxTasksCount;
    private int _producersWaiting;

    public BoundedTaskQueueController(ITaskQueue taskQueue, int maxTasksCount)
        : base(taskQueue)
    {
        if (maxTasksCount < 1)
        {
            throw new ArgumentException("MaxTasksCount should be greater 0");
        }
        _maxTasksCount = maxTasksCount;
    }

    protected override IWorkItem DequeueCore()
    {
        IWorkItem taskItem;
        lock (_locker)
        {
            while (_taskQueue.Count == 0 && !_isDisposed)
            {
                _consumersWaiting++;
                Monitor.Wait(_locker);
                _consumersWaiting--;
            }
            if (_isDisposed)
            {
                return null;
            }
            taskItem = _taskQueue.Dequeue();
            if (_producersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
        return taskItem;
    }

    protected override void EnqueueCore(IWorkItem item)
    {
        lock (_locker)
        {
            while (_taskQueue.Count == (_maxTasksCount - 1) && !_isDisposed)
            {
                _producersWaiting++;
                Monitor.Wait(_locker);
                _producersWaiting--;
            }
            _taskQueue.Enqueue(item);
            if (_consumersWaiting > 0)
            {
                Monitor.PulseAll(_locker);
            }
        }
    }
}

Tiny Thread Pool

The TinyThreadPool manages the ITaskQueueController. The  TinyThreadPool is created thru Create method, for example 

C#
var threadPool = TinyThreadPool.Create(x =>
{
    x.Name = "My ThreadPool";
    x.MinThreads = 2;
    x.MaxThreads = 10;
    x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
});

All  TinyThreadPool's properties have a default value, so the simplest way to create TinyThreadPool  just call Default property. 

C#
var threadPool = TinyThreadPool.Default;

MultiThreadingCapacityType represent a threading capacity

C#
public enum MultiThreadingCapacityType
{
    /// <summary>
    /// Represent all processors
    /// </summary>
    Global,
    /// <summary>
    /// Represent one processor
    /// </summary>
    PerProcessor
}

The AddTask method is used to add a task into the task pipeline. If the maximum thread limit is not reached and ConsamersWaiting = 0, a new WorkThread will be created.  

You can add a task with a TaskItemPriority. Please note, DefaultTaskQueue does not validate TaskItemPriority use PriorityTaskQueue for priority tasks

C#
/// <summary>
/// Add new task.
/// </summary>
/// <param name="taskItem">Represent task.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(ITaskItem taskItem, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromTaskItem(taskItem, priority);
    AddWorkItem(workItem);
}

/// <summary>
/// Add new task as <see cref="Action" />.
/// </summary>
/// <param name="action">Represent task.</param>
/// <param name="priority">Task priority.</param>
/// <remarks>Default task priority is <see cref="TaskItemPriority.Normal" />.</remarks>
public void AddTask(Action action, TaskItemPriority priority = TaskItemPriority.Normal)
{
    IWorkItem workItem = WorkItem.FromAction(action, priority);
    AddWorkItem(workItem);
}

Work Thread

The WorkThread class executes a task item and provides logging.

C#
private void DoWork()
{
    while (_isRun)
    {
        try
        {
            IWorkItem workItem = _taskQueueController.Dequeue();
            if (workItem == null)
            {
                continue;
            }
            ProcessItem(workItem);
        }
        catch (Exception ex)
        {
            _log.Error(ex);
        }
    }
}

Thread Pool Extensibility 

In case you need a more powerful task queue, you need to implement the ITaskQueue, and don't worry about thread safety: you can create your own ITaskQueueController as well.  

Example

We create TinyThreadPool with custom settings. The SampleTask is derived by ITaskItem, please see  details below. 

C#
internal class Program
{
    private static ITinyThreadPool _threadPool;

    private static void AddTasks()
    {
        for (int taskIndex = 0; taskIndex < 50; taskIndex++)
        {
            _threadPool.AddTask(new SampleTask(taskIndex));
        }
    }

    private static void Main()
    {
        // create default TinyThreadPool instance or thru method TinyThreadPool.Create
        // _threadPool = TinyThreadPool.Default;

        _threadPool = TinyThreadPool.Create(x =>
        {
            x.Name = "My ThreadPool";
            x.MinThreads = 2;
            x.MaxThreads = 10;
            x.MultiThreadingCapacity = MultiThreadingCapacity.Global;
        });

        AddTasks();
        Console.ReadKey();
    }

    private sealed class SampleTask : ITaskItem
    {
        private readonly int _taskIndex;

        public SampleTask(int taskIndex)
        {
            _taskIndex = taskIndex;
        }

        public void DoWork()
        {
            Thread.Sleep(100);
            Console.WriteLine("Task {0} has been finished", _taskIndex);
        }
    }
}

History

  • 29 Jun 2008: Initial version.
  • 02 Jul 2008
    • + TransactionalMsmqTaskItem, DefaultTaskQueue.
    • * TaskQueueController classes added disposable support.
    • * WorkThread updated stop logic.
    • * Sample projects: 
      • CoreDefaultMsmqSample uses Core.Threading.ThreadPools.TaskQueues.DefaultTaskQueue (see the App.config file for more details).
      • CoreDefaultSample uses CoreDefaultSample.TaskQueue (see the App.config file for more details).
  • 10 Jul 2008
    • Added Mike.Strobel's suggestion.
    • + ActionTaskItem.
    • * ExtendedThreadPool added AddTask(Action action) and AddTask(Action action, ThreadPriority priority) methods.
    • Added more tests
  • 02 Oct 2009
    • + PriorityTaskQueue.
    • + StatisticController.
    • * ExtendedThreadPool added support StatisticController, MultiThreadingCapacityType.
    • Added support Unity 1.2
    • Added CorePrioritySample. The project uses Core.Threading.ThreadPools.TaskQueues.PriorityTaskQueue (see the App.config file for more details).
    • Added more tests 
  • 07 Apr 2013 
    •  ExtendedThreadPool  v2
    •  Unity has been removed
    •  ExtendedThreadPool creation has been simplified 
  • 22 Apr 2015
    • ExtendedThreadPool -> TinyThreadPool
    • TinyThreadPool the new  creating API based on lambda
    • TinyThreadPool is a part of the Nelibur.Sword NuGet package

License

This article, along with any associated source code and files, is licensed under The Microsoft Public License (Ms-PL)


Written By
Software Developer (Senior)
United States United States
B.Sc. in Computer Science.

Comments and Discussions

 
GeneralThanks Pin
jackrabbits22-Aug-08 14:21
jackrabbits22-Aug-08 14:21 
GeneralRe: Thanks Pin
Sergey Morenko23-Aug-08 7:46
professionalSergey Morenko23-Aug-08 7:46 
GeneralKudos and a Suggestion Pin
mike.strobel9-Jul-08 8:50
mike.strobel9-Jul-08 8:50 
GeneralRe: Kudos and a Suggestion Pin
Sergey Morenko9-Jul-08 9:07
professionalSergey Morenko9-Jul-08 9:07 
GeneralRe: Kudos and a Suggestion [modified] Pin
Sergey Morenko9-Jul-08 9:54
professionalSergey Morenko9-Jul-08 9:54 
GeneralRe: Kudos and a Suggestion Pin
mike.strobel9-Jul-08 12:57
mike.strobel9-Jul-08 12:57 
GeneralInterlocks and kudos Pin
Ross Korsky2-Jul-08 18:37
Ross Korsky2-Jul-08 18:37 
GeneralRe: Interlocks and kudos Pin
Sergey Morenko3-Jul-08 0:33
professionalSergey Morenko3-Jul-08 0:33 
GeneralRe: Interlocks and kudos Pin
Ross Korsky3-Jul-08 7:39
Ross Korsky3-Jul-08 7:39 
GeneralRe: Interlocks and kudos Pin
Sergey Morenko3-Jul-08 8:56
professionalSergey Morenko3-Jul-08 8:56 
GeneralNeed clarifications Pin
PIEBALDconsult30-Jun-08 6:07
mvePIEBALDconsult30-Jun-08 6:07 
AnswerRe: Need clarifications Pin
Sergey Morenko30-Jun-08 8:01
professionalSergey Morenko30-Jun-08 8:01 
Hi

I mean that thread pools: Smart Thread Pool, BlackHen.Threading are hard coded and if I want to add different type of WorkQueue/TaskQueue or change thread communication logic I have to spend too many time for it.

IoC - Inversion of control

"The Unity Application Block (Unity) is a lightweight, extensible dependency injection container with support for constructor, property, and method call injection. It provides developers with the following advantages:

It provides simplified object creation, especially for hierarchical object structures and dependencies, which simplifies application code.
It supports abstraction of requirements; this allows developers to specify dependencies at run time or in configuration and simplify management of crosscutting concerns.
It increases flexibility by deferring component configuration to the container.
It has a service location capability; this allows clients to store or cache the container. This is especially useful in ASP.NET Web applications where the developers can persist the container in the ASP.NET session or application." (c) MSDN

On the ExtendedThreadPool you can change all and you should not drop source code

P.S. no I did not, I will, Thanks

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.