Click here to Skip to main content
Click here to Skip to main content

Microsoft Message Queuing – A Simple Multithreaded Client and Server

, 15 Mar 2011
Rate this:
Please Sign up or sign in to vote.
A simple Message Queuing client and server - illustrating how to create a solution capable of handling 500 000 messages in three minutes

Introduction

This article shows how to create a very simple client-server solution using Microsoft Message Queuing – seems like there is a need for something very simple, just the basic stuff to get you up and running with a moderately performing solution.

This solution can easily handle 500 000 moderately complex messages in three minutes.

Microsoft Message Queuing – Log Trade information using Microsoft SQL Server takes this solution a few steps further - by illustrating an approach for saving incoming data to Microsoft SQL Server.

Screenshot of the Microsoft Message Queuing client

client.png

Background

Over in the Q&A section, a question was posted requesting help with a solution that needed to be capable of handling 1500 messages each minute. 1500 messages shouldn’t be a problem – but a simple answer on how to achieve this is still somewhat more complex than what I’d like to post as a reply to a question. I searched a bit around CodeProject, and to my surprise I didn’t find an existing article illustrating the approach I had in mind.

Screenshot of the Microsoft Message Queuing server

Server.png

Usually, the server part would be implemented as a Windows service.

A Brief Look at Some of the Code

Objects of the Payload class is what we are going to send from the client to the server. The class is simple, but not overly so.

[Serializable]
public class Payload
{
 private Guid id;
 private string text;
 private DateTime timeStamp;
 private byte[] buffer;

 public Payload()
 {
 }
 public Payload(string text, byte bufferFillValue, int bufferSize )
 {
  id = Guid.NewGuid();
  this.text = text;
  timeStamp = DateTime.UtcNow;
  buffer = new byte[bufferSize];
  for (int i = 0; i < bufferSize; i++)
  {
   buffer[i] = bufferFillValue;
  }
 }
 // The rest is just properties exposing the fields
}

Calling the UI Thread

Most of the interesting stuff happens in worker threads. To interact with components in the UI thread, we use the familiar InvokeRequired/Invoke pattern.

private delegate void LogMessageDelegate(string text);
private void LogMessage(string text)
{
 if (InvokeRequired)
 {
  Invoke(new LogMessageDelegate(LogMessage), text);
 }
 else
 {
  // Don't do this in a deployment scenario - use something like
  // Log4Net. This is here just to illustrate interaction with 
  // the UI thread.
  messageTextBox.AppendText(text + Environment.NewLine);
 }
}

Initializing the Message Queue on the Server

In this example, we use the BinaryMessageFormatter, and rely on the .NET serialization mechanisms – you’ll get a significant speed improvement by rolling your own serialization/deserialization code working directly on byte arrays.

On the server, I like to use asynchronous handling of incoming messages. We assign an event handling method to the ReceiveCompleted event, and call BeginReceive() to tell the message queue component to start processing messages and call our OnReceiveCompleted event handler.

private void InitializeQueue()
{
 receivedCounter = 0;
 string queuePath = Constants.QueueName;
 if (!MessageQueue.Exists(queuePath))
 {
  messageQueue = MessageQueue.Create(queuePath);
 }
 else
 {
  messageQueue = new MessageQueue(queuePath);
 }
 isRunning = true;
 messageQueue.Formatter = new BinaryMessageFormatter();
 messageQueue.ReceiveCompleted += OnReceiveCompleted;
 messageQueue.BeginReceive();
}

Receiving Messages

Below, you’ll see our OnReceiveCompleted method that handles incoming messages.

It’s fairly simple, and doesn’t do anything useful with the incoming messages except deserialization, and logging every 10 000th message to the UI thread. The really important point is to call BeginReceive() to tell the message queue component that we are ready to receive the next message; if we forget this - our server will only handle a single message, not exactly what we had in mind.

