MSMQ Backed FIFO Queue (C# .NET)





0/5 (0 votes)
A standard in-memory generic .NET queue that, when the configurable overflow limit is reached, will start using MSMQ as a backend to mitigate the issues with memory usage
Having a requirement for a queue that is both high-performance when needed and can handle a large number of entries without blowing my system's memory out of the water, I created the following utility queue.
In short, it is a standard in-memory generic .NET queue that, when the configurable overflow limit is reached, will start using MSMQ as a backend to mitigate the issues with memory usage. When used in a low volume scenario, the queue will act like a normal generic queue.
If you have any questions or comments, feel free to email me.
Also, see my accompanying article on CodeProject.
/// <summary>
/// A FIFO queue that holds items in an in-memory <see cref="Queue{T}"/> up to a
/// configurable limit, then overflows new items into an MSMQ queue. Items are pulled
/// back into memory (on demand, or via an optional background thread) as space frees up.
/// When volume stays below the limit it behaves like a normal generic queue.
/// NOTE(review): serialization uses BinaryMessageFormatter, which is built on
/// BinaryFormatter — unsafe for untrusted data and removed in modern .NET. Only use
/// with trusted, same-process-controlled queues.
/// </summary>
public class OverflowQueue<T> where T : class
{
    readonly Queue<T> internalQueue = new Queue<T>();
    readonly BinaryMessageFormatter messageFormatter = new BinaryMessageFormatter();
    // Thresholds read once from application configuration, with defaults of 10000.
    internal static int compactOverflowQueue =
        AppConfigHelper.GetValue("CompactOverflowQueueAfterMessagesDequeued", 10000);
    internal static int maxInternalQueueSize =
        AppConfigHelper.GetValue("MaxInternalOverflowQueueSize", 10000);
    readonly QueuePath queuePath;
    readonly MessageQueue overflowMSMQ;
    long currentQueueSize;          // items currently in the in-memory queue
    long currentMSMQSize;           // items currently parked in MSMQ
    readonly bool useBackgroundMsmqPull;
    Thread backgroundMsmqPullThread;
    volatile bool pushingToMSMQ;    // true while new items are being routed to MSMQ
    readonly object msmqLock = new object();
    int itemDequeued;               // dequeues since the last TrimExcess compaction

    /// <summary>Number of items currently held in the in-memory queue.</summary>
    public long CurrentMemoryQueueSize
    {
        get
        {
            // Interlocked.Read so the 64-bit read is atomic on 32-bit platforms,
            // consistent with every other read of this counter in the class.
            return Interlocked.Read(ref currentQueueSize);
        }
    }

    /// <summary>Number of items currently parked in the MSMQ backing store.</summary>
    public long CurrentMSMQSize
    {
        get
        {
            return Interlocked.Read(ref currentMSMQSize);
        }
    }

    /// <summary>Total items across both the in-memory queue and MSMQ.</summary>
    public long TotalCount
    {
        get
        {
            return Interlocked.Read(ref currentMSMQSize) +
                Interlocked.Read(ref currentQueueSize);
        }
    }

    /// <summary>
    /// Creates (or opens) the named MSMQ queue on the local machine.
    /// </summary>
    /// <param name="queueName">Name of the MSMQ queue used for overflow.</param>
    /// <param name="purgeQueue">If true, any messages left over in the MSMQ queue
    /// from a previous run are discarded.</param>
    /// <param name="useBackgroundMsmqPull">If true, a background thread continuously
    /// drains MSMQ back into memory; otherwise draining happens inside Dequeue.</param>
    public OverflowQueue(string queueName, bool purgeQueue, bool useBackgroundMsmqPull)
    {
        queuePath = new QueuePath(queueName, Environment.MachineName, false);
        overflowMSMQ = QueueCreator.CreateAndReturnQueue(queuePath);
        overflowMSMQ.Formatter = new BinaryMessageFormatter();
        if (purgeQueue)
            overflowMSMQ.Purge();
        this.useBackgroundMsmqPull = useBackgroundMsmqPull;
        if (useBackgroundMsmqPull)
        {
            // Start a daemon thread that pulls MSMQ messages back into memory.
            backgroundMsmqPullThread = new Thread(BackgroundMsmqPull);
            backgroundMsmqPullThread.IsBackground = true;
            backgroundMsmqPullThread.Name = "BackgroundMsmqPullThread";
            backgroundMsmqPullThread.Start();
        }
    }

    // Background-thread loop: drain MSMQ into memory once a second.
    // Runs forever; the thread is IsBackground so it dies with the process.
    void BackgroundMsmqPull()
    {
        while (true)
        {
            PullFromMSMQ();
            Thread.Sleep(1000);
        }
    }

    /// <summary>
    /// Adds an item. Once the in-memory queue reaches its limit, new items are routed
    /// to MSMQ until the MSMQ backlog drains back to zero (FIFO order is preserved by
    /// keeping all new arrivals behind the parked ones).
    /// </summary>
    public void Enqueue(T item)
    {
        if (!pushingToMSMQ)
        {
            // Interlocked.Read for an atomic 64-bit read (fixes a torn-read
            // hazard on 32-bit platforms present in the original plain read).
            if (Interlocked.Read(ref currentQueueSize) >= maxInternalQueueSize)
            {
                // We've busted the queue size... start pushing to MSMQ until the
                // MSMQ backlog zeros out, then entries move back into memory.
                lock (msmqLock)
                {
                    pushingToMSMQ = true;
                    PushToMSMQ(item);
                }
            }
            else
            {
                // Still below the limit: push into the memory queue.
                PushToMemoryQueue(item);
            }
        }
        else
        {
            // The lock is split like this (re-checked inside) because we don't
            // want to hold the lock for a push to the memory queue.
            lock (msmqLock)
            {
                if (pushingToMSMQ) // verify we're still pushing to MSMQ
                {
                    PushToMSMQ(item);
                    return; // Skip the push to memory queue
                }
            }
            // Only reached if MSMQ pushing was turned off between the checks.
            PushToMemoryQueue(item);
        }
    }

    // Enqueue into the in-memory queue and bump its counter.
    void PushToMemoryQueue(T item)
    {
        lock (internalQueue)
            internalQueue.Enqueue(item);
        Interlocked.Increment(ref currentQueueSize);
    }

    // Serialize the item into MSMQ and bump the MSMQ counter.
    // Caller holds msmqLock, so sends are not concurrent.
    void PushToMSMQ(T item)
    {
        using (Message message = new Message(item, messageFormatter))
        {
            overflowMSMQ.Send(message);
        }
        Interlocked.Increment(ref currentMSMQSize);
    }

    /// <summary>
    /// Removes and returns the next item.
    /// </summary>
    /// <returns>The dequeued item; never null.</returns>
    /// <exception cref="InvalidOperationException">The queue is empty.</exception>
    public T Dequeue()
    {
        try
        {
            if (!useBackgroundMsmqPull)
            {
                PullFromMSMQ();
            }
            else
            {
                // If the background pull is on and a user dequeues too quickly,
                // they can hit an empty memory queue while MSMQ still holds
                // items — so double-check for them here.
                if (Interlocked.Read(ref currentQueueSize) == 0
                    && Interlocked.Read(ref currentMSMQSize) > 0)
                    PullFromMSMQ();
            }
            T item = null;
            if (Interlocked.Read(ref currentQueueSize) > 0)
            {
                lock (internalQueue)
                    item = internalQueue.Dequeue();
            }
            if (item != null)
            {
                Interlocked.Increment(ref itemDequeued);
                Interlocked.Decrement(ref currentQueueSize);
                return item;
            }
            else
            {
                // Specific exception type instead of bare Exception; callers
                // catching Exception still catch this (backward compatible).
                throw new InvalidOperationException("Nothing to dequeue!");
            }
        }
        finally
        {
            // Periodically compact the internal queue's backing array.
            // Volatile read for the unlocked first check of the double-check.
            if (Thread.VolatileRead(ref itemDequeued) >= compactOverflowQueue)
            {
                lock (internalQueue)
                {
                    if (itemDequeued >= compactOverflowQueue)
                    {
                        // Compact the internal queue to save space
                        internalQueue.TrimExcess();
                        itemDequeued = 0;
                    }
                }
            }
        }
    }

    // Drain parked MSMQ messages back into memory (up to the memory limit),
    // and switch off MSMQ routing once the backlog is empty.
    void PullFromMSMQ()
    {
        // We've been putting new messages into MSMQ... now get them out and
        // put them back into memory while there is room.
        while (Interlocked.Read(ref currentMSMQSize) > 0 &&
            Interlocked.Read(ref currentQueueSize) < maxInternalQueueSize)
        {
            // Dispose the Message (it is IDisposable) — the original leaked it.
            using (Message message = overflowMSMQ.Receive())
            {
                // Decrement the MSMQ size.
                Interlocked.Decrement(ref currentMSMQSize);
                T item = message.Body as T;
                // Guard the cast: a foreign or corrupt message must not enqueue
                // null, since Dequeue treats a null item as "nothing to dequeue".
                if (item != null)
                    PushToMemoryQueue(item);
            }
        }
        if (Interlocked.Read(ref currentMSMQSize) <= 0)
        {
            // Lock around this to prevent the count from being wrong
            // when turning off MSMQ pushing.
            lock (msmqLock)
            {
                if (Interlocked.Read(ref currentMSMQSize) <= 0)
                    pushingToMSMQ = false;
            }
        }
    }
}