
Introduction
The use of multiple threads in an application is a proven method of keeping your application responsive to a user while making use of the CPU in between user events. The BlackHen.Threading
class library allows an application to queue work that is performed concurrently to the main thread, typically the User Interface. It provides the following features:
- A work queue that schedules work based on priority.
- A limit on the number of concurrently running work.
- A finite state machine that allows the monitoring of work.
- Exception processing when work fails.
- A thread pool that manages the creation and destruction of work threads.
- Audit information on work.
Deliverables
I've included the threading class library (BlackHen.Threading.dll) along with a sample program, a unit testing library (BlackHen.Threading.Tests.dll), and class level documentation. The documentation is a compiled help file and was produced with NDoc. The unit testing library is to be used with the NUnit tool.
The sample program allows you to specify the number of threads to use and the concurrency limit (how much work can be done concurrently) in the bottom left pane. You can use the bottom right pane to start producing work. The top pane shows the number of work items in each state (the processing pipe line), it refreshes every 1/5th of a second. A progress bar appears at the bottom to show you the percentage of work completed. The sample work is a random sleep between 100 and 500 milliseconds. While work is being performed, you can still modify the settings and produce more work; since, this was the aim of the project!
Design
The major entities of this design are:
WorkItem
- represents some work that must be performed.
WorkQueue
- schedules the work.
Resource
- performs the work.
ResourcePool
- provides a pool of resources that can be used to perform work.
Many implementations of multithreading combine the WorkQueue
and ResourcePool
. By separating them, work items can be grouped into work queues and managed as a single entity. For example, a producer of work can add work items to its own work queue and then be informed when all its work has completed.
Interfaces are defined that provide the contract between the work queue and the other entities.
The IWork
interface represents some work that can be performed. The IWorkItem
interface allows the work to be manipulated by an IWorkQueue
.
public interface IWork
{
void Perform();
}
public interface IWorkItem : IWork
{
IWorkQueue WorkQueue {get; set;}
WorkItemState State {get; set;}
Exception FailedException {get; set;}
ThreadPriority Priority {get; set;}
}
WorkQueue
is the work queue that manages this work item.
Priority
is the relative importance of this work item versus another.
State
represents where the work item is in processing pipeline.
FailedException
is the cause of the work item failure.
Perform
does the actual work.
The IResourcePool
interface provides a pool of resources that can be used to perform a work item.
public interface IResourcePool
{
void BeginWork(IWorkItem workItem);
}
BeginWork
queues the work item for execution. When a resource in the pool becomes available, the work item State
is set to Running and its Perform
method is invoked.
The IWorkQueue
interface schedules work items for execution.
public interface IWorkQueue
{
void WorkItemStateChanged (IWorkItem workItem, WorkItemState previousState);
void HandleResourceException(ResourceExceptionEventArgs e);
}
WorkItemStateChanged
is invoked by a work item to inform a work queue that its state has changed.
HandleResourceException
is invoked by a resource pool when an exception is thrown outside of normal processing.
Work Item
The WorkItem
class implements the IWorkItem
interface. A work item can be considered as a finite state machine that describes the processing pipeline.