private void OnReceiveCompleted(Object source, 
          ReceiveCompletedEventArgs asyncResult)
{
 try
 {
  MessageQueue mq = (MessageQueue)source;

  if (mq != null)
  {
   try
   {
    System.Messaging.Message message = null;
    try
    {
     message = mq.EndReceive(asyncResult.AsyncResult);
    }
    catch (Exception ex)
    {
     LogMessage(ex.Message);
    }
    if (message != null)
    {
     Payload payload = message.Body as Payload;
     if (payload != null)
     {
      receivedCounter++;
      if ((receivedCounter % 10000) == 0)
      {
       string messageText = 
           string.Format("Received {0} messages", receivedCounter);
       LogMessage(messageText);
      }
     }
    }
   }
   finally
   {
    if (isRunning)
    {
     mq.BeginReceive();
    }
   }
  }
 return;
 }
 catch (Exception exc)
 {
  LogMessage(exc.Message);
 }
}

Initializing the Message Queue on the Client

Our client side message queue initialization is even simpler than the server side initialization. Just remember to use the same formatter on both ends – in this case, BinaryMessageFormatter.

private void InitializeQueue()
{
 string queuePath = Constants.QueueName;
 if (!MessageQueue.Exists(queuePath))
 {
  messageQueue = MessageQueue.Create(queuePath);
 }
 else
 {
  messageQueue = new MessageQueue(queuePath);
 }
 messageQueue.Formatter = new BinaryMessageFormatter();
}

Sending Messages

If you want to use multiple threads for sending messages, you need to initialize multiple MessageQueue components. In this example, we use a single MessageQueue component, and disable and enable the “Send Messages” button to keep the user from queuing messages from several worker threads at the same time. EnableSend() is called from the worker thread to enable the “Send Messages” button when the required number of messages has been sent.

private delegate void EnableSendDelegate();
private void EnableSend()
{
 if (InvokeRequired)
 {
  Invoke(new EnableSendDelegate(EnableSend));
 }
 else
 {
  sendButton.Enabled = true;
 }
}

Here is what we do to actually send a message, if you remove the code for measuring the performance of the function - there really isn't much left. As long as the object is serializable by .NET using the assigned formatter, all we need to do is to call Send passing an instance of our payload object. It doesn't get much simpler than this. Smile | :)

private void SendMessages(int count)
{
 Random random = new Random(count);
 string message = string.Format("Sending {0} messages",count);
 LogMessage(message);
 DateTime start = DateTime.Now;
 for (int i = 0; i < count; i++)
 {
  byte b = Convert.ToByte( random.Next(128) );
  int size = random.Next(1024);
  string text = string.Format("Message: Fill {0} {1}",b,size);
  Payload payload = new Payload(text, b, size);
  messageQueue.Send(payload);
 }
 DateTime end = DateTime.Now;
 TimeSpan ts = end - start;
 message = string.Format("{0} messages sent in {1}", count, ts);
 LogMessage(message);
}

Maintain a Responsive UI

Having our UI freeze up while we are dispatching the required number of messages makes users unhappy. By leveraging System.Threading.ThreadPool.QueueUserWorkItem - we keep our UI responsive in a simple and straightforward manner – again without exactly exerting ourselves. Smile | :)

private void AsychSendMessages(object countAsObject)
{
 int count = (int)countAsObject;
 SendMessages(count);
 EnableSend();
}

private void sendButton_Click(object sender, EventArgs e)
{
 sendButton.Enabled = false;
 int count = Convert.ToInt32( messageCountNumericUpDown.Value );
 System.Threading.ThreadPool.QueueUserWorkItem(
     new System.Threading.WaitCallback(AsychSendMessages), count);
}

Conclusion

Client/server development doesn’t get much easier than this – and with a few tweaks, the performance can be significantly improved. FastSerializer represents a ready to use, high performance serialization/deserialization solution.

So in the hope that it may prove useful: play around with it.

Microsoft Message Queuing is both efficient and very easy to use for .NET developers. Obviously, this article has just scratched the surface of the technology - but given its performance, I'm pretty sure some of you will find it highly satisfactory.

Best regards,
Espen Harlinn

History

  • 12th March, 2011 - Initial posting

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Espen Harlinn
Architect Powel AS
Norway Norway
Chief Architect - Powel AS.
 
