Click here to Skip to main content
Click here to Skip to main content
Go to top

Simple Example Demonstrating MSMQ Message Routing

, 8 Apr 2008
Rate this:
Please Sign up or sign in to vote.
This article demonstrates the use of writing reusable components, in this case, a message router, for forwarding messages from one Message Queue to another.

Abstract

This article demonstrates the use of writing reusable components, in this case, a message router, for forwarding messages from one Message Queue to another.

Introduction

Disconnected Enterprise Applications are difficult to design, especially when data is critical. There are several methodologies available, but not all of them fit well. Some of the possible implementations may include Web Services, Message Queuing, Enterprise Service Bus (ESB) architecture, and Custom Synchronization Strategy.

While working on one of our projects, we needed to find the optimum solution for a Distributed Architecture based application. The project required information exchange over several offices spread across multiple geographical areas. So, in this article, I am explaining the methodology we adopted to make it simpler and get the work done.

Problem

The setup involved multiple integration points with various offices, franchisees, third party independent contractors, and their customers, who all needed to access data at various levels of security. So, we needed a reliable mechanism for sharing the details across geographical limits.

One of the well known approaches to integrate applications with a disconnected nature is to use a messaging architecture. That's exactly what we chose. However, using such an architecture has its own pros and cons. Well, that is a subject of another article.

Anyway, for our application, we wanted to provide reliable messaging as well as work in a highly disconnected infrastructure. We found message queues to provide us the answer; however, we still needed to solve a few annoyances like how we would forward a message from one queue to another? What if we wanted to forward based on the content of the message?

So, we decided, hey why not hack our way to a quick utility that does this for us? Further, why not make it configurable and threaded?

So, essentially, we wanted to do the following:

  1. Route messages from one Message Queue to another Message Queue.
  2. Route messages from one Message Queue to multiple Message Queues.
  3. Routing on the basis of text inside the messages.

A first utility could send all the messages from one public queue to another public queue directly, a second utility could send messages from a Message Queue to multiple Message Queues, and a third one, a Content Based Router, could distribute the messages from one queue to another on the basis of what is inside the messages.

We also wanted to check the activities of the application, so we needed to log its working. We also wanted to make the code adaptable. We wanted to be able to change the path of the message queues from outside the application using Extensible Markup Language (XML) configuration files. A quick hack was to use the in-built serialization of the .NET framework.

Oh, did I forget to mention we wanted this to run on our new quad core server? To achieve parallelism and use the benefits of threaded applications, we needed to make our application threaded as well as thread-safe so that messages could be transferred from multiple queues simultaneously without interfering with other queues in processing.

Now, you may ask, why go through so much trouble when a simple code can do the trick. Well, we also needed to monitor the message queue activities to know what's going on. The operations performed also needed to be shown on a Graphical User Interface (GUI) which will be discussed in an upcoming article. The implementation of these form the basis for employing a monitoring utility that we will write on top of this in a following article.

Solution

First of all, we needed a configuration class to hold the values that we could pass to the forwarder. Here's a simple design:

class diagram of MessageForwarderConfiguration

We started creating the Forwarder utility which simply forwards the messages from one message queue to another without looking inside the messages. For this, we created a class that received the path of the message queues from an XML serialized file in the constructor.

StreamReader strRd = new StreamReader(xmlfile);
MessageForwarderConfiguration queue =(MessageForwarderConfiguration)
           MessageForwarderConfiguration.Serializer.Deserialize(strRd);
MessageForwardingWorker fwdWorker = new MessageForwardingWorker(queue);
fwdWorker.RunWorkerAsync();

All the processing is done on a separate thread so that the application can manage the interaction between multiple queues at the same time. To achieve this, we inherited from the BackgroundWorker class (System.ComponentModel.BackgroungWorker) so that we could perform our operations on multiple threads.

using System.ComponentModel;

public class MessageForwardingWorker : BackgroundWorker
{
    ....
}

Then, the OnDoWork() method of the BackgroungWorker class is overloaded in the MessageForwardingWorker class to create the respective objects of the MessageQueue class - one for receiving the messages and another for sending the messages.

// constructor 
public MessageForwardingWorker(MessageForwarderConfiguration objForwardQueue)
{
   queueInfo_ = objForwardQueue;
}

// overloaded method of BackgroundWorker
protected override void OnDoWork(System.ComponentModel.DoWorkEventArgs e)
{
   while (!this.CancellationPending)
   {
      MessageQueue srcqueue = new MessageQueue(queueInfo_.SourceQueue);
      MessageQueue destqueue = new MessageQueue(queueInfo_.DestinationQueue);
      ...
   }
}

Then, we determined the existence of the source and the destination queues along with their read, write access.

if (MessageQueue.Exists(queueInfo_.SourceQueue))
{
    if (MessageQueue.Exists(queueInfo_.DestinationQueue))
    {
        if (srcqueue.CanRead)
        {
          if (destqueue.CanWrite)
           {
               // do the actual work here
               ....
          }
        }
    }
}

