Click here to Skip to main content
Click here to Skip to main content

Thread-safe priority queue in C#

, 5 Feb 2010
Rate this:
Please Sign up or sign in to vote.
This article describes a simple technique to implement functionality allowing the usage of prioritized queues within a multithreaded application.

Introduction

A few months ago, I had a project to develop an SMS notification gateway center capable of prioritizing incoming messages from a few sources and sending them to mobile operators. In the heart of this system, I decided to build my own component for queuing and prioritization. So, I started to search in the internet for some code to implement the required functionality - a prioritized queue.

Background

Because SMS can be used from highly capable and demanding systems like a cards authorization system, bank system etc., it has to be very fast, reliable, and simple. So, my approach was to KISS (keep it simple and stupid) as possible.

I mainly develop web applications, and there are no demands for multithreaded programming there. And so, this was my first significant attempt to create a real demanding multithreaded application.

The purpose of this queue will be to be the "mediator" between a few threads receiving incoming notifications and a few other threads responsible for sending queued messages to SMS operators. The big trouble was the limited number of SMSs per second which the mobile operators accept. So, I had to "throttle" SMSs in order to comply with the mobile operator requirements.

Sounds, pretty easy, but now another issue arises.. what will happen if a system sends 100,000 marketing SMS (spam) and I have a 20 SMS per second limit.. and a different system sends three SMSs that are urgent? That is where prioritization comes in the story.

A few days testing, writing, deleting, and questions.. and a class was born...

Using the code

As you can see, the the class includes and uses the standard Queue available in .NET, and for prioritization, I decided to use SortedDictionary.

private SortedDictionary<P, Queue<V>> list = 
          new SortedDictionary<P, Queue<V>>();

In simple worlds, I decided to use a sorted dictionary of standard queues. In the code below, list is the "shared" object containing data, and I have to make sure my class is thread safe. Normally, Queue and Dequeue are thread "unsafe" operations. So, I implemented a standard semaphore practice with:

Monitor.Pulse(list); 
Monitor.Wait(list);

and

lock (list) {}

I'm not going to describe how Monitor.Pulse, Wait.. or lock{} work.. Please use "Google" if you don't know what those do..

Let's see how the magic works in the Dequeue() method..

First of all, we will lock our object to ensure thread safety. Now, let's see how we are going to hang if there is no object within our SortedDictionary container.

while (list.Count() == 0 && !_FreeCurrentDequeue)
{
    Monitor.Wait(list);
}

If there are no items in the SortedDictionary, the program will stop and release the container with Monitor.Wait(list). This will be until somebody executes Monitor.Pulse(list).

This somebody in our case will be the Enqueue() method, immediately after inserting some objects in our queue.

q.Enqueue(value);
Monitor.Pulse(list);

Once our Monitor.Wait(list) is released, we are going to extract from the queue. Of course, we should start extracting from the queue available in the dictionary in the first place (the higher priority). Every time you try to extract some object from the class, you should get a queue with the highest priority. For this purpose, we will find the first queue with the highest priority within the SortedDictionary:

pair = list.First();

Immediately after finding the queue, we are going to extract the object in this queue..

V v = pair.Value.Dequeue();

Of course, in order to be able to process queues with lower priority, we will remove empty queues from the SortedDictionary.

if (pair.Value.Count == 0)
// nothing left of the top priority.
    list.Remove(pair.Key);

This will ensure on the next execution of the Dequeue method that list.First() will return the next low priority queue.. simply because we've just removed the highest priority one. And so on and so forth..

But there is a catch that if some thread hangs on the Dequeue method, and if there are no items inside the queues, the thread will hang forever.. not very nice.. if you want, for example, to stop your application, you will have to terminate it a not very nice fashion...

So, I decided to add a global variable and a _FreeCurrentDequeue function. If you want to release Dequeue, you just have to call this method, and all the Dequeue methods will be released.

public V Dequeue()
{
    KeyValuePair<P, Queue<V>> pair;
    lock (list)
    {
        //set every tyme to false, if somebody where 
        //calling PulseAndFreeDequeue we shall 
        //recover normal status afer first succesfull exit from dequeue!
        _FreeCurrentDequeue = false;

        // If the queue is empty, wait for an item to be added
        // Note that this is a while loop, as we may be pulsed
        // but not wake up before another thread has come in and
        // consumed the newly added object. In that case, we'll
        // have to wait for another pulse.
        while (list.Count() == 0 && !_FreeCurrentDequeue)
        {
            // This releases listLock, only reacquiring it
            // after being woken up by a call to Pulse
            Monitor.Wait(list);
        }
        //Pay attention taht if _FreeCurrentDequeue
        //became true we will exit from while, 
        //and if queue is empty exception will be raised.
        if (_FreeCurrentDequeue) return default(V);
        pair = list.First();
        V v = pair.Value.Dequeue();
        if (pair.Value.Count == 0) // nothing left of the top priority.
            list.Remove(pair.Key);
        return v;
    }
}

and here is the releasing method...

public void PulseAndFreeDequeue()        { 
    lock (list) {
        _FreeCurrentDequeue = true;
        //pulse to release Monitor.Wait(list) 
        Monitor.Pulse(list);
    }
}

One thing you should care about is if you call PulseAndFreeDequeue(), an empty object will be returned, and this may cause exceptions in the extracting module.. so ... just keep that in mind.

And, here is the Enqueue method:

public void Enqueue(P priority, V value)
{
    lock (list)
    {
        Queue<V> q;
        if (!list.TryGetValue(priority, out q))
        {
            q = new Queue<V>();
            list.Add(priority, q);
        }
        q.Enqueue(value);
        Monitor.Pulse(list);
    }
}

That's it .. I hope you like my first article.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

wili98

Bulgaria Bulgaria
No Biography provided

Comments and Discussions

 
QuestionHow are you getting list.First() to compile? PinmemberSiliconDragon22-Sep-10 5:58 
AnswerRe: How are you getting list.First() to compile? PinmemberMr C P23-May-11 22:12 
GeneralMy vote of 1 Pinmemberozbear8-Feb-10 12:18 
Generalshutdown PinmentorNicholas Butler6-Feb-10 1:14 
GeneralRe: shutdown Pinmemberwili9810-Feb-10 4:55 
GeneralNeed more info Pinmemberatarikg4-Feb-10 17:22 
GeneralRe: Need more info Pinmemberwili985-Feb-10 8:15 
GeneralRe: Need more info PinmvpJosh Fischer5-Feb-10 9:06 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web01 | 2.8.140814.1 | Last Updated 5 Feb 2010
Article Copyright 2010 by wili98
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid