Click here to Skip to main content
Click here to Skip to main content

A multi-thread queue for the producers/consumers process

, 15 Jan 2013 CPOL
Rate this:
Please Sign up or sign in to vote.
How to write a simple multi-thread queue for the typical producer-consumer process

Introduction

Multithreading is one of the very powerful techniques you can use to increase performance in your application, parallelize tasks and do asynchronous elaborations.

There are a lot of possibilities; one of the very common is the typical process of producers/consumers.  

In this article I wish to show a simple way to parallelize the consumers of a queue in memory. Using this simple approach you can create a pool of consumers and then feed them by the queue.

Background 

.Net provides a simple class for handling a queue structure. .NET also provides a ConcurrentQueue class, but for this sample I’ve used the basic generic Queue class. I don’t either use the Synchronized method because lock is handled inside my class in order to dispatch elements to the different threads. 

Using the code 

The library can be used inside every project. This first draft is composed of only two classes and one interface. The first class is the generic class for creating a multi-thread queue. Different threads can access this queue to write new elements and a pool of readers can be created to read new elements in parallel. The second one is the abstract class to create a reader thread. 

// Instance a multi threading queue for double values.
MultiThreadQueueLib.MultiThreadQueue<double> queue = new MultiThreadQueueLib.MultiThreadQueue<double>();     

Reading the values inserted in the queue is very simple. You have to create a new class inherited from ThreadDequeuer and define the method OnNewElement for this new class. This method is called out every time a new element is available on the queue.  

class ThreadDequeuer : MultiThreadQueueLib.MultiThreadDequeuer
{
    …
    public ThreadDequeuer(MultiThreadQueueLib.IMultiThreadQueue queue) 
        : base(queue)
    {
    }
    protected override void OnNewElement(object element)
    {
        …
    }
} 
To start the reader(s) you have to instance your dequeuer class and then call the method Start.

Points of interest 

The synchronization is done using the basic lock mechanism. Asynchronous access by the reader is realized by a manual event. When a new element is added in the queue, the event is set so every reader that is waiting for that event is released. After that, every reader tries to read. If an element is available, it’s processed. When reading elements queue size go down to 0, the event is reset.

This approach is simple and independent of the reader number. Also, it’s thought for cases in which write operation are very frequent reducing the synchronization mechanism to a minimum. It means that it’s not the best performance. Also, it works with framework 3.5 when framework 4.0 and 4.5 have introduced new mechanisms for thread synchronization.  

/// <summary> 
/// Enqueue a new element in the queue. If the queue is empty the manual resent event is setted in order to allow readers  
/// to process the incoming elements.
/// </summary> 
/// <param name="value">The value to insert in the queue.</param>
public void Enqueue(V value)
{
    lock (list)
    {
        list.Enqueue(value);
        if (list.Count == 1)
        {
            newElementEvent.Set();
        }
    }
}
/// <summary>
/// Check if the queue already contains the value.
/// </summary>
/// <param name="value">The value to check</param>
/// <returns>True if the queue contains the value, otherwise false.</returns>
public bool Contains(V value)
{
    lock (list)
    {
        return list.Contains(value);
    }
}
/// <summary>
/// Wait for new element and unqueue it. When the queue is empty the manual reset event is reset.
/// A timeout for the waiting could be defined.
/// </summary>
/// <param name="millisecondsTimeout">The timeout for waiting a new element.</param>
/// <returns>The new element to process. Null in case there's not a new element to process.</returns>
public object Unqueue(int millisecondsTimeout)
{
    bool flag = this.newElementEvent.WaitOne(millisecondsTimeout);
    if (!flag)
    {
        return null;
    }
    lock (list)
    {
        if (list.Count == 0)
        {
            return null;
        }
        V ret = list.Dequeue();
        if (list.Count == 0)
        {
            this.newElementEvent.Reset();
        }
        return ret;
    }
}
/// <summary>
/// Wait for new element and unqueue it. When the queue is empty the manual reset event is reset.
/// This method has no timeout.
/// </summary>
/// <returns>The new element to process. Null in case there's not a new element to process.</returns>
public object Unqueue()
{
    return this.Unqueue(-1);
}

So, the reader is always waiting for a new incoming element and when it gets the element, the OnNewElement method is called with this one.  

/// <summary>
/// Thread loop. Wait for a new incoming element and in case the thread gets the new element, the mehod OnNewElement is called.
/// </summary>
protected virtual void Run()
{
    while (!shutdown)
    {
        try
        {
            object newElement = this.sourceQueue.Unqueue();
            if (newElement != null)
            {
                OnNewElement(newElement);
            }
        }
        catch (ThreadInterruptedException thInt)
        {
        }
        catch (ThreadAbortException thAbort)
        {
            shutdown = true;
        }
        catch (Exception ex)
        {
            Console.Out.WriteLine(ex);
        }
    }
    Console.Out.WriteLine("Exit from Run. Thread is stopped.");
} 

 The attached solution contains the library source code and a simple test console application. 

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Alessandro Lentini
Team Leader Mediatech Solutions
Italy Italy
I’m an IT Project Manager for an Italian Betting Company and over the last 2 years I acquired experience in Betting area.
I have developed code in different object oriented languages (C#, C++, Java) for more than 10 years using a set of technology such as .Net, J2EE, multithreading, etc…

Comments and Discussions

 
SuggestionHave you looked at new .NET 4 classes in System.Collections.Concurrent? PinmemberMatt T Heffron15-Jan-13 9:40 
GeneralRe: Have you looked at new .NET 4 classes in System.Collections.Concurrent? PinmemberAlessandro Lentini15-Jan-13 9:50 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Terms of Use | Mobile
Web01 | 2.8.141220.1 | Last Updated 15 Jan 2013
Article Copyright 2013 by Alessandro Lentini
Everything else Copyright © CodeProject, 1999-2014
Layout: fixed | fluid