As each state is entered, the time is captured for auditing purposes and its WorkQueue
is informed of the state transition.
When a work item is added to the work queue, it enters either the Scheduled or Queued state depending upon the work load. Eventually, a resource (work thread) will get the work item and the work item enters the Running state. If an exception is thrown while performing the work item, the Failing state is entered.
A WorkItem
is an abstract class. Derived classes must implement the Perform
method, that does the actual work.
Work Queue
The WorkQueue
class implements the IWorkQueue
interface. It manages a queue of work items and is responsible for moving a work item through the processing pipeline.
The Add
method is used to inject a WorkItem
into the pipeline. If the concurrency limit is not reached, the work item is scheduled immediately. Otherwise it is placed in a queue and will be scheduled for execution when other work items complete.
public void Add(IWorkItem workItem) {
workItem.WorkQueue = this;
if (!pausing && runningItems < ConcurrentLimit)
{
workItem.State = WorkItemState.Scheduled;
}
else
{
lock (this)
{
queue.Push(workItem);
workItem.State = WorkItemState.Queued;
}
}
}
The WorkItemStateChanged
method, which is invoked by a WorkItem
, is used to progress the work item through the processing pipeline. It raises the ChangedWorkItemState
event. Then, the following actions are performed, based on the new State
of the workItem
:
State
| Action |
Scheduled |
Assign the workItem to the WorkPool ; this will start its execution |
Running |
Raises the RunningWorkItem event. |
Failing |
Raises the FailedWorkItem event. |
Completed |
Raises the CompletedWorkItem event and
Schedules the next workItem in the queue. |
WorkThreadPool
The WorkThreadPool
class implements the IResource
interface. It manages a pool of work threads, and has the responsibility of informing the WorkQueue
of any exceptions encountered and when a work item is completed.
Work is scheduled for execution by calling the BeginWork
method. This method adds the work to a PriorityQueue, thanks BenDi.
public void BeginWork(IWorkItem workItem)
{
if (workItem == null)
throw new ArgumentNullException();
lock (this)
{
workQueue.Push(workItem);
if (waiters == 0 && workers.Count < MaxThreads)
CreateThread();
Monitor.Pulse(this);
}
}
The Monitor.Pulse
method is used to wakeup a worker thread. See the next section to look at what a worker thread is doing. The Pulse
method is defined as, according to MSDN:
The thread that currently owns the lock on the specified object invokes this method to signal the next thread in line for the lock. Upon receiving the pulse, the waiting thread is moved to the ready queue. When the thread that invoked Pulse
releases the lock, the next thread in the ready queue (which is not necessarily the thread that was pulsed) acquires the lock.
Exception Handling
Exceptions are a standard way of signaling an error; the method that detects an error throw
s and a caller method catch
es it. Exception handling employs the stack to specify which method catches an exception. The issue with a multithreaded application is that each thread has a separate stack, thus the calling method, on a different thread, never catches an exception thrown by a work thread!
The class library specifies how exceptions are handled based on where (in the processing pipeline) it was thrown. It also defines the events an application can attach to detect the exception.
ProcessingWork
- this is when the work is actually being performed, i.e., the IWorkItem.Perform
method is active. The work item's State
is set to Failing
and its FailException
property is set to the thrown exception. To detect this, use the WorkQueue.FailedWorkItem
event.
PreProcessing
and PostProcessing
- a work item is assigned to the thread but not in IWorkItem.Perform
. A ResourceExceptionEventArgs
is created and is passed to the WorkQueue
. To detect this, use the WorkQueue.WorkerException
event.
Other
- this is when the thread is trying to obtain a WorkItem
. A ResourceExceptionEventArgs
is created and is passed to the WorkThreadPool
. To detect this, use the WorkThreadPool.ThreadException
event.
The following code illustrates how a worker thread implements the above scenarios:
public void Start()
{
try
{
while (true)
{
IWorkItem work = null;
lock (threadPool)
{
if (threadPool.workQueue.Count == 0)
{
Monitor.Wait(threadPool);
}
work = (IWorkItem) threadPool.workQueue.Pop();
}
DoWork(work);
}
}
catch(Exception e)
{
threadPool.OnThreadException
(new ResourceExceptionEventArgs(this, e));
}
}
public void DoWork (IWorkItem workItem)
{
try
{
workItem.State = WorkItemState.Running;
try
{
workItem.Perform();
}
catch (Exception e)
{
workItem.FailedException = e;
workItem.State = WorkItemState.Failing;
}
workItem.State = WorkItemState.Completed;
}
catch (Exception e)
{
if (workItem == null || workItem.WorkQueue == null)
throw;
workItem.WorkQueue.HandleResourceException
(new ResourceExceptionEventArgs(this, e));
}
}
Future Work
The following enhancements are planned:
- An
InvokedWorkItem
class, derived from WorkItem
. This will allow execution of a method in class not derived from WorkItem
.
- A thread harvester that will kill work threads when they are no longer required.
- An installer that places the class library in GAC (Global Assembly Cache) and makes the help file available to Visual Studio.
References
- Priority queue, by BenDi.
- Monitor class, by Microsoft.