Worker Queue: Serialised Access to the ThreadPool

jburrow, 14 Jun 2004, CPOL
An abstraction layer that sits between an application and the thread pool, so that the application can better manage the work it processes there.

Introduction

System.Threading.ThreadPool is an excellent way to process work asynchronously: it offers a simple fire-and-forget way to run work in the background. However, it gives no guarantee about the order in which queued items are processed.

Background

When you place items onto the ThreadPool, the only guarantee you have is that at some point in the near future (you hope) the thread pool will get round to processing your item by invoking your function through the WaitCallback delegate. The following example illustrates how the order in which items are processed has little to do with the order in which they were placed on the queue.

        private void button1_Click(object sender, System.EventArgs e)
        {
            Console.WriteLine("Starting Task Generation");

            for(int iCount=0;iCount<100;iCount++)
            {
                PointlessTask oTask = new PointlessTask(iCount);
                ThreadPool.QueueUserWorkItem(new WaitCallback(oTask.DoWork));
            }

            Console.WriteLine("Completed Task Generation");
        }

        public class PointlessTask
        {
            private int m_TaskId;

            public PointlessTask(int TaskId)
            {
                m_TaskId=TaskId;
            }

            public void DoWork(object NotUsed)
            {
                Thread.Sleep(1000);
                Console.WriteLine(m_TaskId);
            }
        }

//RESULTS IN OUTPUT WINDOW

//        Starting Task Generation
//        Completed Task Generation
//        0
//        1
//        2
//        3
//        4
//        5
//        6
//        7
//        8
//        10
//        11
//        9
//        13
//        14
//        12
//        15
//        17
//        19
//        18

Running the above code sample shows that the thread pool queues up all the tasks and executes them in a roughly, but not strictly, sequential order. This article describes a technique that guarantees serialized processing of items on the thread pool.

A further problem with the above technique appears when you place many work items on the thread pool, more than it can run at once (25 * no. of processors), so that it starts buffering them. If another code path then queues a work item, that item has to wait until the buffered items have been processed. A layer of abstraction between the thread pool and the application's various code paths would enable better load balancing between those code paths.
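To see how close a process is to that limit, the pool can be asked for its current figures. The following is a minimal sketch rather than part of the article's download: the class name PoolLimits and its Describe method are hypothetical, while ThreadPool.GetMaxThreads and ThreadPool.GetAvailableThreads are the standard framework calls. The actual maximum depends on the runtime version and configuration, so it may not equal 25 per processor on your machine.

```csharp
using System;
using System.Threading;

// Hypothetical helper, for illustration only.
public static class PoolLimits
{
    // Returns a one-line summary of how many pool worker threads are busy.
    public static string Describe()
    {
        int maxWorkers, maxIo, freeWorkers, freeIo;

        // Upper bound on worker threads and I/O completion threads.
        ThreadPool.GetMaxThreads(out maxWorkers, out maxIo);

        // How many of those are currently idle or not yet created.
        ThreadPool.GetAvailableThreads(out freeWorkers, out freeIo);

        return String.Format(
            "{0} of {1} worker threads in use on {2} processor(s)",
            maxWorkers - freeWorkers, maxWorkers, Environment.ProcessorCount);
    }
}
```

Calling PoolLimits.Describe() before and after the loop in button1_Click gives a rough picture of how much of the pool a single code path has claimed.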

One possible solution is to create a dedicated thread and run the tasks on it. However, creating and managing threads yourself is a considerable burden, and it means more code to write! It is more efficient to process asynchronously on the thread pool. This class has been designed to give developers a fire-and-forget technique for serialized processing of items on the thread pool.
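For comparison, here is a rough sketch of what the dedicated-thread alternative involves. It is not the article's code: the class name DedicatedThreadQueue and its members are hypothetical, and it assumes a framework version that has generics and the Action delegate. Even in this small form it needs its own thread, its own signalling through Monitor.Wait and Monitor.Pulse, and an explicit shutdown path.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: one private thread that runs queued work in order.
public class DedicatedThreadQueue
{
    private readonly Queue<Action> m_Queue = new Queue<Action>();
    private readonly object m_Lock = new object();
    private readonly Thread m_Thread;
    private bool m_Stopping;

    public DedicatedThreadQueue()
    {
        m_Thread = new Thread(Run);
        m_Thread.IsBackground = true; // do not keep the process alive
        m_Thread.Start();
    }

    public void Add(Action work)
    {
        lock (m_Lock)
        {
            m_Queue.Enqueue(work);
            Monitor.Pulse(m_Lock); // wake the worker if it is waiting
        }
    }

    // Lets already queued work finish, then ends the thread.
    public void Stop()
    {
        lock (m_Lock)
        {
            m_Stopping = true;
            Monitor.Pulse(m_Lock);
        }
        m_Thread.Join();
    }

    private void Run()
    {
        while (true)
        {
            Action work;
            lock (m_Lock)
            {
                // Sleep until there is work or a stop request.
                while (m_Queue.Count == 0 && !m_Stopping)
                    Monitor.Wait(m_Lock);

                if (m_Queue.Count == 0)
                    return; // stopping and nothing left to do

                work = m_Queue.Dequeue();
            }

            try
            {
                work(); // run outside the lock so Add never blocks on it
            }
            catch (Exception)
            {
                // Swallowed so one failing item does not kill the thread.
            }
        }
    }
}
```

Every queue of this kind holds a thread for its whole lifetime, even when idle, which is the cost the thread pool approach avoids.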

Objectives

Provide a reusable object that can easily be used to fire and forget work items in a serialized manner upon the thread pool.

Provide a simple mechanism to control the flow of work items to the thread pool, and balance thread pool usage between different code paths.

Provide a simple mechanism that doesn't flood the thread pool, so that other important tasks can operate on the thread pool in some sort of logical order.

Using the code

The solution is implemented as one abstract class, BaseSerialisedWorkerQueue. An additional class, EventSerialisedWorkerQueue, is provided for ease of use.

using System;
using System.Collections;
using System.Threading;

namespace _ThreadQueue
{
    /// <summary>
    /// A simple example of the usage of BaseSerialisedWorkerQueue: derive
    /// from it and override PerformWork to process each item.
    /// </summary>
    public class WorkerQueue:BaseSerialisedWorkerQueue
    {
        public WorkerQueue()
        {

        }

        protected override void PerformWork(object WorkerItem)
        {
            Console.WriteLine(WorkerItem.ToString());
        }
    }

    /// <summary>
    /// Use this to invoke, on a thread pool worker thread, work that must
    /// be processed sequentially. WorkerQueue above shows
    /// a simple example of the usage of this class.
    /// </summary>
    public abstract class BaseSerialisedWorkerQueue
    {
        private WaitCallback  m_WaitCallback;

        private Queue m_Queue;
        private bool m_IsProcessing;

        public BaseSerialisedWorkerQueue()
        {
            m_IsProcessing = false;
            m_WaitCallback=new WaitCallback(PerformWork_Internal);
            m_Queue=new Queue();
        }

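        public void Add(object WorkerItem)
        {
            // Take the same lock as SpawnWork so that the enqueue and the
            // m_IsProcessing check happen atomically. Without it, two
            // threads calling Add at once could both start processing and
            // break the serialisation. The lock is re-entrant, so calling
            // SpawnWork from inside it is safe.
            lock(this)
            {
                m_Queue.Enqueue(WorkerItem);
                if (!m_IsProcessing)
                    SpawnWork(null);
            }
        }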

        /// <summary>
        /// This method is called both from Add and from the callback that
        /// fires once a queued item has been processed. When called from
        /// Add, null is passed because no EndInvoke is required; when
        /// called from the callback, the async result is passed and
        /// EndInvoke is called on it.
        /// </summary>
        /// <param name="AsyncResult"></param>
        private void SpawnWork(IAsyncResult AsyncResult)
        {
            lock(this)
            {
                if(AsyncResult!=null)
                {
                    m_WaitCallback.EndInvoke(AsyncResult);
                }

                // Dispatch the next item if there is one; otherwise go idle
                if(m_Queue.Count>0)
                {
                  m_IsProcessing = true;
                  //Gets the next piece of work to perform
                  object oWorkerItem = m_Queue.Dequeue();
                  //Hooks up the callback to this method.        
                  AsyncCallback oAsyncCallback = 
                          new AsyncCallback(this.SpawnWork);    
                  //Invokes the work on the threadpool
                  m_WaitCallback.BeginInvoke(oWorkerItem, 
                                     oAsyncCallback, null);
                }
                else
                {
                    m_IsProcessing = false;
                }
            }
        }

        /// <summary>
        /// This wrapper is here to ensure
        /// that an exception thrown by PerformWork does not stop the
        /// callback that dispatches the next item from running.
        /// </summary>
        /// <param name="WorkerItem"></param>
        private void PerformWork_Internal(object WorkerItem)
        {
            try
            {
                //Calls the abstract method from safe try catch: Ensures the 
                //callback will always occur
                PerformWork(WorkerItem);
            }
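            catch
            {
                // Deliberately swallowed: a failing work item must not
                // prevent the callback from dispatching the next one.
            }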
        }

        protected abstract void PerformWork(object WorkerItem);
    }

    /// <summary>
    /// Used when you don't want to derive a class, but instead hook up to an event.
    /// </summary>
    public class EventSerialisedWorkerQueue:BaseSerialisedWorkerQueue
    {
        public delegate void ProcessWorkerItem(object WorkerItem);
        public event ProcessWorkerItem onProcessWorkerItem;

        public EventSerialisedWorkerQueue()
        {

        }

        protected override void PerformWork(object WorkerItem)
        {
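            // Copy the delegate first so that an unsubscribe on another
            // thread cannot null it between the check and the call.
            ProcessWorkerItem oHandler = onProcessWorkerItem;
            if(oHandler!=null)oHandler(WorkerItem);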
        }    
    }
}

Points of Interest

The code flow:

  1. Queue a new work item (any object) via the abstract class's BaseSerialisedWorkerQueue.Add(object).
  2. If the queue isn't already processing, m_WaitCallback is invoked asynchronously on the thread pool. Once the thread pool executes it, the WaitCallback delegate calls PerformWork_Internal. So that an exception cannot stop the processing of subsequent items in the queue, PerformWork_Internal contains only a try{}catch{} block around the call to PerformWork. This ensures that even if the processing of a work item fails and the exception isn't handled properly, the AsyncCallback will always be called and the next item in the queue processed.
  3. Once an item has been processed, the AsyncCallback is called, which runs SpawnWork again to dispatch the next item. A point of interest is that EndInvoke is called on the delegate; this prevents what is effectively a memory leak: see ".NET Remoting - Events. Events? Events!" by Dmitry_Belikov.
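The same flow can be sketched without delegate BeginInvoke/EndInvoke, which matters on .NET Core and .NET 5 or later, where those delegate methods throw PlatformNotSupportedException. The sketch below is an illustration under assumptions, not the article's implementation: the class name SerialPoolQueue, its handler parameter and the private lock object are all hypothetical. It keeps the key idea of handing the pool one item at a time and queuing the next only after the previous one has finished.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical class: serialised processing on the thread pool using
// ThreadPool.QueueUserWorkItem instead of delegate BeginInvoke.
public class SerialPoolQueue
{
    private readonly Queue<object> m_Queue = new Queue<object>();
    private readonly object m_Lock = new object();
    private readonly Action<object> m_Handler;
    private bool m_IsProcessing;

    public SerialPoolQueue(Action<object> handler)
    {
        m_Handler = handler;
    }

    public void Add(object workerItem)
    {
        lock (m_Lock)
        {
            m_Queue.Enqueue(workerItem);
            if (m_IsProcessing)
                return; // the running chain will pick this item up
            m_IsProcessing = true;
        }
        ThreadPool.QueueUserWorkItem(ProcessNext);
    }

    private void ProcessNext(object notUsed)
    {
        object item;
        lock (m_Lock)
        {
            // Never empty here: this method is only queued when at
            // least one item is waiting, and only it dequeues.
            item = m_Queue.Dequeue();
        }

        try
        {
            m_Handler(item);
        }
        catch (Exception)
        {
            // Swallowed so that a failing item cannot stall the queue.
        }

        lock (m_Lock)
        {
            if (m_Queue.Count == 0)
            {
                m_IsProcessing = false; // go idle until the next Add
                return;
            }
        }

        // Hand the pool one item at a time so that other code paths
        // can interleave their own work items.
        ThreadPool.QueueUserWorkItem(ProcessNext);
    }
}
```

Because only one pool work item exists per queue at any moment, items added through Add run strictly in order, and the pool is never flooded by a single queue.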

Using the code

Using the code is simple. There is a choice of implementation technique: the event-driven approach or the derived-class approach. How you use the queue depends on the technique chosen. Form1.cs contains both implementations of the class.

public class DerivedWorkerQueue:BaseSerialisedWorkerQueue
{
    protected override void PerformWork(object WorkerItem)
    {
        PointlessTask oPointlessTask = (PointlessTask)WorkerItem;
        oPointlessTask.DoWork(null);
    }    
}

public class EventWorkerQueue
{
    private EventSerialisedWorkerQueue m_ESWorkerQueue;

    public EventWorkerQueue()
    {
        m_ESWorkerQueue=new EventSerialisedWorkerQueue();
        m_ESWorkerQueue.onProcessWorkerItem+=
        new 
         _ThreadQueue.EventSerialisedWorkerQueue.ProcessWorkerItem
         (m_EventSerialisedWorkerQueue_onProcessWorkerItem);
    }

    private void 
      m_EventSerialisedWorkerQueue_onProcessWorkerItem(object WorkerItem)
    {
        PointlessTask oPointlessTask = (PointlessTask)WorkerItem;
        oPointlessTask.DoWork(null);
    }

    public void Add(PointlessTask Task)
    {
        m_ESWorkerQueue.Add(Task);
    }
}

I hope you have found this article useful. I am by no means a threading expert; in fact, prior to .NET, I had not been able to explore the powers of threading. However, this class has been in use in a production system for over a year now, and it seems to do its job well. I would love to know if there is a better way, or if there is something wrong with this style of solution. This is my first article, and I am keen to know what you think of it. Please, can you take a second to rate it?

History

  • 10 June 2004: Version 1.0

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

jburrow
Software Developer
United Kingdom

Last Updated 15 Jun 2004
Article Copyright 2004 by jburrow