Specializing in integrated operations and high performance computing solutions.
 
I’ve been fooling around with computers since the early eighties, I’ve even done work on CP/M and MP/M.
 
Wrote my first “real” program on a BBC micro model B based on a series in a magazine at that time. It was fun and I got hooked on this thing called programming ...
 
A few Highlights:
  • High performance application server development
  • Model Driven Architecture and Code generators
  • Real-Time Distributed Solutions
  • C, C++, C#, Java, TSQL, PL/SQL, Delphi, ActionScript, Perl, Rexx
  • Microsoft SQL Server, Oracle RDBMS, IBM DB2, PostGreSQL
  • AMQP, Apache qpid, RabbitMQ, Microsoft Message Queuing, IBM WebSphereMQ, Oracle TuxidoMQ
  • Oracle WebLogic, IBM WebSphere
  • Corba, COM, DCE, WCF
  • AspenTech InfoPlus.21(IP21), OsiSoft PI
 
More information about what I do for a living can be found at: harlinn.com or LinkedIn
 
You can contact me at espen.harlinn@powel.no

Comments and Discussions

 
QuestionMsmq java PinmemberAsha Arvind29-May-12 20:37 
AnswerRe: Msmq java PinmvpEspen Harlinn29-May-12 22:12 
GeneralMy vote of 5 PinmemberMika Wendelius5-Jan-12 11:54 
GeneralRe: My vote of 5 PinmvpEspen Harlinn28-Apr-12 4:26 
GeneralRe: My vote of 5 PinmvpMika Wendelius29-Apr-12 6:19 
GeneralRe: My vote of 5 PinmvpEspen Harlinn29-Apr-12 6:41 
QuestionTiny code fix PinmemberSAKryukov7-Jul-11 15:05 
Just a tiny code fix for now: public Payload() constructor has an empty body. If it's really empty — remove it, if not — mark with /*...*/.
Empty constructor body is bad, makes no sense, will be caught by FxCop…
 
(Only a private empty-body constructor may have a purpose, to suppress the use of default constructor.)
 
Thank you.
— SA

Sergey A Kryukov

GeneralMy vote of 5 Pinmvpthatraja12-Jun-11 4:56 
GeneralMy vote of 5 PinmemberMoshe Ventura2-Apr-11 20:31 
GeneralRe: My vote of 5 PinmemberEspen Harlinn3-Apr-11 12:26 
GeneralMy vote of 5 PinmemberWin32nipuh26-Mar-11 2:01 
GeneralGood article, some questions (vote=5) [modified] PinmemberWin32nipuh26-Mar-11 1:39 
GeneralRe: Good article, some questions (vote=5) PinmemberEspen Harlinn27-Mar-11 4:34 
GeneralMy vote of 5 PinmemberNuri Ismail15-Mar-11 23:55 
GeneralRe: My vote of 5 PinmemberEspen Harlinn16-Mar-11 1:23 
GeneralMy vote of 5 PinmemberKendoTM14-Mar-11 23:40 
GeneralRe: My vote of 5 PinmemberEspen Harlinn15-Mar-11 1:26 
GeneralMy vote of 5 PinmemberJF201514-Mar-11 20:25 
GeneralRe: My vote of 5 PinmemberEspen Harlinn14-Mar-11 22:26 
QuestionComparative Benchmarks? Pinmembersam.hill13-Mar-11 14:05 
AnswerRe: Comparative Benchmarks? PinmemberEspen Harlinn13-Mar-11 21:46 
GeneralRe: Comparative Benchmarks? Pinmembersam.hill14-Mar-11 16:17 
GeneralMy vote of 5 PinmemberAli Aboutalebi13-Mar-11 11:55 
GeneralRe: My vote of 5 PinmemberEspen Harlinn13-Mar-11 11:58 
GeneralThe Client project not in zip file PinmemberTassosK13-Mar-11 2:03 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.

| Advertise | Privacy | Mobile
Web04 | 2.8.140827.1 | Last Updated 15 Mar 2011
Article Copyright 2011 by Espen Harlinn
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid