Generic Alert Handler - A Practical Example on Multithreading

18 Aug 2010
With the help of an example, this article explains a practical multi-threaded application design. The Microsoft CLR Profiler tool is used to gain insight into what is going on. Emphasis is placed on developing a generic module, as it maximizes code reuse.

Introduction

Recently, I have been developing a number of device monitoring applications in which I needed a module like Alert Handler. After going through a couple of iterations, I arrived at the design that I am going to share with you in this article. The article emphasizes two important points: multi-threading and building a generic module for reuse.

In a computer system, each process runs in its own isolated compartment, which the operating system manages to safeguard one application from another. Two processes can communicate only through the Inter Process Communication (IPC) mechanisms that the operating system provides. A thread is often described as a lightweight process. Multi-threading provides an alternative approach to concurrency in which threads share common memory. This does not come for free, however: shared data must be protected against corruption, and deadlocks must be avoided. As a result, multi-threaded programs are difficult to develop and debug.

In this article, I will show you a practical multi-threading example and also how to profile the application to gain better insight. An important principle in life is that we should first see and then believe. To follow it here, I will use CLR Profiler, a free tool from Microsoft. I hope the discussion in this article will help beginners understand the intricacies of multi-threading and help them in their future projects.

The second aspect of this article is building a reusable module. Reuse of the code base has always been one of the prime motivations in any software project. To be reusable, a module has to be designed in a specific manner and with special care. Take a logging utility, for example. Almost every application needs logging as part of its infrastructure, so we have to design the logging module and define its APIs in a way that lets it be reused in multiple applications, preferably without changes. You will see that I have used such a logging module in this article as well (a reduced version taken from NSpring Logging Library for .NET by Jeffrey Varszegi). As mentioned in A Look At What's Wrong With Objects by Marc Clifton, the idea is to increase the “Application Generic” components as far as possible in a specific project. Ideally, we should try to stretch the Application Generic components in a project so that the result is maximum code reuse, as described in the following figure:

FIGURE 1 – Application with Generic and Specific Components.

A Brief Note on Requirements

In many of our applications, we need to handle alerts and pass each message through different channels to the appropriate people, designated as target responders. These are the engineers responsible for managing the devices, who may be located at remote sites such as a plant area, a housing society, or a hospital building. Alerts are communicated to the target responders through the engagement channels recorded in the responder database that the application maintains.

In this context, some examples of channels are:

  1. SMS
  2. Email
  3. Console
  4. Siren
  5. Loud Speaker, etc.

In this article, I will show how such an AlertHandler module can be implemented in C# on .NET. AlertHandler handles all application-generated alerts. These alerts are then sent as messages through Email and SMS, as audio through Sirens and LoudSpeakers, and as popup dialogs on supervisory consoles.

Alert Handler Architecture

FIGURE 2 – Alert Manager Architecture.

Design Approach

When an alert message arrives, it may need to be passed to multiple channels. In architecting this module, I first designed a mediator, called AlertManager, which receives the incoming alerts and dispatches them to the appropriate channels. To avoid a hard-coded "switch..case" in the dispatch logic, I use the Chain of Responsibility design pattern. Put simply, the AlertManager owns a List containing all the registered channel handlers; when an alert message is received, it passes the message to each handler in the List in turn until one of them handles it. Based on the channel type, only the designated handler handles the alert, and the rest ignore it.

To minimize coupling with the rest of the application, to work in a non-blocking asynchronous mode, and to avoid losing any alert event during processing, I assume that alerts are received in an alertQueue. From there they are dispatched to the appropriate message handlers according to the channel specified in the message. In this client-server design, I use the Publish/Subscribe design pattern. Clients, i.e., channel handlers (such as SMS, Console and Email), register their interest in handling alerts with the mediator AlertManager. Each channel handler then processes the message in its own specific way.

When the application initializes, we instantiate exactly one AlertManager, but the SMS, Console and Email channel handlers we instantiate depend on the channels configured for the specific application. These are typically instantiated by extracting the types derived from a base type (in this case Handler) in the executing assembly (in this case AlertHandler.exe) and using a Dynamic Factory based on .NET reflection. If the same alert message is to be sent to multiple channels, there will be multiple entries in the alertQueue, one for each channel.

With this approach, AlertManager needs no prior knowledge of which communication channel the responder has opted for, so new communication channels can be added in future without code changes in AlertManager.

The figure below shows the class diagram of the important classes present in AlertHandler module.

FIGURE 3 – AlertHandler Classes.

The AlertManager offers a registration mechanism. When the SMS, Console and Email channel handlers are constructed and initialized, they register themselves with AlertManager as part of the startup process, so that AlertManager knows which channel handler objects to notify when new alert messages arrive. When AlertManager receives a new alert message, it passes the alertMsg object to the registered handlers, giving each an opportunity to process it. As explained earlier, because I have implemented the Chain of Responsibility pattern, only the designated channel handler processes the new message, according to the channel information present in the alert message. The following listing shows the AlertManager class, which also implements the IDisposable interface.

AlertManager Class

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace AlertHandler
{
    public class AlertMsgContext
    {
        // Type defining information passed to receivers of the event
        public AlertMsgContext(String from, String to, String subject, 
        String body, Common.AlertChannels channel)
        {
            this.from = from;
            this.to = to;
            this.subject = subject;
            this.body = body;
            this.channel = channel;
        }
        public readonly String from, to, subject, body;
        public readonly Common.AlertChannels channel;
    }

    /// <summary>
    /// The mediator in handling alert messages
    /// </summary>
    public class AlertManager : IDisposable
    {
        public AlertManager()
        {
            // Construct an instance of the AlertManager
        }

        /// <summary>
        /// A dedicated thread is created to dequeue
        /// </summary>
        public void Initialize()
        {
            alertQueue = new Queue<AlertMessage>(Common.startPoolSize);
            handlers = new List<Handler>();
            nodeAdd = new ManualResetEvent(false);
            alertDispatcher = new Thread(new ThreadStart(DequeueMsg));
            alertDispatcher.Start();
        }

        public void EnqueueMsg(AlertMessage alertMessage)
        {
            alertQueue.Enqueue(alertMessage);
            lock (eventLocker)
            {
                nodeAdd.Set();
            }
        }

        /// <summary>
        /// Method responsible for notifying registered handlers
        /// </summary>
        private void OnAlertMsg(AlertMsgContext context)
        {
            // Has any object registered interest with our event?
            foreach (Handler ah in handlers)
            {
                if (ah.HandleMsg(context))
                {
                    break;      // break the loop as it is handled
                }
            }
        }

        /// <summary>
        /// First construct an object to hold the information I want
        /// and pass to the receivers of this notification
        /// </summary>
        private void TriggerAlertEvent(String from, String to, 
		String subject, String body,
            Common.AlertChannels channel)
        {
            AlertMsgContext context =
               new AlertMsgContext(from, to, subject, body, channel);
            OnAlertMsg(context);
        }

        /// <summary>
        /// This method is called when an alert has been dequeued
        /// </summary>
        private void HandleMessage(AlertMessage alertMessage)
        {
            TriggerAlertEvent(alertMessage.recipient, alertMessage.backupRecipient,
                "Controller ID - " + alertMessage.controllerID + " " +
                "Device ID - " + alertMessage.deviceID + " "
                + alertMessage.category.ToString(), alertMessage.detailedMessage,
                alertMessage.channel);
        }

        private void DequeueMsg()
        {
            bool empty = false;
            AlertMessage alertMessage = null;
            while (nodeAdd.WaitOne())
            {
                while (alertQueue.Count > 0)
                {
                    alertMessage = alertQueue.Dequeue(ref empty);
                    if (alertMessage == default(AlertMessage))
                    {
                        return;
                    }
                    else
                    {
                        HandleMessage(alertMessage);
                    }
                }
                lock (eventLocker)
                {
                    // Guard against a lost wake-up. Suppose the dequeue thread
                    // has consumed all messages and is about to reset the event,
                    // but an enqueue thread has just added a message and takes
                    // the lock first. The dequeue thread waits for the lock while
                    // the enqueue thread sets the event and releases the lock.
                    // Without the alertQueue.Count check below, the dequeue
                    // thread would now reset the event, and the new message would
                    // stay unprocessed until another message is added.
                    if (0 == alertQueue.Count)
                    {
                        nodeAdd.Reset();
                    }
                }
            }
        }

        public void Register(Handler ah)
        {
            handlers.Add(ah);
        }

        public void UnRegister(Handler ah)
        {
            handlers.Remove(ah);
        }

        private void ClearHandlers()
        {
            for (int i = 0; i < handlers.Count; i++)
            {
                handlers[i].Dispose();
            }
            handlers.Clear();
            handlers = null;
        }

        private void ClearQueue()
        {
            alertQueue.Clear();
            alertQueue = null;
        }

        /// <summary>
        /// Implementing IDisposable pattern
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
          if(!disposed && disposing)
          {
            alertDispatcher.Abort();
            ClearHandlers();
            ClearQueue();
            nodeAdd.Close();
            disposed = true;
          }
        }

        public void Dispose()
        {
            Dispose(true);
            // This object will be cleaned up by the Dispose method.
            // Therefore, we call GC.SupressFinalize to
            // take this object off the finalization queue
            // and prevent finalization code for this object
            // from executing a second time.
            GC.SuppressFinalize(this);
        }

        #region Declarations

        private Queue<AlertMessage> alertQueue = null;
        private List<Handler> handlers = null;
        private Thread alertDispatcher = null;
        private ManualResetEvent nodeAdd = null;
        private readonly object eventLocker = new object();
        private bool disposed = false;

        #endregion
    }
}

As you can see, AlertManager defines a method OnAlertMsg that is responsible for notifying registered handlers of the alert event. OnAlertMsg is called when a new alert has been dequeued, i.e., read from the alertQueue by the dedicated thread in AlertManager. It receives an initialized AlertMsgContext object containing additional information about the event. The thread callback method DequeueMsg drains alerts from alertQueue in its extraction loop and, through HandleMessage and TriggerAlertEvent, calls OnAlertMsg.
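The enqueue, signal and drain hand-off described above can be tried in isolation with a small sketch. This is not the article's AlertManager: MiniDispatcher, Stop and Handled are hypothetical names, the queue here is the standard System.Collections.Generic.Queue guarded by a lock (an assumption made so the sketch is safe with several producers), and shutdown uses a flag instead of Thread.Abort.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical miniature of the queue hand-off; all names are illustrative.
public class MiniDispatcher
{
    private readonly Queue<string> queue = new Queue<string>();
    private readonly ManualResetEvent itemAdded = new ManualResetEvent(false);
    private readonly object locker = new object();
    private readonly List<string> handled = new List<string>();
    private bool stopping;   // only read or written while holding 'locker'

    // Messages processed so far; read it only after the dispatcher thread has ended.
    public List<string> Handled { get { return handled; } }

    public void Enqueue(string msg)
    {
        lock (locker)
        {
            queue.Enqueue(msg);   // the BCL Queue<T> is not thread-safe, so guard it
            itemAdded.Set();      // wake the dispatcher thread
        }
    }

    // Asks the dispatcher to exit once the queue has been drained.
    public void Stop()
    {
        lock (locker)
        {
            stopping = true;
            itemAdded.Set();
        }
    }

    // Body of the dedicated dispatcher thread.
    public void DrainLoop()
    {
        while (itemAdded.WaitOne())
        {
            while (true)
            {
                string msg;
                lock (locker)
                {
                    if (queue.Count == 0)
                    {
                        if (stopping) return;  // drained and shutdown requested
                        // Resetting under the lock means no producer can
                        // Set() between the Count check and the Reset().
                        itemAdded.Reset();
                        break;
                    }
                    msg = queue.Dequeue();
                }
                handled.Add(msg);   // stand-in for HandleMessage, done outside the lock
            }
        }
    }
}
```

To use it, start a thread on DrainLoop, call Enqueue from any number of producer threads, then call Stop and Join the thread. Messages enqueued before Stop are all processed before the loop exits.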

Handler Abstract Class

For the different channel handlers, we first define an abstract class Handler from which each handler type derives. The first method in Handler is Register, which registers the specific channel handler with the AlertManager, indicating its interest in alert messages. This does not change between handlers, so it is kept in the base Handler class. The second method is HandleMsg, which does the actual handling of the alert message; it differs by channel type, so it is declared virtual. This method is responsible for channel-specific rendering of alert messages and for any communication protocol handling. The third method is UnRegister, which removes the handler's entry from the AlertManager’s list of registered handlers. The Handler class also implements the IDisposable interface.

using System;
using System.Collections.Generic;
using System.Text;

namespace AlertHandler
{
    /// <summary>
    /// Disposable abstract base class for all channel handlers
    /// </summary>
    public abstract class Handler : IDisposable
    {
        public void Register()
        {
            // after instantiation register with Alert Manager.
            aManager = App.GetAM();
            aManager.Register(this);
        }

        public virtual bool HandleMsg(AlertMsgContext context)
        {
            return false;
        }

        /// <summary>
        /// Unregister myself with AlertManager
        /// </summary>
        public void UnRegister()
        {
            aManager.UnRegister(this);
        }

        /// <summary>
        /// Implementing IDisposable pattern
        /// </summary>
        protected virtual void Dispose(bool disposing)
        {
            if (!disposed && disposing)
            {
                aManager = null;
                disposed = true;
            }
        }

        public void Dispose()
        {
            Dispose(true);
            // This object will be cleaned up by the Dispose method.
            // Therefore, we call GC.SupressFinalize to
            // take this object off the finalization queue
            // and prevent finalization code for this object
            // from executing a second time.
            GC.SuppressFinalize(this);
        }

        #region Declarations

        private AlertManager aManager = null;
        private bool disposed = false;

        #endregion
    }
}

SMS Class

A sample channel handler class SMS as implemented is listed below:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;


namespace AlertHandler
{
    class SMS : Handler
    {
        public SMS()
        {
            // Construct an instance of the SMSMsgEventHandler
        }

        /// <summary>
        /// This is the method that the AlertManager will call to
        /// notify the SMS object that a new message has arrived
        /// Implementing Chain of Responsibility pattern
        /// </summary>
        public override bool HandleMsg(AlertMsgContext context)
        {
            bool result = false;
            if (context.channel == Common.AlertChannels.SMS)
            // I will work only if it is for me
            {
                result = true;
                if (ThreadPool.QueueUserWorkItem(new WaitCallback(SendMessage), context))
                {
                    Console.WriteLine("Queued the work for ThreadPool request in SMS...");
                }
                else
                {
                    Console.WriteLine("Unable to queue ThreadPool request in SMS...");
                }
            }
            return result;
        }

        /// <summary>
        /// Wrapper method for use with thread pool.
        /// 'arguments' identifies the additional event information
        /// that the AlertManager wants to give us.
        /// </summary>
        private void SendMessage(object arguments)
        {
            // Normally, the code here would send the alert as an SMS, for
            // example through third-party software such as PageGate.
            //
            // This test implementation displays the info on the console and
            // also logs it for audit.
            AlertMsgContext messageArguments = arguments as AlertMsgContext;
            Console.WriteLine("SMS message:");
            Console.WriteLine(
               "   To: {0}\n   From: {1}\n   Subject: {2}\n   Body: {3}\n",
               messageArguments.from, messageArguments.to, messageArguments.subject,
               messageArguments.body);
            App.FileLog("SMS message:", messageArguments.from + " " + messageArguments.to
                + " " + messageArguments.subject + " " + messageArguments.body);
        }
    }
}

When the application initializes, it first constructs an AlertManager object. Along with AlertManager, the channel handlers are constructed according to the application configuration. During startup, the channel handlers register with AlertManager, which essentially means that their references are added to a container list maintained by AlertManager. When an SMS alert message arrives, AlertManager dequeues it and invokes its own OnAlertMsg, which passes the message to the registered handlers. Each channel handler checks the channel type present in the data structure of the queued alert, and only the SMS handler processes this alert message.

The handler’s HandleMsg has a parameter which is a reference to an AlertMsgContext object. This object contains additional information that is useful to specific channel handlers. Through the fields of the AlertMsgContext object (which HandleMessage fills from the AlertMessage), the SMS handler has access to the following information pertaining to the AlertMsg:

  1. The controllerID
  2. The deviceID
  3. The category
  4. The detailedMessage and
  5. The recipient

In a real SMS implementation, this information would be sent as SMS to receiver mobile phone using the field recipient. In the sample code for this article, the information is simply displayed on console window just for illustration.

Note: Here we have used the ThreadPool class from the System.Threading namespace of .NET. Pooling is done for efficiency, especially when the time taken to execute the thread's task is comparable with the time the operating system takes to start and dispose of a thread. If you look at this class, you will see that all its members are static and there is no public constructor. This is because there is only one pool per process and we cannot create a new one.
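As a minimal sketch of the static ThreadPool API mentioned in the note, the snippet below queues a few work items and waits for all of them to finish. ThreadPoolSketch and RunAll are hypothetical names, and CountdownEvent is assumed to be available (it was introduced in .NET 4); the article's SMS handler does not wait for its work items in this way.

```csharp
using System;
using System.Threading;

public static class ThreadPoolSketch
{
    // Queues 'count' work items on the shared pool, blocks until every one
    // has run, and returns how many ran.
    public static int RunAll(int count)
    {
        int completed = 0;
        using (CountdownEvent done = new CountdownEvent(count))
        {
            for (int i = 0; i < count; i++)
            {
                // There is no ThreadPool instance to create: the method is static.
                ThreadPool.QueueUserWorkItem(delegate(object state)
                {
                    Interlocked.Increment(ref completed);  // thread-safe counter
                    done.Signal();                         // one item finished
                }, i);
            }
            done.Wait();   // returns once Signal() has been called 'count' times
        }
        return completed;
    }
}
```

Calling ThreadPoolSketch.RunAll(5) returns 5 once all five callbacks have executed on pool threads.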

Detail Data Structures

public class Common
{
    // Other members of Common (such as startPoolSize) are not shown here.

    public enum AlertChannels
    {
        Console = 1,
        Email,
        SMS,
        Siren,
        Unknown
    }

    public enum ExceptionType
    {
        Fire = 1,
        Leakage,
        NoFuel,
        HighVoltage,
        HighPressure,
        HighTemparature,
        NoWater,
        LowVoltage,
        LowPressure,
        HighCurrent,
        LowCurrent,
        SinglePhase,
        FailedToStart,
        CommunicationFailed,
        Unknown
    }
}

Alerts are generated as part of rule-based actions that depend on the ExceptionType, the device type (Tele-Controller, Water Pump, Diesel-Generator, Air-Conditioner, Halogen-Lights, Fans, etc.) and the devices' strategic locations in the area under supervision.

    public class AlertMessage
    {
        // Static IP address of the Telecontroller
        public String controllerID = null;
        // MacID of the device
        public String deviceID = null;
        // This is the ExceptionType
        public Common.ExceptionType category;
        // IDs will get translated with shortnames so that they become
        // human readable
        public String detailedMessage = null;
        // Based on this, the message will be dispatched to the specific
        // channel handler by the Alert manager
        public Common.AlertChannels channel;
        // User ID of the responder
        public String recipient = null;
        // Backup user ID, usually the supervisor of the responder
        public String backupRecipient = null;
    }

Alerts generated by the application are inserted in the queue by the device monitoring threads. The dedicated thread of AlertManager dequeues each alert and calls its own HandleMessage; the appropriate channel handler then sends the alert based on the channel type. AlertManager also has the option to log alerts before dispatching them to registered channels. Channel handlers themselves log any exceptions, for example in the case of an email failure.

Important Concepts

Publish Subscribe Design Pattern

With systems based on the Publish Subscribe design pattern (also called Observer pattern), subscribers register their interest in an event, or a pattern of events, and are asynchronously notified of events generated by publishers. In our example, the publishers are the various device monitoring threads that generate alerts, and the subscribers are the specific channel handlers that send alerts to the appropriate responders. Using this pattern, we achieve several kinds of decoupling (see The Many Faces of Publish/Subscribe), which lead to testable and maintainable code. Decoupling also supports scalability at the abstraction level by allowing participants to operate independently of one another. The kinds of decoupling involved here are:

Space Decoupling: The Publishers do not hold references to the Subscribers; neither do they know how many of these Subscribers are participating in the interaction. Similarly, Subscribers do not hold references to the Publishers; neither do they know how many of these Publishers are participating in the interaction.

Time Decoupling: With the addition of message queue, the Publisher and Subscriber do not need to be participating in the interaction at the same time.

Synchronization Decoupling: The production and consumption of messages do not happen in the main flow of control of the Publishers and Subscribers, and therefore do not happen in a blocking manner. So Publishers and Subscribers work independently of each other.

Chain of Responsibility Design Pattern

We have used this pattern in the interaction between AlertManager and the registered channel handlers. When an alert message is dequeued from alertQueue, we use this pattern to avoid coupling between AlertManager and channel handlers by giving more than one handler a chance to examine the message. Put another way, we chain the receiving Subscriber objects and pass the message along the chain until an object handles it. As a result, AlertManager need not know which alert message to pass to which channel handler. Channel handlers decide for themselves whether to handle the message, based on the channel type, which is a field in the AlertMessage data structure. This is an example of the Open/Closed Principle: the behavior of the system can be altered by adding handlers to the chain of responsibility (the List of registered alert handlers) without making any code modifications to AlertManager.
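The chain can be illustrated with a stripped-down sketch. The types below (Channel, ChannelHandler, EmailHandler, SmsHandler, Dispatcher) are simplified, hypothetical stand-ins for the article's AlertChannels, Handler and AlertManager, kept small so the dispatch rule is easy to see.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified stand-ins for the article's types.
public enum Channel { Email, SMS, Siren }

public abstract class ChannelHandler
{
    // Body of the last message this handler accepted (for inspection only).
    public string LastBody;

    // Returns true when this handler accepted the message.
    public abstract bool HandleMsg(Channel channel, string body);
}

public class EmailHandler : ChannelHandler
{
    public override bool HandleMsg(Channel channel, string body)
    {
        if (channel != Channel.Email) return false;  // not mine: pass it along
        LastBody = body;
        return true;
    }
}

public class SmsHandler : ChannelHandler
{
    public override bool HandleMsg(Channel channel, string body)
    {
        if (channel != Channel.SMS) return false;    // not mine: pass it along
        LastBody = body;
        return true;
    }
}

public class Dispatcher
{
    private readonly List<ChannelHandler> handlers = new List<ChannelHandler>();

    public void Register(ChannelHandler handler) { handlers.Add(handler); }

    // Walks the chain and returns the handler that accepted the message,
    // or null when no registered handler wanted it.
    public ChannelHandler Dispatch(Channel channel, string body)
    {
        foreach (ChannelHandler handler in handlers)
        {
            if (handler.HandleMsg(channel, body)) return handler;
        }
        return null;
    }
}
```

Supporting a new channel in this sketch means writing one more ChannelHandler subclass and registering it; Dispatcher itself stays unchanged, which is the Open/Closed property the section describes.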

Dynamic Factory Design Pattern

The Factory pattern separates object construction from the rest of the code. However, it still involves coupling, although this is localized in the factory class. Using .NET reflection in the Factory pattern removes these dependencies, i.e., the hard-coded coupling in the form of the "switch..case" present in the Concrete Factory pattern. The HandlerFactory class uses this concept of a Dynamic Factory. The code listings for the HandlerFactory and TypeExtractor classes follow.

HandlerFactory Class

using System;
using System.Collections.Generic;
using System.Reflection;

namespace AlertHandler
{
    /// <summary>
    /// Dynamic Factory to create a handler instance
    /// </summary>
    public class HandlerFactory
    {
        /// <summary>
        /// Usage - Object instance = App.CreateInstance(classType);
        /// </summary>
        public Handler CreateHandler(Type classType)
        {
            Handler ah = null;
            if (classType != null)
            {
                // Create an instance of an object.
                try
                {
                    ah = Activator.CreateInstance(classType) as Handler;
                }
                catch (Exception e)
                {
                    App.FileLog("<HandlerFactory><CreateHandlerFromTypes>",
                        "Error creating object instance of type: " +
                        classType.ToString() + " " + e.Message);
                }
            }
            return ah;
        }
    }
}

TypeExtractor Class

using System;
using System.Collections.Generic;
using System.Reflection;

namespace AlertHandler
{
    /// <summary>
    /// Helper class to Dynamic Factory to create a handler instance
    /// </summary>
    public class TypeExtractor
    {
        public static List<Type> GetAllHandlerTypes()
        {
            List<Type> chnlTypes = null;
            Type[] typeArray = null;
            try
            {
                Assembly assembly = Assembly.GetExecutingAssembly();
                typeArray = assembly.GetTypes();
                chnlTypes = new List<Type>();
                foreach (Type type in typeArray)
                {
                    // typeof(Handler) avoids the string lookup, and == is
                    // null-safe for types that have no base type (interfaces).
                    if (type.BaseType == typeof(Handler))
                    {
                        chnlTypes.Add(type);
                    }
                }
            }
            catch (Exception e)
            {
                App.FileLog("<TypeExtractor><GetAllHandlerTypes>",
                    "Error getting Handler types, " + e.Message);
            }
            return chnlTypes;
        }
   }
}
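How type discovery and instance creation might be combined at startup can be sketched in a self-contained form. Plugin, SmsPlugin, EmailPlugin and PluginLoader are hypothetical names standing in for Handler, its subclasses, and the TypeExtractor/HandlerFactory pair. Note that this sketch uses IsSubclassOf, which also accepts indirect subclasses, whereas the listing above compares BaseType directly.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// Hypothetical stand-in for the article's Handler base class.
public abstract class Plugin
{
    public abstract string Name { get; }
}

public class SmsPlugin : Plugin
{
    public override string Name { get { return "SMS"; } }
}

public class EmailPlugin : Plugin
{
    public override string Name { get { return "Email"; } }
}

public static class PluginLoader
{
    // Finds every concrete Plugin subclass in the assembly that defines
    // Plugin and creates one instance of each through reflection.
    public static List<Plugin> CreateAll()
    {
        List<Plugin> result = new List<Plugin>();
        foreach (Type type in typeof(Plugin).Assembly.GetTypes())
        {
            bool creatable = type.IsSubclassOf(typeof(Plugin))
                && !type.IsAbstract
                && type.GetConstructor(Type.EmptyTypes) != null; // public no-arg ctor
            if (creatable)
            {
                result.Add((Plugin)Activator.CreateInstance(type));
            }
        }
        return result;
    }
}
```

PluginLoader.CreateAll() returns one SmsPlugin and one EmailPlugin without either class being named in the loader. The order of the returned list follows Assembly.GetTypes and should not be relied upon.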

Object Pool

We all know that in .NET, released objects are garbage collected by the framework. We do not need to worry about pairing every new with a matching delete in our code. This is convenient, but we still bear the cost of garbage collection. So when we use the same type of object again and again, Object Pooling lets us reuse released objects, thereby reducing Garbage Collection (GC) overhead. In this implementation, objects are pooled in a linked list data structure that is implemented using a lock-free algorithm. The code below demonstrates lock-free linked list operations for getting objects from and adding objects to the pool (taken and made generic from ... Managed I/O Completion Ports (IOCP) - Part 2 by P.Adityanand).

Let us pause and ask: why use a lock-free structure? When an application runs in a multi-processor environment (which is prevalent in today’s PC hardware), using locks to protect memory shared among multiple threads may reduce performance because of lock contention. The shared memory pool we use here is a singly linked list with head and tail pointers.

The lock free algorithm works this way:

  1. Nodes are only added after the last node in the linked list using tail pointer.
  2. Nodes are always removed from the beginning of the linked list using head pointer.
  3. To obtain consistent values of the various pointers, we rely on sequences of reads that re-check earlier values to be sure they haven’t changed (please refer to A Pragmatic Implementation of Non-Blocking Linked-Lists).

In this implementation, we use Compare and Swap (CAS), a very familiar term in the context of multi-threaded applications. In .NET, it is available in the System.Threading namespace as the method Interlocked.CompareExchange. It compares the value stored at a location with an expected value and, only if the two are equal, replaces the stored value with a new one, all in a single atomic thread-safe operation. The method returns the value that was originally stored, so the caller can tell whether the swap took place.

Note: Atomic means that the operation cannot be interleaved with other thread operations.
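The read, compute and CAS retry loop that lock-free code relies on can be shown on a single integer before it is applied to linked-list pointers. The sketch below is illustrative only: CasSketch, StoreMax and Demo are hypothetical names and are not part of the article's ObjectPool.

```csharp
using System;
using System.Threading;

public static class CasSketch
{
    // Atomically raises 'target' to 'candidate' if candidate is larger,
    // using a compare-and-swap retry loop instead of a lock.
    public static void StoreMax(ref int target, int candidate)
    {
        while (true)
        {
            int seen = target;                 // snapshot of the current value
            if (candidate <= seen) return;     // nothing to do
            // Writes 'candidate' only if 'target' still equals 'seen';
            // the return value is whatever was stored before the call.
            if (Interlocked.CompareExchange(ref target, candidate, seen) == seen)
            {
                return;                        // our swap took place
            }
            // Another thread changed 'target' in between: re-read and retry.
        }
    }

    // Four threads race to publish values 0..3999; returns the final maximum.
    public static int Demo()
    {
        int max = 0;
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.Length; t++)
        {
            int start = t * 1000;              // each thread gets its own range
            threads[t] = new Thread(delegate()
            {
                for (int i = start; i < start + 1000; i++) StoreMax(ref max, i);
            });
            threads[t].Start();
        }
        foreach (Thread thread in threads) thread.Join();
        return max;
    }
}
```

However the four threads interleave, CasSketch.Demo() returns 3999, because an update is only ever written when the value it was computed from is still in place.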

using System;
using System.Threading;
using System.Reflection;

namespace AlertHandler
{
/// <summary>
/// Defines a factory interface to be implemented by classes
/// that creates new poolable objects
/// </summary>
	public abstract class PoolableObjectFactory<T>
	{
/// <summary>
/// Create a new instance of a poolable object
/// </summary>
/// <returns>Instance of user defined PoolableObject derived
/// type</returns>
        public abstract PoolableObject<T> CreatePoolableObject();
	}

/// <summary>
/// Poolable object type. One can define new poolable types by deriving
/// from this class.
/// </summary>
	public class PoolableObject<T>
	{
/// <summary>
/// Default constructor. Poolable types need to have a no-argument
/// constructor for the poolable object factory to easily create
/// new poolable objects when required.
/// </summary>
		public PoolableObject()
		{
			Initialize();
		}

/// <summary>
/// Called when a poolable object is being returned from the pool
/// to caller.
/// </summary>
		public virtual void Initialize()
		{
			LinkedObject = null;
		}

/// <summary>
/// Called when a poolable object is being returned back to the pool.
/// </summary>
		public virtual void UnInitialize()
		{
            LinkedObject = null;
		}
        internal PoolableObject<T> LinkedObject;
	}

/// <summary>
/// Lock Free ObjectPool
/// </summary>
	public class ObjectPool<T>
	{
/// <summary>
/// Creates a new instance of ObjectPool
/// </summary>
/// <param name="pof">Factory class to be used by ObjectPool to
/// create new poolable object instance when required.</param>
		public ObjectPool(PoolableObjectFactory<T> pof, bool bCreateObjects)
		{
			_pof = pof;
			_bCreateObjects = bCreateObjects;
			Init(0);
		}

/// <summary>
/// Creates a new instance of ObjectPool with n-number of pre-created
/// objects in the pool.
/// </summary>
/// <param name="pof">Factory class to be used by ObjectPool to
/// create new poolable object instance when required.</param>
/// <param name="objectCount">Number of objects to pre-create</param>
		public ObjectPool(PoolableObjectFactory<T> pof, 
		bool bCreateObjects, long objectCount)
		{
			_pof = pof;
			_bCreateObjects = bCreateObjects;
			Init(objectCount);
		}

/// <summary>
/// Add the poolable object to the object pool. The object is
/// uninitialized before adding it to the pool.
/// </summary>
/// <param name="newNode">PoolableObject instance</param>
		public void AddToPool(PoolableObject<T> newNode)
		{
			newNode.UnInitialize();
			PoolableObject<T> tempTail = null;
			PoolableObject<T> tempTailNext = null;
			do
			{
				tempTail = _tail;
				tempTailNext = tempTail.LinkedObject 
					as PoolableObject<T>;
                if (tempTail == _tail)
				{
					if (tempTailNext == null)
					{
// If the tail node we are referring to is really the last
// node in the queue (i.e. its next node is null), then
// try to point its next node to our new node
// We have used Compare and Swap (CAS) which is a very familiar term
// in the context of multi-threaded applications. It allows us to
// compare two values, and update one of them with a new value, all in
// a single atomic thread-safe operation.
                        if (Interlocked.CompareExchange<PoolableObject<T>>(
                                ref tempTail.LinkedObject, newNode, tempTailNext)
                            == tempTailNext)
                        {
                            break;
                        }
					}
					else
					{
// This condition occurs when we have failed to update
// the tail's next node. And the next time we try to update
// the next node, the next node is pointing to a new node
// updated by other thread. But the other thread has not yet
// re-pointed the tail to its new node.
// So we try to re-point to the tail node to the next node of the
// current tail
                        Interlocked.CompareExchange<PoolableObject<T>>
				(ref _tail, tempTailNext, tempTail);
					}
				}
			} while (true);

// If we were able to successfully change the next node of the current
// tail node to point to our new node, then re-point the tail node also
// to our new node
            Interlocked.CompareExchange<PoolableObject<T>>(ref _tail, newNode, tempTail);
			Interlocked.Increment(ref _count);
		}

/// <summary>
/// Returns an existing object from the pool or creates a
/// new object if the pool is empty. If an existing object is being
/// returned it is initialized before returned to the caller.
/// </summary>
/// <returns>PoolableObject instance</returns>
		public PoolableObject<T> GetObject()
		{
			bool empty = false;
			PoolableObject<T> tempTail = null;
			PoolableObject<T> tempHead = null;
			PoolableObject<T> tempHeadNext = null;
			do
			{
				tempHead = _head;
				tempTail = _tail;
				tempHeadNext = tempHead.LinkedObject 
					as PoolableObject<T>;
                if (tempHead == _head)
				{
// There may not be any elements in the queue
					if (tempHead == tempTail)
					{
						if (tempHeadNext == null)
						{
// If the queue is really empty come out of dequeue operation
							empty = true;
							break;
						}
						else
						{
// Some other thread could be in the middle of the
// enqueue operation. it could have changed the next node of the tail
// to point to the new node. So let us advance the tail node to point
// to the next node of the current tail
                            Interlocked.CompareExchange<PoolableObject<T>>
				(ref _tail, tempHeadNext, tempTail);
						}
					}
					else
					{
// Move head one element down.
// If succeeded Try to get the data from head and
// break out of the loop.
                        if (Interlocked.CompareExchange<PoolableObject<T>>
				(ref _head, tempHeadNext, tempHead) == tempHead)
						{
							break;
						}
					}
				}
			} while (true);
			if (empty == false)
			{
				Interlocked.Decrement(ref _count);
				tempHead.Initialize();
			}
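			else
			{
// The pool is empty. At this point tempHead still refers to the pool's
// internal dummy head node, which must never be handed out to a caller,
// so either create a fresh object or return null.
                tempHead = _bCreateObjects ? _pof.CreatePoolableObject() : null;
			}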
			return tempHead;
		}

/// <summary>
/// Removes all the poolable objects from the pool.
/// </summary>
		public void Clear()
		{
			Clear(0);
		}

/// <summary>
/// Removes all the poolable objects from the pool. And fills the pool
/// with n-number of pre-created objects
/// </summary>
		public void Clear(long objectCount)
		{
			_count = 0;
			Init(objectCount);
		}

/// <summary>
/// Count of poolable objects in the pool
/// </summary>
		public long Count
		{
			get
			{
// A plain read of a 64-bit value is not atomic on 32-bit platforms.
				return Interlocked.Read(ref _count);
			}
		}

		private void Init(long objectCount)
		{
			_head = _tail = _pof.CreatePoolableObject();
			if (objectCount > 0)
			{
				for(int count=1; count<=objectCount; count++)
				{
					AddToPool(_pof.CreatePoolableObject());
				}
			}
        }

        #region Declarations

        private PoolableObject<T> _head = null;
		private PoolableObject<T> _tail = null;
		private long _count = 0;
		private bool _bCreateObjects = false;
		private PoolableObjectFactory<T> _pof = null;

        #endregion
    }
}
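
The compare-and-swap retry loop at the heart of AddToPool and GetObject is easier to see in a simpler structure. The following is a minimal sketch, not the article's code: a lock-free stack (often called a Treiber stack) in which each operation takes a snapshot of the head, prepares the new state, and retries until Interlocked.CompareExchange confirms that no other thread changed the head in the meantime. The names LockFreeStack and CasDemo are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Threading;

// Hypothetical illustration type; not part of the AlertHandler source.
public class LockFreeStack<T>
{
    private class Node
    {
        public T Value;
        public Node Next;
    }

    private Node _head;   // top of the stack; null when the stack is empty
    private int _count;

    public void Push(T value)
    {
        Node newNode = new Node();
        newNode.Value = value;
        Node oldHead;
        do
        {
            oldHead = _head;          // snapshot the current head
            newNode.Next = oldHead;   // link the new node in front of it
            // Publish newNode only if _head is still the snapshot; otherwise retry.
        } while (Interlocked.CompareExchange(ref _head, newNode, oldHead) != oldHead);
        Interlocked.Increment(ref _count);
    }

    public bool TryPop(out T value)
    {
        Node oldHead;
        do
        {
            oldHead = _head;
            if (oldHead == null)
            {
                value = default(T);
                return false;         // stack is empty
            }
            // Advance _head only if no other thread has changed it since the snapshot.
        } while (Interlocked.CompareExchange(ref _head, oldHead.Next, oldHead) != oldHead);
        Interlocked.Decrement(ref _count);
        value = oldHead.Value;
        return true;
    }

    public int Count
    {
        get { return _count; }
    }
}

public static class CasDemo
{
    // Pushes from several threads at once, then pops everything on the
    // calling thread and returns how many items were popped.
    public static int Run(int threads, int perThread)
    {
        LockFreeStack<int> stack = new LockFreeStack<int>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++)
        {
            workers[i] = new Thread(delegate()
            {
                for (int j = 0; j < perThread; j++)
                {
                    stack.Push(j);
                }
            });
            workers[i].Start();
        }
        foreach (Thread t in workers)
        {
            t.Join();
        }
        int popped = 0;
        int ignored;
        while (stack.TryPop(out ignored))
        {
            popped++;
        }
        return popped;
    }
}
```

Calling CasDemo.Run(4, 1000) should pop exactly as many items as were pushed, because a failed CompareExchange never modifies the head and simply causes a retry. This sketch allocates a fresh node on every push and never recycles nodes, so it sidesteps the node-reuse concerns that a pooled design such as ObjectPool has to take into account.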

Lock Free Queue

Using the same lock-free algorithm as in the Object Pool, we implement the Queue. The Queue is built on .NET generics because other parts of our application need queues that carry different message structures, and a generic class lets us reuse the same Queue for all of them. Note that once a node is dequeued through the head pointer, it is returned to the Pool for future reuse.

The rest of this implementation is the same as that given in Managed I/O Completion Ports (IOCP) - Part 2 by P.Adityanand.

using System;
using System.Threading;

namespace AlertHandler
{
/// <summary>
/// Factory class to create new instances of the Node type
/// </summary>
    class NodePoolFactory<T> : PoolableObjectFactory<T>
    {
/// <summary>
/// Creates a new instance of poolable Node type
/// </summary>
/// <returns>New poolable Node object</returns>
        public override PoolableObject<T> CreatePoolableObject()
        {
            return new Node<T>();
        }
    }

/// <summary>
/// Internal class used by all other data structures
/// </summary>
    class Node<T> : PoolableObject<T>
    {
        public Node()
        {
            Init(default(T));
        }
        public Node(T data)
        {
            Init(data);
        }
        public override void Initialize()
        {
            Init(default(T));
        }
        private void Init(T data)
        {
            Data = data;
            NextNode = null;
        }
        public T Data;
        public Node<T> NextNode;
    }

/// <summary>
/// Lock Free Queue
/// </summary>
	public class Queue<T>
	{
/// <summary>
/// Creates a new instance of Lock-Free Queue
/// </summary>
		public Queue()
		{
			Init(0);
		}

/// <summary>
/// Creates a new instance of Lock-Free Queue with n-number of
/// pre-created nodes to hold objects queued on to this instance.
/// </summary>
		public Queue(int nodeCount)
		{
			Init(nodeCount);
		}

		public void Enqueue(T data)
		{
			Node<T> tempTail = null;
			Node<T> tempTailNext = null;
			Node<T> newNode = _nodePool.GetObject() 
				as Node<T>; //new Node(data);
			newNode.Data = data;
			do
			{
				tempTail = _tail;
				tempTailNext = tempTail.NextNode as Node<T>;
				if (tempTail == _tail)
				{
					if (tempTailNext == null)
					{
// If the tail node we are referring to is really the last
// node in the queue (i.e. its next node is null), then
// try to point its next node to our new node
// CAS will compare the object reference value pointed to by the first
// parameter, with that of the comparand, which is the third parameter,
// and will make the first variable point to the object specified in
// the second parameter.
                        if (Interlocked.CompareExchange<Node<T>>(
                                ref tempTail.NextNode, newNode, tempTailNext)
                            == tempTailNext)
							break;
					}
					else
					{
// This condition occurs when we have failed to update
// the tail's next node. And the next time we try to update
// the next node, the next node is pointing to a new node
// updated by other thread. But the other thread has not yet
// re-pointed the tail to its new node.
// So we try to re-point to the tail node to the next node of the
// current tail
//
                        Interlocked.CompareExchange<Node<T>>
				(ref _tail, tempTailNext, tempTail);
					}
				}
			} while (true);

// If we were able to successfully change the next node of the current
// tail node to point to our new node, then re-point the tail node also
// to our new node
//
            Interlocked.CompareExchange<Node<T>>(ref _tail, newNode, tempTail);
			Interlocked.Increment(ref _count);
		}

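		public T Dequeue(ref bool empty)
		{
// Reset the flag so that a stale 'true' left over from an earlier call
// cannot be mistaken for the result of this call.
            empty = false;
            T data = default(T);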
            Node<T> tempTail = null;
            Node<T> tempHead = null;
            Node<T> tempHeadNext = null;
			do
			{
				tempHead = _head;
                tempTail = _tail;
                tempHeadNext = tempHead.NextNode;
				if (tempHead == _head)
				{
					// There may not be any 
					// elements in the queue
					//
					if (tempHead == tempTail)
					{
                        if (tempHeadNext == null)
						{
// If the queue is really empty come out of dequeue operation
							empty = true;
                            return default(T);
						}
						else
						{
// Some other thread could be in the middle of the
// enqueue operation. it could have changed the next node of the tail
// to point to the new node.
// So let us advance the tail node to point to the next node of the
// current tail
                            Interlocked.CompareExchange<Node<T>>
				(ref _tail, tempHeadNext, tempTail);
						}
					}
					else
					{
// Move head one element down.
// If succeeded Try to get the data from head and
// break out of the loop.
//
                        if (Interlocked.CompareExchange<Node<T>>
				(ref _head, tempHeadNext, tempHead) == tempHead)
                        {
                            data = tempHeadNext.Data;
                            break;
                        }
					}
				}
			} while (true);
			Interlocked.Decrement(ref _count);
            tempHead.Data = default(T);
			_nodePool.AddToPool(tempHead);
			return data;
		}

		public void Clear()
		{
            Init(_count);
		}

        //public void Clear(int nodeCount)
        //{
        //    Init(nodeCount);
        //}

		public long Count
		{
			get
			{
				return Interlocked.Read(ref _count);
			}
		}

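		private void Init(long nodeCount)
		{
			_count = 0;
            if (_nodePool != null)
            {
                _nodePool.Clear(nodeCount);
            }
            else
            {
                _nodePool = new ObjectPool<T>(new NodePoolFactory<T>(), true, nodeCount);
            }
// Always restart from a fresh dummy node so that Clear() really empties
// the queue. Clear() must not be called while other threads are
// enqueuing or dequeuing.
            _head = _tail = _nodePool.GetObject() as Node<T>;
        }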

        #region Declarations

        private Node<T> _head;
        private Node<T> _tail;
		private long _count = 0;
		private ObjectPool<T> _nodePool = null;

        #endregion
    }
}
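
A queue like this is typically drained by a single dedicated dispatcher thread that sleeps until a producer signals new work. The following is a minimal sketch of that arrangement under stated assumptions: the Dispatcher class and its members are hypothetical, and ConcurrentQueue<T> (available from .NET 4) stands in for the article's Queue<T> so that the example is self-contained. The point to notice is the ordering: the producer enqueues before calling Set, and the dispatcher calls Reset before draining, so a signal that arrives during the drain is not lost.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration type; ConcurrentQueue<T> is a stand-in for
// the article's lock-free Queue<T>.
public class Dispatcher
{
    private readonly ConcurrentQueue<string> _queue = new ConcurrentQueue<string>();
    private readonly ManualResetEvent _signal = new ManualResetEvent(false);
    // Written only by the dispatcher thread; read by the caller after Stop().
    private readonly List<string> _handled = new List<string>();
    private readonly Thread _thread;
    private volatile bool _stopping;

    public Dispatcher()
    {
        _thread = new Thread(Run);
        _thread.IsBackground = true;
        _thread.Start();
    }

    // Called by producers (publishers) from any thread.
    public void Publish(string alert)
    {
        _queue.Enqueue(alert);   // enqueue first...
        _signal.Set();           // ...then wake the dispatcher
    }

    // Asks the dispatcher to finish the pending alerts and exit.
    // Publish must not be called after Stop.
    public List<string> Stop()
    {
        _stopping = true;
        _signal.Set();
        _thread.Join();
        return _handled;
    }

    private void Run()
    {
        while (true)
        {
            _signal.WaitOne();
            // Reset BEFORE draining: an alert published during the drain
            // sets the event again, so the loop runs one more time.
            _signal.Reset();
            string alert;
            while (_queue.TryDequeue(out alert))
            {
                _handled.Add(alert);   // a real handler would notify subscribers here
            }
            if (_stopping && _queue.IsEmpty)
            {
                return;
            }
        }
    }
}
```

With a single producer, alerts are handled in the order in which they were published, for example after calling Publish a hundred times and then Stop, the returned list holds all hundred alerts in order.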

Profiling and Getting an Inside View

Testing and debugging a multi-threaded application is an important job. As mentioned earlier, we should not believe anything until we have checked it. In this section, we take a brief look at how to profile a multi-threaded application to get better insight into how it functions.

In a previous article, I used the Fiddler tool from Microsoft to understand what flows over the wire in an ASP.NET application. Here I will use the CLR Profiler tool to look at what is going on in the multi-threaded application. Its usage is explained in “CLRProfiler.doc”, available from the download site. The document How to use CLR Profiler is also a good way to get a quick grasp of the tool. Using this tool, we can answer questions such as: what are the heap allocation and deallocation statistics, how long does the GC take to free allocated heap memory, and who calls whom across the application.

The following screenshots were taken for AlertHandler on a laptop with an Intel Pentium Dual CPU T2390 @ 1.86 GHz, 1.99 GB of RAM and .NET Framework 2.0 installed.

FIGURE 4 – Summary on Memory Allocation And Garbage Collection.

FIGURE 5 – Garbage Collection Time Line.

FIGURE 6 – Heap Allocation Graph with specific Caller and Callees Selected.

FIGURE 7 – Histogram by Age.

FIGURE 8 – Heap Allocation graph pruned to a specific Selected caller and callee.

FIGURE 9 – Handle Allocation graph pruned to a specific Selected caller and callee.

FIGURE 10 – Objects by Addresses.

Conclusion

Protecting shared memory from corruption with synchronizing locks always comes at a price. We need to be extremely careful to avoid deadlocks as well as corruption caused by missing synchronization. On multi-core CPUs, contention on these locks degrades overall application performance. Scalable locking strategies are hard to design because of problems such as deadlocks and contention.

In this article, I have used a proven lock-free algorithm for the Queue and used locks only for short tasks such as setting or resetting a ManualResetEvent object. Keir Fraser and Tim Harris, in their paper Concurrent Programming Without Locks, describe how to achieve high performance using lock-free algorithms. The paper is well worth a look.

Additionally, this article shows various design patterns in use, which reduce coupling among software components and produce testable, maintainable and therefore reusable code. We all know that code reuse increases productivity as well as code quality. However, reuse is not free: it requires the kind of preparation explained in this article. Often, time constraints make it impossible to look at this aspect during project execution. The best time to do so is at the end of a project, when we collect the quality and productivity metrics of the project just concluded. If, after a few iterations, we come away with a few reusable modules at the end of a project, that is a bonus for the whole team and adds quality for future clients.

Finally, I have used a free profiling tool on a multi-threaded application and shown screenshots taken with various options of this tool while running the sample application AlertHandler. These views help us understand how the application runs, how objects are allocated and garbage collected, whether there are any memory leaks, and how we can improve performance.

As in any software project, this design is not the only way to solve this particular problem. In particular, .NET events and delegates are an alternative to the Publish/Subscribe design pattern used here. However, I wanted to use a single dedicated dispatcher thread, as this approach lets us inspect the Queue for testing and debugging.
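
For comparison, the events-and-delegates alternative mentioned above could look roughly like the following. This is a minimal sketch with hypothetical names (AlertEventArgs, AlertSource, EventDemo), not code from the AlertHandler module: subscribers attach handlers to a .NET event, and the publisher invokes them directly.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration types; not part of the AlertHandler source.
public class AlertEventArgs : EventArgs
{
    public readonly string Message;

    public AlertEventArgs(string message)
    {
        Message = message;
    }
}

public class AlertSource
{
    // Subscribers attach and detach handlers with += and -=.
    public event EventHandler<AlertEventArgs> AlertRaised;

    public void Raise(string message)
    {
        // Copy the delegate first so that a concurrent unsubscribe cannot
        // turn it into null between the check and the call.
        EventHandler<AlertEventArgs> handler = AlertRaised;
        if (handler != null)
        {
            handler(this, new AlertEventArgs(message));
        }
    }
}

public static class EventDemo
{
    // Raises two alerts and returns what each subscriber recorded.
    public static List<string> Run()
    {
        List<string> log = new List<string>();
        AlertSource source = new AlertSource();

        EventHandler<AlertEventArgs> emailer = delegate(object sender, AlertEventArgs e)
        {
            log.Add("email: " + e.Message);
        };
        EventHandler<AlertEventArgs> logger = delegate(object sender, AlertEventArgs e)
        {
            log.Add("log: " + e.Message);
        };

        source.AlertRaised += emailer;
        source.AlertRaised += logger;
        source.Raise("disk full");       // both subscribers are notified

        source.AlertRaised -= emailer;
        source.Raise("fan failure");     // only the logger is notified
        return log;
    }
}
```

In this sketch the handlers run synchronously on the thread that calls Raise, and there is no queue of pending alerts to examine. That is the trade-off against the dispatcher-thread design, where pending alerts sit in the Queue and can be inspected while testing and debugging.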

I would appreciate your feedback and comments. Happy coding and experimenting!

References

  1. A Look At What's Wrong With Objects by Marc Clifton
  2. Who cares about Domain Rules by Maruis Marais
  3. The Many Faces of Publish/Subscribe by Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, Anne-Marie Kermarrec
  4. Using the Chain of Responsibility Pattern by Jeremy D. Miller
  5. Creating Dynamic Factories in .NET Using Reflection by Romi Kovacs
  6. Managed I/O Completion Ports (IOCP) - Part 2 by P.Adityanand
  7. A Pragmatic Implementation of Non-Blocking Linked-Lists by Timothy L. Harris, University of Cambridge Computer Laboratory, Cambridge, UK
  8. Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms by Maged M. Michael and Michael L. Scott, Department of Computer Science, University of Rochester
  9. Concurrent Programming Without Locks by Keir Fraser and Tim Harris
  10. What every Dev must know about multi-threaded Apps by Vance Morrison
  11. Beginners Guide To Threading In .NET Part 1 of n by Sacha Barber
  12. The Practical Guide to Multithreading Part 1 by Ajay Vijayvargiya
  13. Programming the Thread Pool in the .NET Framework by David Carmona
  14. Multi-threaded Debugging Techniques by Shameem Akhter and Jason Roberts
  15. Logging Library for .NET by Jeffrey Varszegi
  16. CLR Profiler Tool from Microsoft
  17. How to use CLR Profiler

Acknowledgements

I must thank my ex-colleague Mr. Parashar Satpute for helping me implement the final version of the AlertHandler module. He provided various insights, explanations and implementation techniques used in this article, which I found extremely useful. Without his help, I do not think I could have completed this article.

History

  • Initial version submitted on August 17, 2010
  • Minor text revision on August 18, 2010

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

Suchi Banerjee, Pune
Software Developer (Senior) Free Lance Consulting
India
Started my career with Tata Institute of Fundamental Research as Scientific Officer and worked in Project group.
Over 20 years of work experience in C++, C#, ASP.NET, SQL etc. Currently working as a Free Lance Software Consultant, Pune, India.

Article Copyright 2010 by Suchi Banerjee, Pune