Click here to Skip to main content
Click here to Skip to main content
Go to top

MSMQ Backed FIFO Queue (C# .NET)

, 20 May 2011
Rate this:
Please Sign up or sign in to vote.
A standard in-memory generic .NET queue that, when the configurable overflow limit is reached, will start using MSMQ as a backend to mitigate the issues with memory usage.

Having a requirement for a queue that is both high-performance when needed and can handle a large amount of entries without blowing my system's memory out of the water, I created the following utility queue.

In short, it is a standard in-memory generic .NET queue that, when the configurable overflow limit is reached, will start using MSMQ as a backend to mitigate the issues with memory usage. When used in a low volume scenario, the queue will act like a normal generic queue.

Any questions, comments, feel free to email me.

Also, see my article on CodeProject here: http://www.codeproject.com/KB/recipes/OverflowQueue.aspx.

public class OverflowQueue<T> where T : class
{
     Queue<T> internalQueue = new Queue<T>();

     BinaryMessageFormatter messageFormatter = new BinaryMessageFormatter();

     internal static int compactOverflowQueue = 
       AppConfigHelper.GetValue("CompactOverflowQueueAfterMessagesDequeued", 10000);
     internal static int maxInternalQueueSize = 
       AppConfigHelper.GetValue("MaxInternalOverflowQueueSize", 10000);

     QueuePath queuePath;

     MessageQueue overflowMSMQ;

     long currentQueueSize;
     long currentMSMQSize;

     bool useBackgroundMsmqPull;
     Thread backgroundMsmqPullThread;

     volatile bool pushingToMSMQ;

     object msmqPullLock = new object();
     object msmqLock = new object();

     int itemDequeued;

     public long CurrentMemoryQueueSize
     {
          get
          {
               return currentQueueSize;
          }
     }

     public long CurrentMSMQSize
     {
          get
          {
               return Interlocked.Read(ref currentMSMQSize);
          }
     }

     public long TotalCount
     {
          get
          {
               return Interlocked.Read(ref currentMSMQSize) + 
                 Interlocked.Read(ref currentQueueSize);
          }
     }

     public OverflowQueue(string queueName, bool purgeQueue, bool useBackgroundMsmqPull)
     {
          queuePath = new QueuePath(queueName, Environment.MachineName, false);
          overflowMSMQ = QueueCreator.CreateAndReturnQueue(queuePath);
          overflowMSMQ.Formatter = new BinaryMessageFormatter();

          if (purgeQueue)
               overflowMSMQ.Purge();

          this.useBackgroundMsmqPull = useBackgroundMsmqPull;

          if (useBackgroundMsmqPull)
          {
               // start a thread that pulls msmq messages back into the queue 
               backgroundMsmqPullThread = new Thread(BackgroundMsmqPull);
               backgroundMsmqPullThread.IsBackground = true;
               backgroundMsmqPullThread.Name = "BackgroundMsmqPullThread";
               backgroundMsmqPullThread.Start();
          }
     }

     void BackgroundMsmqPull()
     {
          while (true)
          {
               PullFromMSMQ();
               Thread.Sleep(1000);
          }
     }

     public void Enqueue(T item)
     {
          if (!pushingToMSMQ)
          {
               if (currentQueueSize >= maxInternalQueueSize)
               {
                    // We've busted the queue size... 
                    // start pushing to MSMQ until the current
                    // queue zeros out,
                    // then pop all entries (up to max) back into memory.
                    lock (msmqLock)
                    {
                         pushingToMSMQ = true;
                         PushToMSMQ(item);
                    }
               }
               else
               {
                    // We're still pushing into the memory queue
                    PushToMemoryQueue(item);
               }
          }
          else
          {
               // This lock looks like this (split if) because
               // I don't wnat to hold onto the lock for a push to the memory queue
               lock (msmqLock)
               {
                    if (pushingToMSMQ) // verify we're still pushing to MSMQ
                    {
                         PushToMSMQ(item);
                         return; // Skip the push to memory queue
                    }
               }

               // This will only get hit if we aren't still pushing to MSMQ
               PushToMemoryQueue(item);
          }
     }

     void PushToMemoryQueue(T item)
     {
          lock (internalQueue)
          internalQueue.Enqueue(item);

          Interlocked.Increment(ref currentQueueSize);
     }

     void PushToMSMQ(T item)
     {
          Message message = new Message(item, messageFormatter);
          overflowMSMQ.Send(message);
          Interlocked.Increment(ref currentMSMQSize);
     }

     public T Dequeue()
     {
          try
          {
               if (!useBackgroundMsmqPull)
               {
                    PullFromMSMQ();
               }
               else
               {
                    // This is here because if the background pull
                    // is on and a user tries to dequeue too quickly, 
                    // they can run up against an empty memory queue,
                    // but there could still be something in the MSMQ.
                    // So, we need to double check for them ;)

                    if (Interlocked.Read(ref currentQueueSize) == 0 
                         && Interlocked.Read(ref currentMSMQSize) > 0)
                    PullFromMSMQ();
               }

               T item = null;

               if (Interlocked.Read(ref currentQueueSize) > 0)
               {
                    lock(internalQueue)
                    item = internalQueue.Dequeue();
               }

               if (item != null)
               {
                    Interlocked.Increment(ref itemDequeued);
                    Interlocked.Decrement(ref currentQueueSize);

                    return item;
               }
               else
               {
                    throw new Exception("Nothing to dequeue!");
               }
          }
          finally
          {
               if (itemDequeued >= compactOverflowQueue)
               {
                    lock (internalQueue)
                    {                      
                         if (itemDequeued >= compactOverflowQueue)
                         {
                              // Compact the internal queue to save space
                              internalQueue.TrimExcess();
                              itemDequeued = 0;
                         }
                    }
               }
          }
     }

     void PullFromMSMQ()
     {
          // We've been putting all new messages into MSMQ...
          // now is the time to get them out and put them back into memory.
          while (Interlocked.Read(ref currentMSMQSize) > 0 && 
                 Interlocked.Read(ref currentQueueSize) < maxInternalQueueSize)
                 // currentQueueSize should be low here
          {
               Message message = overflowMSMQ.Receive();

               // decrement the MSMQ size
               Interlocked.Decrement(ref currentMSMQSize);

               T item = message.Body as T;
               PushToMemoryQueue(item);
          }
          if (Interlocked.Read(ref currentMSMQSize) <= 0)
          {
               lock (msmqLock)
               // lock around this to prevent the count
               // from being wrong when turning off msmq pushing
               {
                    if (Interlocked.Read(ref currentMSMQSize) <= 0)
                         pushingToMSMQ = false;
               }
          }
     }
}

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Aron Weiler
Technical Lead CareFusion
United States United States
I just looooove software.
 
Check out my technical blog here: The Fyslexic Duck. You can find most of what I've put on CodeProject there, plus some additional technical articles.

Comments and Discussions

 
-- There are no messages in this forum --
| Advertise | Privacy | Mobile
Web02 | 2.8.140926.1 | Last Updated 20 May 2011
Article Copyright 2011 by Aron Weiler
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid