
Throwing a Great Block

26 May 2015 | CPOL | 6 min read
Utilizing the .NET BlockingCollection class

Last year, I was working on a cloud-hosted Windows service for a client that contained an application-specific logging implementation. The existing architecture posted log entries at various points in the process: file discovery, pickup, dropoff, and download. The logging code would post a message to Microsoft Message Queuing (MSMQ), and a separate database writer service would dequeue those messages and write them to a series of tables in SQL Server.

Lagging The Play

While this setup worked perfectly well, it had one minor issue: log messages were queued to MSMQ synchronously, on the calling thread. That means that while the service was attempting to post a log message to the queue, all other file processing was temporarily suspended. Since posting a log message to MSMQ means you're performing an inter-process communication, there will be a noticeable lag imposed on the calling thread. Add to that the possibility that the MSMQ service could be located on another server and you've now imposed network lag time on the calling process as well. That's potentially alotta-lag! In the worst possible case, if MSMQ cannot be reached for some reason, file processing could be suspended for a very long time. For a platform that expects to be able to process thousands of messages a day, this was clearly not going to work as a long-term solution. However, the client wanted to retain the use of MSMQ as a persistent message forwarding mechanism so that if the writer service was unavailable, the log messages would not be lost.

Block For Me

It seemed clear that what was needed was some way for the service to save log messages internally for near-term posting to MSMQ in a way that would minimally impact file processing. What came to mind initially was to have an internal Queue object on which the service could store log messages, which another thread could then dequeue and post to MSMQ. It's a classic Producer-Consumer pattern. While this threading implementation is not especially difficult, it has some subtleties that make it non-trivial. First, all access to the Queue object has to be thread-safe. Second, the MSMQ posting thread needs to wait without burning CPU while no log message is queued. Wouldn't it be nice if there were something built into the .NET Framework to do all this?

Well, sometimes Microsoft gets it right. In the .NET Framework 4 release, Microsoft added the BlockingCollection<T> class (in the System.Collections.Concurrent namespace), which does exactly what we needed. It allows for thread-safe Producer-Consumer patterns that do not consume CPU resources when there is nothing on the queue.

Here's an example of how to implement it in a simple console application.

First, we'll need a message class. In the service for the client, the log information message was more complex, but this should give you the general idea.

C#
namespace BlockingCollectionExample
{
    class MyMessage
    {
        public int MessageId { get; set; }
        public string Message { get; set; }

        // override, so this replaces object.ToString() instead of hiding it
        public override string ToString()
        {
            return string.Format("Message with ID {0:#,##0} and value {1}.", MessageId, Message);
        }

    }
}

The real "meat" of the operation is in the class that encapsulates the blocking collection. Here's the first portion of the class definition.

C#
using System;
using System.Collections.Concurrent;
using System.Threading;

namespace BlockingCollectionExample
{
    class MyQueue : IDisposable
    {
        private BlockingCollection<MyMessage> messageQueue;
        private Thread dequeueThread;

        bool stopped = false;
        bool isStopping = false;

        public MyQueue()
        {
            // ConcurrentQueue as the backing store gives FIFO ordering
            messageQueue = new BlockingCollection<MyMessage>(new ConcurrentQueue<MyMessage>());

            // Dedicated consumer thread; the method is defined further down
            dequeueThread = new Thread(new ThreadStart(DequeueMessageThread));
            dequeueThread.Name = "TransactionPostThread";
            dequeueThread.Start();
            stopped = false;
        }

        ~MyQueue()
        {
            // true = called from the finalizer
            Dispose(true);
        }
...

You'll notice that the class implements the IDisposable interface. This is so that the thread that dequeues the messages from the blocking collection can clean up after itself. This will be seen in another section of the code for this class.

You'll also notice that when the BlockingCollection is declared, we specify the type of object that will be placed in the collection. When we instantiate the collection, we indicate that it should use a ConcurrentQueue object as its backing data store. This ensures that the items placed in the collection will be handled in a thread-safe manner on a first-in, first-out (FIFO) basis. (ConcurrentQueue is also the default backing store if none is supplied; passing it explicitly just makes the intent obvious.)
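To illustrate how the backing store determines the order in which items come back out, here is a small sketch that pushes the same three integers through a collection backed by a ConcurrentQueue and one backed by a ConcurrentStack. The BackingStoreDemo class and its methods are hypothetical names invented for this illustration; they are not part of the service described in this article.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical demo class, for illustration only.
public static class BackingStoreDemo
{
    // Adds 1, 2, 3 to the collection, then removes everything
    // and returns the items in the order they came out.
    private static List<int> Drain(BlockingCollection<int> collection)
    {
        for (int i = 1; i <= 3; i++)
        {
            collection.Add(i);
        }

        var result = new List<int>();
        int item;
        // TryTake with no timeout returns false immediately when empty.
        while (collection.TryTake(out item))
        {
            result.Add(item);
        }
        return result;
    }

    public static List<int> DrainFifo()
    {
        // Queue-backed: items come out in insertion order.
        using (var collection = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            return Drain(collection);
        }
    }

    public static List<int> DrainLifo()
    {
        // Stack-backed: the most recently added item comes out first.
        using (var collection = new BlockingCollection<int>(new ConcurrentStack<int>()))
        {
            return Drain(collection);
        }
    }
}
```

Calling DrainFifo returns the items in the order they were added, while DrainLifo returns them reversed. For a log queue, where messages should reach MSMQ in the order they were produced, the queue-backed version is the one you want.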

The finalizer method merely calls our Dispose method with a parameter indicating that this was called from the class' destructor, a common pattern for IDisposable implementations. The Dispose methods will be shown in their entirety later in this post.

C#
public void AddLog(MyMessage message)
{
    Console.WriteLine("Enqueueing: " + message.ToString());
    messageQueue.Add(message);
}

private void DequeueMessageThread()
{
    try
    {
        while (true)
        {
            MyMessage message = messageQueue.Take();
            Console.WriteLine("Dequeueing: " + message.ToString());

            if (messageQueue.IsCompleted)
            {
                break;
            }
        }
    }
    catch (InvalidOperationException)
    {
        // Take throws this once the queue is marked complete and is empty
    }
    catch (ThreadAbortException)
    {
        // Thread was aborted by Dispose. The runtime re-raises the abort
        // after this block, which ends the thread.
    }
}
...

The AddLog method is very simple; it invokes the blocking collection's Add method to enqueue the message in a thread-safe manner. The DequeueMessageThread method appears to be an endless loop that keeps attempting to dequeue a message, which would cause a CPU spike from the tight looping. But here's where the magic of the blocking collection comes into play. The Take method of the blocking collection will enter into a low-CPU wait state if nothing is found on the queue, blocking the loop from proceeding. As soon as a message is enqueued, the Take method will return from the wait state and the loop will proceed. Note that once the blocking collection has been marked complete, Take no longer waits: it returns any remaining items and, when the collection is empty, throws an InvalidOperationException. The IsCompleted check right after the call lets the loop exit cleanly as soon as the last message has been taken.
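As an aside, BlockingCollection also offers GetConsumingEnumerable, which blocks in the same way Take does but simply ends the foreach loop once CompleteAdding has been called and the collection is empty, so neither the IsCompleted check nor the InvalidOperationException handler is needed. The following is a minimal sketch of that alternative, not the code used in the client's service; the ConsumingEnumerableDemo class and its Run method are hypothetical names for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class, for illustration only.
public static class ConsumingEnumerableDemo
{
    // Enqueues the given messages, lets a consumer thread drain them,
    // and returns what the consumer saw, in order.
    public static List<string> Run(IEnumerable<string> messages)
    {
        var received = new List<string>();

        using (var queue = new BlockingCollection<string>())
        {
            var consumer = new Thread(() =>
            {
                // Blocks while the queue is empty; the loop ends when the
                // queue is marked complete and has been fully drained.
                foreach (string message in queue.GetConsumingEnumerable())
                {
                    received.Add(message);
                }
            });
            consumer.Start();

            foreach (string message in messages)
            {
                queue.Add(message);
            }

            queue.CompleteAdding(); // no more items; lets the consumer loop end
            consumer.Join();        // wait until the consumer has drained the queue
        }

        // Safe to read here: the consumer thread has finished.
        return received;
    }
}
```

Which style to use is largely a matter of taste. The explicit Take loop makes the shutdown conditions visible, while the foreach version is shorter and has fewer exits to reason about.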

The exception handler in the method captures two specific exceptions:

  1. The InvalidOperationException is thrown by Take if the blocking collection has been marked complete and is empty. We'll see this in the Dispose method.
  2. The ThreadAbortException will be signaled if the thread had to be killed because the Dispose method timed out waiting for the thread to finish.
C#
public void Dispose()
{
    Dispose(false);
}

private void Dispose(bool fromDestructor)
{
    isStopping = true;
    int logShutdownTimeout = 30000;

    Console.WriteLine("Shutting down queue. Waiting for dequeue thread completion.");

    // Signal queue that we're shutting down
    messageQueue.CompleteAdding();

    // Wait for thread to complete before exiting
    do
    {
        if (!dequeueThread.Join(logShutdownTimeout))
        {
            // Queue thread may be stuck. Check for items in queue and kill thread if empty

            if (messageQueue.Count == 0)
            {
                System.Diagnostics.Debug.Print("Aborting thread");
                dequeueThread.Abort();
                break;
            }
        }
    } while (dequeueThread.IsAlive);

    Console.WriteLine("Dequeue thread complete.");

    if (!fromDestructor)
    {
        GC.SuppressFinalize(this);
    }

    stopped = true;
    isStopping = false;
}

In this code snippet, the first Dispose method is our public interface that satisfies the requirement for IDisposable implementation. It simply calls our private Dispose method that takes a parameter indicating whether it was called from the class destructor method.

The second private Dispose method is where some housekeeping for the blocking collection and dequeue thread happens. First, we call the blocking collection's CompleteAdding method. This disallows any further additions to the queue, so the dequeue thread cannot be kept alive indefinitely by new messages. We then wait for the thread to complete by calling the thread's Join method with a timeout value. If the thread has not completed within the timeout and the queue is empty, we assume the thread is stuck, forcibly abort it, and exit; if messages remain, we keep waiting. Finally, if we were not called from the class' destructor, we tell the garbage collector to suppress finalization, since the cleanup has already been done.
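Thread.Abort is a blunt instrument, and it is not supported on .NET Core or .NET 5 and later, where it throws PlatformNotSupportedException. One possible alternative is cooperative cancellation: pass a CancellationToken to Take and cancel it if the consumer has not finished draining by a deadline. The sketch below makes that assumption explicit; it only helps when the consumer returns to Take between items, not when it is hung inside its own processing. CancellableConsumer, its members, and the five-second deadline are all hypothetical choices for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical class, for illustration only.
public sealed class CancellableConsumer : IDisposable
{
    private readonly BlockingCollection<string> queue = new BlockingCollection<string>();
    private readonly CancellationTokenSource cts = new CancellationTokenSource();
    private readonly Thread worker;
    private int processed;

    // Number of messages the worker has taken so far.
    public int Processed { get { return Volatile.Read(ref processed); } }

    public CancellableConsumer()
    {
        worker = new Thread(ConsumeLoop);
        worker.Start();
    }

    public void Add(string message)
    {
        queue.Add(message);
    }

    private void ConsumeLoop()
    {
        try
        {
            while (true)
            {
                // Blocks until an item arrives; throws if the token is
                // cancelled or the queue is completed and empty.
                queue.Take(cts.Token);
                Interlocked.Increment(ref processed);
            }
        }
        catch (OperationCanceledException)
        {
            // Deadline passed during shutdown; remaining items are abandoned.
        }
        catch (InvalidOperationException)
        {
            // Queue was marked complete and has been fully drained.
        }
    }

    public void Dispose()
    {
        queue.CompleteAdding();

        // Give the worker time to drain; cancel if it is still running.
        if (!worker.Join(TimeSpan.FromSeconds(5)))
        {
            cts.Cancel();
            worker.Join();
        }

        cts.Dispose();
        queue.Dispose();
    }
}
```

The trade-off is that cancellation abandons whatever is still queued, whereas the Dispose method above keeps waiting as long as messages remain. For log messages that must not be lost, waiting is arguably the safer default.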

To utilize a producer-consumer queue like this one is quite simple:

C#
class Program
{
    static void Main(string[] args)
    {
        using (MyQueue queue = new MyQueue())
        {
            for (int msgIdx = 1; msgIdx < 101; msgIdx++)
            {
                queue.AddLog(new MyMessage
                {
                    MessageId = msgIdx,
                    Message = string.Format("Message text # {0:#,##0}", msgIdx)
                });
            }
        }
    }
}

The using statement ensures that the queue's Dispose method is invoked upon completion, thereby stopping the dequeuing thread. When executed in a loop like this one that enqueues 100 messages, the tail end of the output looks like this:

Enqueueing: Message with ID 92 and value Message text # 92.
Enqueueing: Message with ID 93 and value Message text # 93.
Enqueueing: Message with ID 94 and value Message text # 94.
Dequeueing: Message with ID 88 and value Message text # 88.
Dequeueing: Message with ID 89 and value Message text # 89.
Dequeueing: Message with ID 90 and value Message text # 90.
Dequeueing: Message with ID 91 and value Message text # 91.
Enqueueing: Message with ID 95 and value Message text # 95.
Enqueueing: Message with ID 96 and value Message text # 96.
Enqueueing: Message with ID 97 and value Message text # 97.
Enqueueing: Message with ID 98 and value Message text # 98.
Dequeueing: Message with ID 92 and value Message text # 92.
Dequeueing: Message with ID 93 and value Message text # 93.
Dequeueing: Message with ID 94 and value Message text # 94.
Dequeueing: Message with ID 95 and value Message text # 95.
Enqueueing: Message with ID 99 and value Message text # 99.
Enqueueing: Message with ID 100 and value Message text # 100.
Dequeueing: Message with ID 96 and value Message text # 96.
Dequeueing: Message with ID 97 and value Message text # 97.
Dequeueing: Message with ID 98 and value Message text # 98.
Dequeueing: Message with ID 99 and value Message text # 99.
Dequeueing: Message with ID 100 and value Message text # 100.
Shutting down queue. Waiting for dequeue thread completion.
Dequeue thread complete.

As you can see, the dequeue process slightly lags the enqueue process, as you would expect for processes running in separate threads. The messages are interspersed as the threads compete for the shared resource.

Finishing It Off

So what we've demonstrated is a way to implement a producer-consumer pattern without writing a lot of thread management code. While this pattern does not fit every situation, it certainly has its uses. Any time you need to queue up items for processing but don't want to slow down the primary process, give this pattern a try.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
CEO RAMSoft Solutions, Inc
United States
Bob McGowan is the founder and CEO of RAMSoft Solutions and has led the company since its inception in 1993. He has over 30 years of experience managing projects and developing applications software for microcomputers. He has managed multi-million dollar software development projects for many Fortune 100 companies, including Chase Manhattan Bank, Prudential Insurance, and True North Media Services and has acted as a senior technical advisor for dozens of other projects.

Bob has been developing software for microcomputers since 1980, and has developed applications for many companies using a wide variety of development tools, languages, architectures, and operating systems. In 2004 he co-founded an information technology services company specializing in the Yellow Pages Advertising industry. As Chief Technology Officer he architected and implemented a service allowing agencies to view electronic "tear pages" via web services or interactively via a web site. The process included an optical character recognition (OCR) workflow automation process for scanning, OCR, and review of Yellow Pages telephone directories. After 12 months of operation the database contained over 1.6 million scanned pages and accompanying extracted text in a searchable format.

He continues to be passionate about software development and technology in general. You can follow his thoughts on software development at http://blog.ramsoftsolutions.com
