
Work Queue based multi-threading

27 Nov 2004
Allows an application to queue work that is performed concurrently to the main thread while maintaining exception processing.

Sample Image - WorkQueueThreading.jpg

Introduction

The use of multiple threads in an application is a proven method of keeping your application responsive to a user while making use of the CPU in between user events. The BlackHen.Threading class library allows an application to queue work that is performed concurrently with the main thread, typically the User Interface. It provides the following features:

  • A work queue that schedules work based on priority.
  • A limit on the number of concurrently running work items.
  • A finite state machine that allows the monitoring of work.
  • Exception processing when work fails.
  • A thread pool that manages the creation and destruction of work threads.
  • Audit information on work.

Deliverables

I've included the threading class library (BlackHen.Threading.dll) along with a sample program, a unit testing library (BlackHen.Threading.Tests.dll), and class level documentation. The documentation is a compiled help file and was produced with NDoc. The unit testing library is to be used with the NUnit tool.

The sample program lets you specify the number of threads to use and the concurrency limit (how much work can be done concurrently) in the bottom left pane. Use the bottom right pane to start producing work. The top pane shows the number of work items in each state (the processing pipeline) and refreshes every 1/5th of a second. A progress bar at the bottom shows the percentage of work completed. The sample work is a random sleep of between 100 and 500 milliseconds. While work is being performed, you can still modify the settings and produce more work, which was the aim of the project!

Design

The major entities of this design are:

  • WorkItem - represents some work that must be performed.
  • WorkQueue - schedules the work.
  • Resource - performs the work.
  • ResourcePool - provides a pool of resources that can be used to perform work.

Many implementations of multithreading combine the WorkQueue and ResourcePool. By separating them, work items can be grouped into work queues and managed as a single entity. For example, a producer of work can add work items to its own work queue and then be informed when all its work has completed.

Interfaces are defined that provide the contract between the work queue and the other entities.

The IWork interface represents some work that can be performed. The IWorkItem interface allows the work to be manipulated by an IWorkQueue.

C#
public interface IWork
{
  void Perform();
}

public interface IWorkItem : IWork
{
   IWorkQueue WorkQueue {get; set;}
   WorkItemState State {get; set;}
   Exception FailedException {get; set;}
   ThreadPriority Priority {get; set;}
}
  • WorkQueue is the work queue that manages this work item.
  • Priority is the relative importance of this work item versus another.
  • State represents where the work item is in the processing pipeline.
  • FailedException is the cause of the work item failure.
  • Perform does the actual work.

The IResourcePool interface provides a pool of resources that can be used to perform a work item.

C#
public interface IResourcePool
{
  void BeginWork(IWorkItem workItem);
}
  • BeginWork queues the work item for execution. When a resource in the pool becomes available, the work item State is set to Running and its Perform method is invoked.

The IWorkQueue interface schedules work items for execution.

C#
public interface IWorkQueue
{
  void WorkItemStateChanged (IWorkItem workItem, WorkItemState previousState);
  void HandleResourceException(ResourceExceptionEventArgs e);
}
  • WorkItemStateChanged is invoked by a work item to inform a work queue that its state has changed.
  • HandleResourceException is invoked by a resource pool when an exception is thrown outside of normal processing.

Work Item

The WorkItem class implements the IWorkItem interface. A work item can be considered as a finite state machine that describes the processing pipeline.

Figure: Work item state

As each state is entered, the time is captured for auditing purposes and its WorkQueue is informed of the state transition.

When a work item is added to the work queue, it enters either the Scheduled or Queued state depending upon the work load. Eventually, a resource (work thread) will get the work item and the work item enters the Running state. If an exception is thrown while performing the work item, the Failing state is entered.
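The state handling described above can be sketched roughly as follows. This is a minimal illustration, not the library's WorkItem: the TrackedItem class, its EnteredAt dictionary, and the StateChanged delegate are hypothetical names, and the Created state is an assumed starting point that the text does not mention.

```csharp
using System;
using System.Collections.Generic;

// States named in the article, plus an assumed initial "Created" state.
public enum WorkItemState { Created, Scheduled, Queued, Running, Failing, Completed }

// Hypothetical stand-in for a work item that audits its state changes.
public class TrackedItem
{
    private WorkItemState state = WorkItemState.Created;

    // Time at which each state was entered, kept for auditing.
    public readonly Dictionary<WorkItemState, DateTime> EnteredAt =
        new Dictionary<WorkItemState, DateTime>();

    // Stands in for the call to IWorkQueue.WorkItemStateChanged.
    public Action<TrackedItem, WorkItemState> StateChanged;

    public WorkItemState State
    {
        get { return state; }
        set
        {
            WorkItemState previous = state;
            state = value;
            EnteredAt[value] = DateTime.UtcNow;   // capture the entry time
            if (StateChanged != null)
                StateChanged(this, previous);     // inform the owner of the transition
        }
    }
}

public static class StateDemo
{
    public static void Main()
    {
        TrackedItem item = new TrackedItem();
        item.StateChanged = (i, previous) =>
            Console.WriteLine(previous + " -> " + i.State);

        item.State = WorkItemState.Scheduled;
        item.State = WorkItemState.Running;
        item.State = WorkItemState.Completed;
    }
}
```

Doing the bookkeeping in the property setter means every transition is audited and reported, no matter which component changes the state.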

WorkItem is an abstract class. Derived classes must implement the Perform method, which does the actual work.
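As an illustration, a derived work item might look like the sketch below. So that the example compiles on its own, it declares a reduced, hypothetical stand-in for the abstract WorkItem class; the real class in BlackHen.Threading carries more members. SleepWorkItem and its Done field are also invented for this example, loosely modelled on the sample program's sleeping work.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for the library's abstract WorkItem, reduced to
// what this sketch needs.
public abstract class WorkItem
{
    public ThreadPriority Priority = ThreadPriority.Normal;
    public abstract void Perform();
}

// Sample work: sleep for a given time, then record that it finished.
public class SleepWorkItem : WorkItem
{
    private readonly int milliseconds;
    public bool Done;

    public SleepWorkItem(int milliseconds)
    {
        this.milliseconds = milliseconds;
    }

    public override void Perform()
    {
        Thread.Sleep(milliseconds);
        Done = true;
    }
}

public static class DerivedWorkItemDemo
{
    public static void Main()
    {
        SleepWorkItem item = new SleepWorkItem(100);
        item.Perform();   // normally invoked by a worker thread, not by the caller
        Console.WriteLine("Done: " + item.Done);
    }
}
```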

Work Queue

The WorkQueue class implements the IWorkQueue interface. It manages a queue of work items and is responsible for moving a work item through the processing pipeline.

The Add method is used to inject a WorkItem into the pipeline. If the concurrency limit is not reached, the work item is scheduled immediately. Otherwise it is placed in a queue and will be scheduled for execution when other work items complete.

C#
public void Add(IWorkItem workItem)
{
   // Assign it to this queue.
   workItem.WorkQueue = this;

   // Can we schedule it for execution now?
   if (!pausing && runningItems < ConcurrentLimit)
   {
      workItem.State = WorkItemState.Scheduled;
   }
   else
   {
      // Add the workitem to queue.
      lock (this)
      {
         queue.Push(workItem);
         workItem.State = WorkItemState.Queued;
      }
   }
}
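The interplay between the concurrency limit and the waiting queue can be tried out in isolation with a small sketch. LimitedScheduler and its members are hypothetical names, the work items are plain strings, and waiting items are taken in FIFO order rather than by priority. The sketch also checks the limit while holding the lock, which is a conservative choice made here and not necessarily what the library does.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch of a concurrency limit: start work immediately
// while below the limit, otherwise hold it until something completes.
public class LimitedScheduler
{
    private readonly object sync = new object();
    private readonly Queue<string> waiting = new Queue<string>();
    private readonly int concurrentLimit;
    private int running;

    // Items that have been started, in start order.
    public readonly List<string> Started = new List<string>();

    public LimitedScheduler(int concurrentLimit)
    {
        this.concurrentLimit = concurrentLimit;
    }

    public int WaitingCount
    {
        get { lock (sync) { return waiting.Count; } }
    }

    public void Add(string item)
    {
        lock (sync)
        {
            if (running < concurrentLimit)
                Start(item);
            else
                waiting.Enqueue(item);
        }
    }

    // Called when one running item has finished.
    public void Complete()
    {
        lock (sync)
        {
            running--;
            if (waiting.Count > 0)
                Start(waiting.Dequeue());
        }
    }

    private void Start(string item)
    {
        running++;
        Started.Add(item);
    }

    public static void Main()
    {
        LimitedScheduler scheduler = new LimitedScheduler(2);
        scheduler.Add("a");
        scheduler.Add("b");
        scheduler.Add("c");
        Console.WriteLine(scheduler.Started.Count + " started, " + scheduler.WaitingCount + " waiting");
        scheduler.Complete();
        Console.WriteLine(scheduler.Started.Count + " started, " + scheduler.WaitingCount + " waiting");
    }
}
```

With a limit of 2, the third item waits until Complete is called, at which point it is started and the waiting queue is empty again.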

The WorkItemStateChanged method, which is invoked by a WorkItem, is used to progress the work item through the processing pipeline. It raises the ChangedWorkItemState event. Then, the following actions are performed, based on the new State of the workItem:

  • Scheduled - assigns the workItem to the WorkPool; this will start its execution.
  • Running - raises the RunningWorkItem event.
  • Failing - raises the FailedWorkItem event.
  • Completed - raises the CompletedWorkItem event and schedules the next workItem in the queue.

WorkThreadPool

The WorkThreadPool class implements the IResourcePool interface. It manages a pool of work threads, and has the responsibility of informing the WorkQueue of any exceptions encountered and when a work item is completed.

Work is scheduled for execution by calling the BeginWork method. This method adds the work to a PriorityQueue (thanks to BenDi; see References).

C#
public void BeginWork(IWorkItem workItem)
{
   if (workItem == null)
      throw new ArgumentNullException("workItem");

   lock (this)
   {
      // Queue the work.
      workQueue.Push(workItem);

      // If all workers are busy, then create a thread if the limit
      // is not reached.
      if (waiters == 0 && workers.Count < MaxThreads)
         CreateThread();

      // Wake up a worker.
      Monitor.Pulse(this);
   }
}

The Monitor.Pulse method is used to wake up a worker thread. The next section shows what a worker thread is doing. According to MSDN, the Pulse method is defined as:

The thread that currently owns the lock on the specified object invokes this method to signal the next thread in line for the lock. Upon receiving the pulse, the waiting thread is moved to the ready queue. When the thread that invoked Pulse releases the lock, the next thread in the ready queue (which is not necessarily the thread that was pulsed) acquires the lock.
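The Wait/Pulse handshake can be seen in isolation in the following sketch, which is independent of the class library. PulseDemo, Put and Take are hypothetical names. Because the thread that acquires the lock is not necessarily the one that was pulsed, the consumer re-checks its condition in a loop after Monitor.Wait returns.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class PulseDemo
{
    private static readonly object sync = new object();
    private static readonly Queue<int> pending = new Queue<int>();

    // Blocks until an item is available, then removes and returns it.
    public static int Take()
    {
        lock (sync)
        {
            // Wait releases the lock while blocked and reacquires it
            // before returning; re-check the condition every time.
            while (pending.Count == 0)
                Monitor.Wait(sync);
            return pending.Dequeue();
        }
    }

    // Adds an item and signals one waiting thread, if any.
    public static void Put(int value)
    {
        lock (sync)
        {
            pending.Enqueue(value);
            Monitor.Pulse(sync);
        }
    }

    // Hands one value from the calling thread to a consumer thread.
    public static int RunDemo()
    {
        int received = 0;
        Thread consumer = new Thread(() => { received = Take(); });
        consumer.Start();
        Put(42);
        consumer.Join();
        return received;
    }

    public static void Main()
    {
        Console.WriteLine(RunDemo());   // prints 42
    }
}
```

The sketch works whichever thread gets the lock first: if Put runs before the consumer starts waiting, the consumer finds the item already queued and never calls Wait.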

Exception Handling

Exceptions are a standard way of signaling an error: the method that detects an error throws an exception, and a calling method catches it. Exception handling uses the call stack to determine which method catches an exception. The issue with a multithreaded application is that each thread has a separate stack, so the method that queued the work, which runs on a different thread, never catches an exception thrown by a work thread!
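A common way around this, sketched below under simplifying assumptions, is to catch the exception on the worker thread and hand it to interested parties through an event. CrossThreadExceptionDemo, RunOnWorker and the Failed event are hypothetical names for this illustration and are not part of the class library.

```csharp
using System;
using System.Threading;

public static class CrossThreadExceptionDemo
{
    // Hypothetical event raised when work fails on the worker thread.
    public static event Action<Exception> Failed;

    // Runs the work on a separate thread and waits for it to finish.
    public static void RunOnWorker(Action work)
    {
        Thread worker = new Thread(() =>
        {
            try
            {
                work();
            }
            catch (Exception e)
            {
                // The caller's stack cannot see this exception, so
                // report it through the event instead.
                Action<Exception> handler = Failed;
                if (handler != null)
                    handler(e);
            }
        });
        worker.Start();
        worker.Join();
    }

    public static void Main()
    {
        Failed += e => Console.WriteLine("Work failed: " + e.Message);
        RunOnWorker(() => { throw new InvalidOperationException("boom"); });
    }
}
```

Note that the handler runs on the worker thread, so code that touches the user interface from such a handler would need to marshal back to the UI thread.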

The class library specifies how an exception is handled based on where in the processing pipeline it was thrown. It also defines the events an application can attach to in order to detect the exception.

  • ProcessingWork - this is when the work is actually being performed, i.e., the IWorkItem.Perform method is active. The work item's State is set to Failing and its FailedException property is set to the thrown exception. To detect this, use the WorkQueue.FailedWorkItem event.
  • PreProcessing and PostProcessing - a work item is assigned to the thread, but IWorkItem.Perform is not active. A ResourceExceptionEventArgs is created and is passed to the WorkQueue. To detect this, use the WorkQueue.WorkerException event.
  • Other - this is when the thread is trying to obtain a WorkItem. A ResourceExceptionEventArgs is created and is passed to the WorkThreadPool. To detect this, use the WorkThreadPool.ThreadException event.

The following code illustrates how a worker thread implements the above scenarios:

C#
public void Start()
{
   try
   {
      while (true)
      {
         IWorkItem work = null;
         lock (threadPool)
         {
            // Re-check after every wakeup: another worker may have
            // taken the item before this thread reacquired the lock.
            while (threadPool.workQueue.Count == 0)
            {
               Monitor.Wait(threadPool);
            }
            work = (IWorkItem) threadPool.workQueue.Pop();
         }
         DoWork(work);
      }
   }
   catch (Exception e)
   {
      // This should not happen!!!
      threadPool.OnThreadException
         (new ResourceExceptionEventArgs(this, e));
   }
}

public void DoWork(IWorkItem workItem)
{
   try
   {
      workItem.State = WorkItemState.Running;
      try
      {
         workItem.Perform();
      }
      catch (Exception e)
      {
         // ProcessingWork: record the failure on the work item.
         workItem.FailedException = e;
         workItem.State = WorkItemState.Failing;
      }
      workItem.State = WorkItemState.Completed;
   }
   catch (Exception e)
   {
      // If no work queue for the item, then let the WorkThreadPool raise
      // the exception event.
      if (workItem == null || workItem.WorkQueue == null)
         throw;

      // PreProcessing or PostProcessing: let the work queue handle it.
      workItem.WorkQueue.HandleResourceException
         (new ResourceExceptionEventArgs(this, e));
   }
}

Future Work

The following enhancements are planned:

1. An InvokedWorkItem class, derived from WorkItem. This will allow execution of a method in a class not derived from WorkItem.
2. A thread harvester that will kill work threads when they are no longer required.
3. An installer that places the class library in the GAC (Global Assembly Cache) and makes the help file available to Visual Studio.

References

1. Priority queue, by BenDi.
2. Monitor class, by Microsoft.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.



Written By
Web Developer
New Zealand
I have been involved with computer engineering (both hardware and software) since 1975. During these almost 30 years, I have mainly been associated with start-up companies, except for a 3-year stint at Digital Equipment Corp. and 2 years at Telecom New Zealand Ltd. My positions have included Analyst, Software Engineer, R&D Manager and Director of Research and Development.
