Work Queue based multi-threading

By Richard Schneider, 27 Nov 2004

Sample Image - WorkQueueThreading.jpg

Introduction

The use of multiple threads in an application is a proven method of keeping your application responsive to the user while making use of the CPU in between user events. The BlackHen.Threading class library allows an application to queue work that is performed concurrently with the main thread, which is typically the user interface thread. It provides the following features:

  • A work queue that schedules work based on priority.
  • A limit on the number of concurrently running work items.
  • A finite state machine that allows the monitoring of work.
  • Exception processing when work fails.
  • A thread pool that manages the creation and destruction of work threads.
  • Audit information on work.

Deliverables

I've included the threading class library (BlackHen.Threading.dll) along with a sample program, a unit testing library (BlackHen.Threading.Tests.dll), and class level documentation. The documentation is a compiled help file and was produced with NDoc. The unit testing library is to be used with the NUnit tool.

The sample program allows you to specify the number of threads to use and the concurrency limit (how much work can be done concurrently) in the bottom left pane. You can use the bottom right pane to start producing work. The top pane shows the number of work items in each state (the processing pipeline); it refreshes every 1/5th of a second. A progress bar appears at the bottom to show you the percentage of work completed. The sample work is a random sleep between 100 and 500 milliseconds. While work is being performed, you can still modify the settings and produce more work, since this was the aim of the project!

Design

The major entities of this design are:

  • WorkItem - represents some work that must be performed.
  • WorkQueue - schedules the work.
  • Resource - performs the work.
  • ResourcePool - provides a pool of resources that can be used to perform work.

Many implementations of multithreading combine the WorkQueue and ResourcePool. By separating them, work items can be grouped into work queues and managed as a single entity. For example, a producer of work can add work items to its own work queue and then be informed when all its work has completed.

Interfaces are defined that provide the contract between the work queue and the other entities.

The IWork interface represents some work that can be performed. The IWorkItem interface allows the work to be manipulated by an IWorkQueue.

   public interface IWork
   {
     void Perform();
   }

   public interface IWorkItem : IWork 
   { 
      IWorkQueue WorkQueue {get; set;} 
      WorkItemState State {get; set;} 
      Exception FailedException {get; set;}
      ThreadPriority Priority {get; set;}
   }
  • WorkQueue is the work queue that manages this work item.
  • Priority is the relative importance of this work item versus another.
  • State represents where the work item is in the processing pipeline.
  • FailedException is the cause of the work item failure.
  • Perform does the actual work.

The IResourcePool interface provides a pool of resources that can be used to perform a work item.

   public interface IResourcePool
   {
     void BeginWork(IWorkItem workItem);
   }
  • BeginWork queues the work item for execution. When a resource in the pool becomes available, the work item State is set to Running and its Perform method is invoked.

The IWorkQueue interface schedules work items for execution.

   public interface IWorkQueue 
   { 
     void WorkItemStateChanged (IWorkItem workItem, WorkItemState previousState);
     void HandleResourceException(ResourceExceptionEventArgs e);
   }
  • WorkItemStateChanged is invoked by a work item to inform a work queue that its state has changed.
  • HandleResourceException is invoked by a resource pool when an exception is thrown outside of normal processing.

Work Item

The WorkItem class implements the IWorkItem interface. A work item can be considered as a finite state machine that describes the processing pipeline.

Work item state

As each state is entered, the time is captured for auditing purposes and its WorkQueue is informed of the state transition.

When a work item is added to the work queue, it enters either the Scheduled or Queued state depending upon the work load. Eventually, a resource (work thread) will get the work item and the work item enters the Running state. If an exception is thrown while performing the work item, the Failing state is entered.

WorkItem is an abstract class. Derived classes must implement the Perform method, which does the actual work.
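The state machine and the derived-class requirement can be sketched as follows. This is a simplified illustration, not the library's WorkItem: the names SketchWorkItem, SleepWorkItem, EnteredAt and RunOnce are hypothetical, the Created state is an assumed starting state, and the sketch leaves out the notification sent to the work queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-ins for the library's types (not the BlackHen API).
// "Created" is an assumed initial state; the article does not name one.
public enum WorkItemState { Created, Scheduled, Queued, Running, Failing, Completed }

public abstract class SketchWorkItem
{
    private WorkItemState state = WorkItemState.Created;

    // Audit trail: the time at which each state was entered.
    public readonly Dictionary<WorkItemState, DateTime> EnteredAt =
        new Dictionary<WorkItemState, DateTime>();

    public Exception FailedException { get; set; }

    public WorkItemState State
    {
        get { return state; }
        set
        {
            state = value;
            EnteredAt[value] = DateTime.UtcNow;  // capture the entry time
        }
    }

    // Derived classes supply the actual work.
    public abstract void Perform();
}

// Mirrors the sample program's work: sleep for a while.
public sealed class SleepWorkItem : SketchWorkItem
{
    private readonly int milliseconds;
    public SleepWorkItem(int milliseconds) { this.milliseconds = milliseconds; }
    public override void Perform() { Thread.Sleep(milliseconds); }
}

public static class WorkItemDemo
{
    // Drives one item through Running (and Failing, on error) to Completed
    // on the calling thread.
    public static SketchWorkItem RunOnce(SketchWorkItem item)
    {
        item.State = WorkItemState.Running;
        try
        {
            item.Perform();
        }
        catch (Exception e)
        {
            item.FailedException = e;
            item.State = WorkItemState.Failing;
        }
        item.State = WorkItemState.Completed;
        return item;
    }

    public static void Main()
    {
        SketchWorkItem item = RunOnce(new SleepWorkItem(100));
        Console.WriteLine(item.State);
        Console.WriteLine(item.EnteredAt.ContainsKey(WorkItemState.Running));
    }
}
```

Recording the entry time in the State setter keeps the audit data in one place, so no caller can change the state without leaving a timestamp.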

Work Queue

The WorkQueue class implements the IWorkQueue interface. It manages a queue of work items and is responsible for moving a work item through the processing pipeline.

The Add method is used to inject a WorkItem into the pipeline. If the concurrency limit is not reached, the work item is scheduled immediately. Otherwise it is placed in a queue and will be scheduled for execution when other work items complete.

   public void Add(IWorkItem workItem)
   {
      // Assign it to this queue.
      workItem.WorkQueue = this;

      // Can we schedule it for execution now?
      if (!pausing && runningItems < ConcurrentLimit)
      {
         workItem.State = WorkItemState.Scheduled;
      }
      else
      {
         // Add the work item to the queue.
         lock (this)
         {
            queue.Push(workItem);
            workItem.State = WorkItemState.Queued;
         }
      }
   }
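The scheduling rule, including what happens when a running item completes, can be illustrated with a small self-contained sketch. LimitedScheduler and its members are hypothetical names, not part of the class library. The sketch uses a plain first-in, first-out queue instead of a priority queue, and it takes the lock around both the limit check and the enqueue, which is a choice made for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical miniature of the scheduling rule; not the library's WorkQueue.
public sealed class LimitedScheduler
{
    private readonly object gate = new object();
    private readonly Queue<string> waiting = new Queue<string>();
    private readonly int concurrentLimit;
    private int running;

    public LimitedScheduler(int concurrentLimit)
    {
        this.concurrentLimit = concurrentLimit;
    }

    public int Running { get { lock (gate) return running; } }
    public int Waiting { get { lock (gate) return waiting.Count; } }

    // Returns true if the item was scheduled at once, false if it was queued.
    public bool Add(string item)
    {
        lock (gate)
        {
            if (running < concurrentLimit)
            {
                running++;
                return true;
            }
            waiting.Enqueue(item);
            return false;
        }
    }

    // Called when a running item completes. Returns the next item to
    // schedule, or null if nothing is waiting.
    public string Complete()
    {
        lock (gate)
        {
            running--;
            if (waiting.Count == 0)
                return null;
            running++;
            return waiting.Dequeue();
        }
    }
}

public static class LimitDemo
{
    public static void Main()
    {
        var scheduler = new LimitedScheduler(2);
        Console.WriteLine(scheduler.Add("a"));    // True: below the limit
        Console.WriteLine(scheduler.Add("b"));    // True: limit now reached
        Console.WriteLine(scheduler.Add("c"));    // False: queued
        Console.WriteLine(scheduler.Complete());  // c: scheduled when a slot frees up
    }
}
```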

The WorkItemStateChanged method, which is invoked by a WorkItem, is used to progress the work item through the processing pipeline. It raises the ChangedWorkItemState event. Then, the following actions are performed, based on the new State of the workItem:

State      Action
Scheduled  Assigns the workItem to the WorkPool; this starts its execution.
Running    Raises the RunningWorkItem event.
Failing    Raises the FailedWorkItem event.
Completed  Raises the CompletedWorkItem event and schedules the next workItem in the queue.

WorkThreadPool

The WorkThreadPool class implements the IResourcePool interface. It manages a pool of work threads and is responsible for informing the WorkQueue of any exceptions encountered and of the completion of each work item.

Work is scheduled for execution by calling the BeginWork method. This method adds the work to a PriorityQueue (thanks to BenDi; see References).

    public void BeginWork(IWorkItem workItem)
    {
       if (workItem == null)
          throw new ArgumentNullException("workItem");
    
       lock (this)
       {
          // Queue the work.
          workQueue.Push(workItem);
    
          // If all workers are busy, then create a thread if the limit
          // is not reached.
          if (waiters == 0 && workers.Count < MaxThreads)
             CreateThread();
    
          // Wakeup a worker.
          Monitor.Pulse(this);
       }
    }

The Monitor.Pulse method is used to wake up a worker thread; the next section shows what a worker thread does while it waits. According to MSDN, the Pulse method behaves as follows:

    The thread that currently owns the lock on the specified object invokes this method to signal the next thread in line for the lock. Upon receiving the pulse, the waiting thread is moved to the ready queue. When the thread that invoked Pulse releases the lock, the next thread in the ready queue (which is not necessarily the thread that was pulsed) acquires the lock.
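A minimal, self-contained sketch of the Wait/Pulse hand-off may help here. It is not taken from the class library: PulseDemo, Put, Take and SumThroughQueue are hypothetical names, and a plain Queue stands in for the priority queue. Because the thread that acquires the lock is not necessarily the one that was pulsed, the consumer re-checks the queue in a loop before removing an item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class PulseDemo
{
    private static readonly object gate = new object();
    private static readonly Queue<int> queue = new Queue<int>();

    // Consumer side: block until an item is available, then remove it.
    private static int Take()
    {
        lock (gate)
        {
            // Re-check after every wake-up; another thread may have taken
            // the item before this thread re-acquired the lock.
            while (queue.Count == 0)
                Monitor.Wait(gate);   // releases the lock while waiting
            return queue.Dequeue();
        }
    }

    // Producer side: add an item and wake one waiting thread, if any.
    private static void Put(int value)
    {
        lock (gate)
        {
            queue.Enqueue(value);
            Monitor.Pulse(gate);
        }
    }

    // Starts one consumer that sums 'count' items, feeds it 1..count,
    // and returns the sum once the consumer has finished.
    public static int SumThroughQueue(int count)
    {
        int sum = 0;
        var consumer = new Thread(() =>
        {
            for (int i = 0; i < count; i++)
                sum += Take();
        });
        consumer.Start();
        for (int i = 1; i <= count; i++)
            Put(i);
        consumer.Join();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(SumThroughQueue(10)); // 55
    }
}
```

A pulse sent while no thread is waiting is simply lost, which is harmless here because the consumer always tests the queue before it waits.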

Exception Handling

Exceptions are a standard way of signaling an error: the method that detects an error throws an exception, and a calling method catches it. Exception handling uses the call stack to determine which method catches an exception. The issue in a multithreaded application is that each thread has its own stack, so the method that queued the work, running on a different thread, never catches an exception thrown by a work thread!
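The point can be illustrated with a small sketch that is independent of the class library. The exception has to be caught on the worker's own stack and then handed to the interested party; a try/catch placed around Thread.Start in the caller would not see it. CrossThreadExceptionDemo and RunAndCapture are hypothetical names used only for this illustration.

```csharp
using System;
using System.Threading;

public static class CrossThreadExceptionDemo
{
    // Runs 'work' on a new thread and returns the exception it threw,
    // or null if it finished normally.
    public static Exception RunAndCapture(Action work)
    {
        Exception captured = null;
        var worker = new Thread(() =>
        {
            try
            {
                work();
            }
            catch (Exception e)
            {
                // Caught on the worker's own stack, then stored so that
                // the calling thread can inspect it later.
                captured = e;
            }
        });
        worker.Start();
        worker.Join();
        return captured;
    }

    public static void Main()
    {
        Exception e = RunAndCapture(() =>
        {
            throw new InvalidOperationException("work failed");
        });
        Console.WriteLine(e.Message); // work failed
    }
}
```

The class library follows the same idea, but reports the captured exception through a property and events instead of a return value.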

The class library specifies how an exception is handled based on where in the processing pipeline it was thrown. It also defines the events to which an application can attach to detect the exception.

  • ProcessingWork - this is when the work is actually being performed, i.e., the IWorkItem.Perform method is active. The work item's State is set to Failing and its FailedException property is set to the thrown exception. To detect this, use the WorkQueue.FailedWorkItem event.
  • PreProcessing and PostProcessing - a work item is assigned to the thread, but the thread is not in IWorkItem.Perform. A ResourceExceptionEventArgs is created and passed to the WorkQueue. To detect this, use the WorkQueue.WorkerException event.
  • Other - this is when the thread is trying to obtain a WorkItem. A ResourceExceptionEventArgs is created and passed to the WorkThreadPool. To detect this, use the WorkThreadPool.ThreadException event.

The following code illustrates how a worker thread implements the above scenarios:

    public void Start()
    {
       try
       {
          while (true)
          {
             IWorkItem work = null;
             lock (threadPool)
             {
                // Re-check after every wake-up: the thread that acquires
                // the lock is not necessarily the one that was pulsed.
                while (threadPool.workQueue.Count == 0)
                {
                   Monitor.Wait(threadPool);
                }
                work = (IWorkItem) threadPool.workQueue.Pop();
             }
             DoWork(work);
          }
       }
       catch (Exception e)
       {
          // This should not happen!!!
          threadPool.OnThreadException
             (new ResourceExceptionEventArgs(this, e));
       }
    }

    public void DoWork(IWorkItem workItem)
    {
       try
       {
          workItem.State = WorkItemState.Running;
          try
          {
             workItem.Perform();
          }
          catch (Exception e)
          {
             workItem.FailedException = e;
             workItem.State = WorkItemState.Failing;
          }
          // Completed is entered whether or not Perform threw.
          workItem.State = WorkItemState.Completed;
       }
       catch (Exception e)
       {
          // If no work queue for the item, then let the WorkThreadPool raise
          // the exception event.
          if (workItem == null || workItem.WorkQueue == null)
             throw;

          workItem.WorkQueue.HandleResourceException
             (new ResourceExceptionEventArgs(this, e));
       }
    }

Future Work

The following enhancements are planned:

  1. An InvokedWorkItem class, derived from WorkItem. This will allow execution of a method in a class not derived from WorkItem.
  2. A thread harvester that will kill work threads when they are no longer required.
  3. An installer that places the class library in the GAC (Global Assembly Cache) and makes the help file available to Visual Studio.

References

  1. Priority queue, by BenDi.
  2. Monitor class, by Microsoft.

About the Author

Richard Schneider
Web Developer
New Zealand

I have been involved with computer engineering (both hardware and software) since 1975. During these almost 30 years, I have mainly been associated with start-up companies, except for a 3-year stint at Digital Equipment Corp. and 2 years at Telecom New Zealand Ltd. My positions have included Analyst, Software Engineer, R&D Manager and Director of Research and Development.

Article Copyright 2004 by Richard Schneider