We start receiving the messages from the source queue dynamically, using the GetEnumerator2() method which provides a dynamic link to the queue which we are referencing. After picking the message from the source queue, the message is sent to the destination queue.

MessageEnumerator msgEnum = srcqueue.GetMessageEnumerator2();
while (msgEnum.MoveNext())
{
    // iterating on each message
     msgToSend = msgEnum.Current;
    id = msgToSend.Id;
    destqueue.Send(msgToSend);
    // removing the message after sending to destination queue
    srcqueue.ReceiveById(id);
}

Note:- To get Quality Of Service (QOS), we receive the message, and only after sending the message to the destination queue, do we remove the message from the source queue. This provides us the guarantee that the message is successfully transferred, else if we remove the message from the queue before actually sending it to the destination queue, then it could result in Consistency problems.

We also wanted to make sure that transactional destination queues were handled correctly.

while (msgEnum.MoveNext())
{
  // iterating on messages and using transactions
  msgToSend = msgEnum.Current;
  id = msgToSend.Id;
  if (isTransactional) destqueue.Send(msgToSend, mqtr);
  else destqueue.Send(msgToSend);
  srcqueue.ReceiveById(id);
}
if (isTransactional) mqtr.Commit();

Further, how will we know what's going on? Well, we logged all operations in a text file using LOG4NET – an Apache project.

private log4net.ILog log = log4net.LogManager.GetLogger(
        System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

public void doLog(string addToLog)
{
  ...
  log.Info(addToLog);
}

When writing threaded applications, one thing must be kept in mind. Make the threads efficient, not hogging the system completely. In our implementation, when a forwarder transfers all of its messages, it will wait for a predefined time interval specified in the XML configuration file, and when the thread comes out of the wait period, it again starts performing the same operations.

Thread.Sleep(queueInfo_.SleepTime);

The system can be enhanced to allow forwarding to multiple destinations by simply implementing a list of destination queues or an aggregator that combines messages from multiple source queues to a single destination.

Enhancing Message Forwarder to Content Based Message Router

The second utility is just a refinement of this Message Forwarder utility which sends a message to more than one destination queue by looking inside the message. Technically, we can implement the Router using just this utility.

The next step is to implement an upgraded version of the Forwarder utility facilitating content based message routing. We wanted our application to be able to distribute the messages among various queues on the basis of the content inside the message.

A reliable and efficient method is needed for matching the messages inside the Source Queues so that they can be sent to the appropriate destination queue. We used the label property associated with each message to identify the messages when they are actually stored in Source Queues. Thus, we only have to match the label of each message and not the whole message body and without descrambling the message. For matching labels, we prefer to use Regular Expressions (also known as RegEx) due to their powerful matching capabilities.

class diagram of MessageRouterConfiguration

We modified the configuration class for our needs as follows:

using System.ComponentModel;

public class MessageRoutingWorker : BackgroundWorker
{
    MessageRouterConfiguration queueInfo_;
    Dictionary<Regex, List<MessageQueue>> dictRegexMsgQList;
    MessageQueue srcQueue;
    ...
}

To achieve parallelism in the application, messages would be read from more than one source queue so it will also be threaded as well as thread safe, where each thread is performing independently.

For matching the label of the messages, we created a dictionary of Regex and a list of Message Queues where for each matched regex, there is a list of Message Queues on which the message needs to be sent.

Dictionary<Regex, List<MessageQueue>> dictRegexMsgQList;

As before, we initialize our class with the XML configuration, and initialize our dictionary with the matching pattern and the path of the source and destination queues corresponding to it.

// Constructor
public MessageRoutingWorker(MessageRouterConfiguration objRouteQueue)
{
  // Set the internal configuration object
  queueInfo_ = objRouteQueue;
  // Create the source queue
  srcQueue = new MessageQueue(queueInfo_.SourceQueue);
  // Now call the helper to create destination and the relevant regex
  doDictionaryInitializer();
}

private void doDictionaryInitializer()
{
  dictRegexMsgQList = new Dictionary<Regex, List<MessageQueue>>();
  // We create a simple pair structure for holding two values.
  foreach (Pair pair in queueInfo_.DestQueueList)
  {
     // Create an expression we want to match and associated destination
     // queues in a list.
     Regex regexpr = new Regex(pair.MatchExpression);
     List<MessageQueue> listMQ = new List<MessageQueue>();
     foreach (string str in pair.DestQueueList)
     {
       listMQ.Add(new MessageQueue(str));
     }
     // Now add these to our internal dictionary object
     dictRegexMsgQList.Add(regexpr, listMQ);
  }
}

At this point, we have the following members in each thread:

  1. Path of the Source Queue
  2. Sets of Match Expression and Destination Queues where the message needs to be sent

The constructor initializes all the message queues and regular expressions as defined in the configuration. At this point, we fire the threads to run and do their work.

StreamReader strRd = new StreamReader(xmlfile);
MessageRouterConfiguration queue = (MessageRouterConfiguration)
   MessageRouterConfiguration.Serializer.Deserialize(strRd);
MessageRoutingWorker routingWorker = new MessageRoutingWorker(queue);
fwdWorker.RunWorkerAsync();

In each thread, the existence of Source Queues along with their read permissions are checked, and their status is logged in a text file using LOG4NET – an Apache project.

// preparing the dictionary
private void doDictionaryInitializer()
{
  dictRegexMsgQList = new Dictionary<Regex, List<MessageQueue>>();
  // We create a simple pair structure for holding two values.
  foreach (Pair pair in queueInfo_.DestQueueList)
  {
    Regex regexpr = new Regex(pair.MatchExpression);
    List<MessageQueue> listMQ = new List<MessageQueue>();
    foreach (string str in pair.DestQueueList)
    {
      listMQ.Add(new MessageQueue(str));
    }
    dictRegexMsgQList.Add(regexpr, listMQ);
  }
}

To actually route the messages, we create an instance of the MessageEnumerator class using the GetMessageEnumerator2() method to enumerate on the messages in the queue. Now, traverse over each message and match each Regex available in the dictionary with the label of the message. When a match is found, send the message to the corresponding list of Message Queues. Before actually sending the message to specified Message Queues, the availability and write permission on these Queues are checked and logged in a text file.

MessageEnumerator msgEnum = srcQueue.GetMessageEnumerator2();
Message msgToSend = null;

while (msgEnum.MoveNext())
{
// iterating on messages
msgToSend = msgEnum.Current;
string id = msgToSend.Id;
foreach (Regex regex in dictRegexMsgQList.Keys)
{
 // matching the regex
 if (regex.IsMatch(msgToSend.Label))
 {
  List<MessageQueue> destinationList = dictRegexMsgQList[regex];
  if (doCheckAndSend(msgToSend, destinationList))
   // if message is sent then remove it from queue
   srcQueue.ReceiveById(id);
  }
 }
}

The log is simply implemented as a helper function as follows:

private log4net.ILog log =log4net.LogManager.GetLogger(
        System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
public void doLog(string addToLog)
{
  ...
  log.Info(addToLog);
}

The method doCheckAndSend checks the existence and write permissions of the Destination Queue, and if all is well, then sends the message (present in msgToSend) to the list of Destination Queues.

private bool doCheckAndSend(Message msg, List<MessageQueue> listDestQueue)
{
  bool sendSucceded = false;
  // checking properties of destination queues
  foreach (MessageQueue destQueue in listDestQueue)
  {
    // logging the activities of application
    doLog("Checking Existence Of Destination Queue [" + destQueue.Path + "]");
    if (MessageQueue.Exists(destQueue.Path))
      if (destQueue.CanWrite)
        ...
      else ...
    else ...
  }
   
  foreach (MessageQueue destQueue in listDestQueue)
  {
    bool queueTransactional = destQueue.Transactional;
    MessageQueueTransaction mqTr = new MessageQueueTransaction();

    if (queueTransactional)  ...
    else ...

    if (queueTransactional) mqTr.Begin();

    if (queueTransactional) destQueue.Send(msg, mqTr);
    else destQueue.Send(msg);

    if (queueTransactional) mqTr.Commit();
    sendSucceded = true;
  }
  return sendSucceded;
}

Note: To get Quality Of Service (QOS), we receive the message, and only after sending the message to the destination queue do we remove the message from the source queue. This provides us the guarantee that a message is successfully transferred, else if we would have removed the message from the queue before actually sending it to the destination queue, then it could have resulted in Consistency problems.

When a thread has transferred all of its messages, then it will wait for a predefined time specified in the XML configuration file. And, when the thread comes out of the wait period, it again starts performing routing operations.

Thread.Sleep(queueInfo_.SleepTime);

Conclusion

With theses utilities, we now have the ability to transfer the messages from any queue (source) to any queue (local or remote destination). We can aggregate as well as distribute messages based on their contents. We have the ability to deploy any number of message queues in the current as well as future scenarios. This also provides us with the flexibility to re-route messages in the event of server issues, or if we need to do load balancing for high volume queues.

Most importantly, we can now stay connected in a disconnected architecture.

License

The article above is free to use. I have not attached any files as the code is from a large project which is proprietary. The code above has been published with permission from Aquevix Solutions who owns the copyright to the code.

License

This article, along with any associated source code and files, is licensed under The Creative Commons Attribution-ShareAlike 2.5 License

Share

About the Author

Gupta_Prateek
Software Developer Aquevix Solutions
India India
No Biography provided

Comments and Discussions

 
QuestionWHERE THE FFFF IS THE CODE????!! PinmemberKalev Rebane24-May-13 18:35 
GeneralMessage transfer not guaranteed PinmemberCodeCatcher29-Aug-08 3:10 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web02 | 2.8.140921.1 | Last Updated 8 Apr 2008
Article Copyright 2008 by Gupta_Prateek
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid