Click here to Skip to main content
Click here to Skip to main content

Implementing Message Queue Using Counting Semaphores

, 26 Jun 2009
Rate this:
Please Sign up or sign in to vote.
Implementing message queue in C Sharp
Article_src

Introduction

Message Queuing is an effective and efficient technique for inter process or even inter thread communication. Microsoft’s MSMQ is a very good implementation of Message Queuing technology. However, MSMQ is obviously not the desired choice when you need a simple Message Queue data structure that will be used by multiple threads within single application. Also, MSMQ is very slow for real-time distributed applications for e.g. a socket reads UDP multicast messages that need to be processed in a different thread with as soon as possible. MSMQ can’t be the best choice here.

This article demonstrates the techniques for implementing a Message Queue data structure using counting semaphores.

Background

Some basic ideas of wait handle, local semaphore and threads are recommended to understand the code.

Using the Code

Source code of the MessageQueue class is shown below: 

using System;
using System.Collections.Generic;
using System.Threading;
public class MessageQueue<T> : IDisposable
{
    private readonly int _QUEUE_SIZE;
    // a semaphore object is used to notify enqueue happend signal to the dequeue
    // subrouting
    private Semaphore _semaphore;
    // an internal queue data structure holds the messages in a queue
    private Queue<T> _internalQueue;
    // a private object that is used to acquire a lock on it to ensure synchronous
    // execution of some operations in multithreaded environment
    private object _syncLock;
    /**************************************************************************
    * Construct the message queue object with the maximum size limit.         *
    * If no of messages in the queue meets the maximum size of the queue, any *
    * subsequent enqueue will be discareded. i.e. those message will be lost  *
    * until you dequeue any message i.e. provide a room for new message to    *
    * enter the queue.                                                        *
    **************************************************************************/
    public MessageQueue(int queueSize)
    {
        _syncLock = new object();
        _QUEUE_SIZE = queueSize;
        _internalQueue = new Queue<T>(_QUEUE_SIZE);
        _semaphore = new Semaphore(0, _QUEUE_SIZE);
    }
    /***********************************************************************
    * Reset the MessageQueue                                               *
    ***********************************************************************/
    public void Reset()
    {
        // instantiate the semaphore with initial count 0 i.e. the semaphore is
        // entirely woned and there is no room to enter
        _semaphore = new Semaphore(0, _QUEUE_SIZE);
        // clear all existing messages from the message queue
        _internalQueue.Clear();
    }
    /**********************************************************************
    * Enqueue message in to the Message Queue                             *
    **********************************************************************/
    public void EnqueueMessage(T message)
    {
        lock (_syncLock)
        {
            if (_semaphore != null && message != null)
            {
                try
                {
                    // try to provide a room in the semaphore so that DequeueMessage
                    // can enter into it
                    _semaphore.Release();
                    // now enqueue the message in to the internal queue data structure
                    _internalQueue.Enqueue(message);
                }
                catch { }
            }
        }
    }
    /*********************************************************************
    * Dequeue message from the Message Queue                             *
    *********************************************************************/
    public T DequeueMessage()
    {
        // try to acquire a room in the semaphore and sleep until the room is available
        _semaphore.WaitOne();
        // if any room could be acquired, proceed to next step. i.e. dequeue message from 
        // the internal queue and return it
        lock (_syncLock)
        {
            T message = _internalQueue.Dequeue();
            return message;
        }
    }
    
    /********************************************************************
    * Dispose the Message Queue object                                  *
    ********************************************************************/
    public void Dispose()
    {
        // if the semaphore is not null, close it and set it to null
        if (_semaphore != null)
        {
            _semaphore.Close();
            _semaphore = null;
        }
        // clear the items of the internal queue
        _internalQueue.Clear();
    }
}

The following program demonstrates the usage of this class:

using System;
using System.Threading;
namespace ConsoleApplication1 
{
    public class Program
    {        
        static int __receivedCount = 0;
        // instantiate the message queue object with maximum size limit 1000. i.e. in
        // any case there will not be more than 1000 items in the queue at a time        
        static MessageQueue<object> __messageQueue = new MessageQueue<object>(1000);
        public static void Main()
        {
            // fork a thread that dequeues messages from the message queue and
            // increments the __receivedCount
            Thread dequeueThread = new Thread(new ThreadStart(_doDequeue));
            dequeueThread.Start();
            
            // we will try to enqueue 10000000 messages in side the queue and will
            // excpect to received 100% of them in the dequeue thread.
            int messageCount = 10000000;
            for (int i = 0; i < messageCount; i++)
            {                
                __messageQueue.EnqueueMessage(new object());
                Console.SetCursorPosition(30, 10);
                Console.WriteLine((i + 1).ToString().PadLeft(10) + " of " + 
                    messageCount.ToString());
            }
            
            Console.WriteLine("Press any key...");
            Console.Read();
            int receivedPercent = (int)((__receivedCount / (double)messageCount) * 100);
            Console.WriteLine(
                "{0} out of {1} message received.\nReceived = {2}%\nLost = {3}%",
                __receivedCount, messageCount, receivedPercent, (100 - receivedPercent));
            Console.Read();
            // abort the dequeue thread
            try 
            {
               dequeueThread.Abort();
            }
            catch { }
            // dispose the message queue object
            __messageQueue.Dispose();
        }
        static void _doDequeue()
        {
            while (true)
            {
                object message = __messageQueue.DequeueMessage();
                __receivedCount++;
            }
        }
    }
}

Points of Interest

With this implementation of the MessageQueue, the internal queue size will never infinitely increase which could cause OutOfMemoryException. Additionally the using of semaphore will take the queue to sleeping state and will not consume any CPU cycles when there is no message available to be dequeued.

History

  • 26th June, 2009: Initial post

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Bunty1983
Software Developer Rebaca Technologies
India India
No Biography provided

Comments and Discussions

 
General[My vote of 1] Another vote of 1 PinmemberPete Appleton29-Jun-09 22:57 
This code is terrible, IMO - I would not be happy finding this in a code review on my team!
 
1. As pointed out by others, DOCUMENT your code.
2. Stick to standard naming conventions ("_dequeueMessages" is not an appropriate function name)
3. Don't terminate threads with .Abort(); use a termination flag. Calling .Abort() can result in the worker thread being killed whilst it's processing the message, not just during a semaphore wait.
4. Use "standard" signatures for events (ie, use an EventArgs-derived class). No, I don't like it either, but standard/vendor mandated practice shouldn't be ignored without a very good reason
5. Check that delegates are assigned before calling them. This code will crash if the Start() function is called without an event handler being assigned.
 
--
What's a signature?

GeneralRe: [My vote of 1] Another vote of 1 PinmemberBunty 19837-Jul-09 0:08 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web04 | 2.8.140721.1 | Last Updated 26 Jun 2009
Article Copyright 2009 by Bunty1983
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid