Microsoft Message Queuing – Log Trade Information using Microsoft SQL Server

15 Mar 2011, CPOL, 4 min read
This article shows how to create a simple trade logging server using Microsoft Message Queuing and Microsoft SQL Server

Introduction

My previous article, Microsoft Message Queuing – A simple multithreaded client and server, shows how to easily create a simple Microsoft Message Queue based solution.

My motivation for writing that article was to show how easily it can be done, in response to a question posted over in the CodeProject Q&A section. I’ll admit that I was a bit surprised when it turned out that the poster still couldn’t get his solution to work.

Turns out that he was working on a solution for logging trades to a database, so I guess I’ll have to take another shot at describing how easily it actually can be done.

Obviously, this isn’t a fully-fledged trading system, but it’s still a take on the principles of how one can process market data using Microsoft Message Queuing and Microsoft SQL Server.

Screenshot of the server

Server2.png

Database

For this purpose, our little system only needs to handle a reasonable subset of the data a trading system usually requires, so we’re only going to log bids, offers, and actual trades.

So here is our table for storing bids:

SQL
CREATE TABLE Bid
(
  /* Each bid is identified by an id */
  Id uniqueidentifier not null primary key,
  /* When was the bid posted */
  TimeStamp datetime not null,
  /* Who posted the bid */
  BidderId uniqueidentifier not null,
  /* What are we bidding for */
  InstrumentId uniqueidentifier not null,
  /* Under what condition will the bidder accept an offer  */
  /* Usually there is a limited set of standard conditions */
  BidConditionsId uniqueidentifier not null,
  /* Unit price */
  Value float not null,
  /* Number of units */
  Volume float not null 
)
go

Our table for storing offers:

SQL
CREATE TABLE Offer
(
  /* Each offer is identified by an id */
  Id uniqueidentifier not null primary key,
  /* When was the offer posted */
  TimeStamp datetime not null,
  /* Who posted the offer */
  SellerId uniqueidentifier not null,
  /* Under what condition will the seller accept a bid */
  /* Usually there is a limited set of standard conditions */
  SaleConditionsId uniqueidentifier not null,
  /* What's being offered */
  InstrumentId uniqueidentifier not null,
  /* Unit price */
  Value float not null,
  /* Number of units */
  Volume float not null 
)
go

And finally our trade table:

SQL
CREATE TABLE Trade
(
  /* Each trade is identified by an id */
  Id uniqueidentifier not null primary key,
  /* When was the trade done */
  TimeStamp datetime not null,
  /* Who sold */
  SellerId uniqueidentifier not null,
  /* Who bought */
  BuyerId uniqueidentifier not null,
  /* Under what agreement was the trade made */
  TradeAgreementId uniqueidentifier not null,
  /* The instrument, goods, ... */
  InstrumentId uniqueidentifier not null,
  /* Unit price */
  Value float not null,
  /* Number of units */
  Volume float not null 
)
go

The tables can be created using the SQL\CreateDataTables.sql script from the Harlinn.Messaging.Server2 project - it's included with the solution source code.

Inserting Data

The following little query is one I find useful when writing code to perform operations against the database – this time it lists the column names of the Offer table.

SQL
select cols.name from sys.all_columns cols
join sys.tables t ON (t.object_id = cols.object_id)
where t.name = 'Offer'

As a certain level of performance is usually required for this kind of solution, we will work directly with SqlConnection and SqlCommand – and the script above keeps me from making too many blunders while creating the code.

C#
public class Offer
{
 public const string INSERT_STATEMENT = 
 "INSERT INTO OFFER(Id,TimeStamp,SellerId,SaleConditionsId,InstrumentId,Value,Volume)"+
 " VALUES(@id,@timeStamp,@sellerId,@saleConditionsId,@instrumentId,@value,@volume)";

 public static void Insert(SqlConnection connection, PayloadOffer offer)
 {
  SqlCommand command = connection.CreateCommand();
  using (command)
  {
   command.CommandText = INSERT_STATEMENT;
   command.Parameters.Add("@id", SqlDbType.UniqueIdentifier).Value = offer.Id;
   command.Parameters.Add("@timeStamp", SqlDbType.DateTime).Value = offer.TimeStamp;
   command.Parameters.Add("@sellerId", SqlDbType.UniqueIdentifier).Value = offer.SellerId;
   command.Parameters.Add("@instrumentId", 
	SqlDbType.UniqueIdentifier).Value = offer.InstrumentId;
   command.Parameters.Add("@saleConditionsId", 
	SqlDbType.UniqueIdentifier).Value = offer.SalesConditionsId;
   command.Parameters.Add("@value", SqlDbType.Float).Value = offer.Value;
   command.Parameters.Add("@volume", SqlDbType.Float).Value = offer.Volume;

   command.ExecuteNonQuery();
  }
 }
}

As you see, working directly with SqlConnection and SqlCommand isn’t all that bad. It may be something to think about the next time you’re tempted to take Entity Framework for a spin.

As the code is quite similar for the Bid and Trade tables, I will skip going through those.
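
For readers who want to see one of the skipped classes anyway, here is a minimal sketch of what the Bid counterpart could look like. It simply mirrors the Offer class above and the columns of the Bid table. The PayloadBid shown here is a hypothetical stand-in: its property names are assumed from the table definition and are not taken from the downloadable source.

```csharp
using System;
using System.Data;
using System.Data.SqlClient;

// Hypothetical stand-in for the article's PayloadBid class.
// The property names are assumptions based on the Bid table columns.
public class PayloadBid
{
    public Guid Id { get; set; }
    public DateTime TimeStamp { get; set; }
    public Guid BidderId { get; set; }
    public Guid InstrumentId { get; set; }
    public Guid BidConditionsId { get; set; }
    public double Value { get; set; }
    public double Volume { get; set; }
}

public static class Bid
{
    public const string INSERT_STATEMENT =
        "INSERT INTO Bid(Id,TimeStamp,BidderId,InstrumentId,BidConditionsId,Value,Volume)" +
        " VALUES(@id,@timeStamp,@bidderId,@instrumentId,@bidConditionsId,@value,@volume)";

    public static void Insert(SqlConnection connection, PayloadBid bid)
    {
        // The command is disposed as soon as the insert has been executed.
        using (SqlCommand command = connection.CreateCommand())
        {
            command.CommandText = INSERT_STATEMENT;
            command.Parameters.Add("@id", SqlDbType.UniqueIdentifier).Value = bid.Id;
            command.Parameters.Add("@timeStamp", SqlDbType.DateTime).Value = bid.TimeStamp;
            command.Parameters.Add("@bidderId", SqlDbType.UniqueIdentifier).Value = bid.BidderId;
            command.Parameters.Add("@instrumentId", SqlDbType.UniqueIdentifier).Value = bid.InstrumentId;
            command.Parameters.Add("@bidConditionsId", SqlDbType.UniqueIdentifier).Value = bid.BidConditionsId;
            command.Parameters.Add("@value", SqlDbType.Float).Value = bid.Value;
            command.Parameters.Add("@volume", SqlDbType.Float).Value = bid.Volume;

            command.ExecuteNonQuery();
        }
    }
}
```

A Trade version would presumably follow the same pattern, with SellerId, BuyerId and TradeAgreementId as the type-specific parameters.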

The Payload

The data we are going to send to the server is based on a simple class hierarchy consisting of PayloadBase, PayloadBid, PayloadOffer and PayloadTrade. PayloadBid, PayloadOffer and PayloadTrade are derived from PayloadBase.

C#
[Serializable]
public abstract class PayloadBase
{
 private Guid id;
 private DateTime timeStamp;

 public PayloadBase()
 { 
 }

 public abstract PayloadType PayloadType
 {
  get;
 }
 // code removed 
}

PayloadType is used to discriminate between PayloadTrade, PayloadBid and PayloadOffer – while we could have used the is operator I tend to feel that using a discriminator makes the code more readable.
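
Since the listing above leaves out the enum and the derived classes, the following sketch shows one way the discriminator could be wired up. The enum member order (Bid, Offer, Trade) and the use of auto-properties are assumptions made for illustration; only the class names and the abstract PayloadType property come from the article.

```csharp
using System;

// Assumed member order: Bid = 0, Offer = 1, Trade = 2.
public enum PayloadType { Bid, Offer, Trade }

[Serializable]
public abstract class PayloadBase
{
    public Guid Id { get; set; }
    public DateTime TimeStamp { get; set; }

    // Each derived class reports its own kind.
    public abstract PayloadType PayloadType { get; }
}

[Serializable]
public class PayloadBid : PayloadBase
{
    public override PayloadType PayloadType { get { return PayloadType.Bid; } }
}

[Serializable]
public class PayloadOffer : PayloadBase
{
    public override PayloadType PayloadType { get { return PayloadType.Offer; } }
}

[Serializable]
public class PayloadTrade : PayloadBase
{
    public override PayloadType PayloadType { get { return PayloadType.Trade; } }
}

public static class Program
{
    // Dispatches on the discriminator instead of using the is operator.
    public static string Describe(PayloadBase payload)
    {
        switch (payload.PayloadType)
        {
            case PayloadType.Bid: return "bid";
            case PayloadType.Offer: return "offer";
            case PayloadType.Trade: return "trade";
            default: return "unknown";
        }
    }

    public static void Main()
    {
        PayloadBase payload = new PayloadOffer { Id = Guid.NewGuid(), TimeStamp = DateTime.Now };
        Console.WriteLine(Describe(payload)); // prints "offer"
    }
}
```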

Processing the Messages

Just like in the previous article, we will process incoming messages asynchronously, one at a time, using the OnReceiveCompleted method:

C#
private void OnReceiveCompleted(Object source, ReceiveCompletedEventArgs asyncResult)
{
 try
 {
  MessageQueue mq = (MessageQueue)source;

  if (mq != null)
  {
   try
   {
    System.Messaging.Message message = null;
    try
    {
     message = mq.EndReceive(asyncResult.AsyncResult);
    }
    catch (Exception ex)
    {
     LogMessage(ex.Message);
    }
    if (message != null)
    {
     PayloadBase payload = message.Body as PayloadBase;
     if (payload != null)
     {
      if (receivedCounter == 0)
      {
       firstMessageReceived = DateTime.Now;
      }
      receivedCounter++;
      if ((receivedCounter % 10000) == 0)
      {
       TimeSpan ts = DateTime.Now - firstMessageReceived;
       string messageText = string.Format
        ("Received {0} messages in {1}", receivedCounter, ts);
       LogMessage(messageText);
      }

      try
      {
       switch (payload.PayloadType)
       {
        case PayloadType.Bid:
         {
          PayloadBid bid = (PayloadBid)payload;
          DB.Bid.Insert(sqlConnection, bid);
         }
         break;
        case PayloadType.Offer:
         {
          PayloadOffer offer = (PayloadOffer)payload;
          DB.Offer.Insert(sqlConnection, offer);
         }
         break;
        case PayloadType.Trade:
         {
          PayloadTrade trade = (PayloadTrade)payload;
          DB.Trade.Insert(sqlConnection, trade);
         }
         break;
       }
      }
      catch (Exception e)
      {
       if (isRunning)
       {
        LogMessage(e.Message);
       }
      }
     }
    }
   }
   finally
   {
    if (isRunning)
    {
     mq.BeginReceive();
    }
   }
  }
  return;
 }
 catch (Exception exc)
 {
  if (isRunning)
  {
   LogMessage(exc.Message);
  }
 }
}

If you are not familiar with how binary serialization works, you may be surprised by how simple it is to access the data passed on the queue – as you see, I just cast the retrieved payload to the base class using:

C#
PayloadBase payload = message.Body as PayloadBase; 

If I understood the posting that prompted me to write this article correctly, this wasn’t quite as obvious as I initially believed it to be. Anyway, we are now ready to store our data using our simple but rather efficient DB.<type>.Insert methods, as shown in the switch above.
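
The cast only works if the queue deserializes the body with a BinaryMessageFormatter, because the default formatter of a MessageQueue is the XmlMessageFormatter. The setup code is not shown in this article, so the following is a minimal sketch of how the receiving side might be wired up. The QueueListener class and the queue path are hypothetical.

```csharp
using System;
using System.Messaging;

public class QueueListener
{
    // Hypothetical queue path; the article does not show the one it uses.
    private const string QueuePath = @".\private$\tradelog";

    private MessageQueue queue;

    public void Start()
    {
        if (!MessageQueue.Exists(QueuePath))
        {
            MessageQueue.Create(QueuePath);
        }

        queue = new MessageQueue(QueuePath);

        // Without this, message.Body would be read using the default XML formatter.
        queue.Formatter = new BinaryMessageFormatter();

        queue.ReceiveCompleted += OnReceiveCompleted;
        queue.BeginReceive();
    }

    private void OnReceiveCompleted(object source, ReceiveCompletedEventArgs e)
    {
        MessageQueue mq = (MessageQueue)source;
        try
        {
            Message message = mq.EndReceive(e.AsyncResult);

            // Body is deserialized on access, using the formatter set in Start.
            object body = message.Body;
            Console.WriteLine("Received a {0}", body.GetType().Name);
        }
        catch (MessageQueueException ex)
        {
            Console.WriteLine(ex.Message);
        }
        finally
        {
            // Ask for the next message.
            mq.BeginReceive();
        }
    }
}
```

The sending side needs the same formatter on its MessageQueue instance, otherwise the body is written as XML and the receiver cannot read it.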

The Message Generator

client2.png

To take our system for a spin, we need a message generating utility, and it’s nearly identical to the one presented in the previous article, except for the SendMessages method:

C#
private void SendMessages(int count)
{
 Random random = new Random(count);
 string message = string.Format("Sending {0} messages", count);
 LogMessage(message);
 DateTime start = DateTime.Now;
 for (int i = 0; i < count; i++)
 {
  PayloadType payloadType = (PayloadType)(i % 3);
  PayloadBase payload = null;

  switch (payloadType)
  {
   case PayloadType.Bid:
    {
     PayloadBid bid = new PayloadBid();
     bid.Initialize();
     payload = bid;
    }
    break;
   case PayloadType.Offer:
    {
     PayloadOffer offer = new PayloadOffer();
     offer.Initialize();
     payload = offer;
    }
    break;
   case PayloadType.Trade:
    {
     PayloadTrade trade = new PayloadTrade();
     trade.Initialize();
     payload = trade;
    }
    break;
  }

  messageQueue.Send(payload);
 }
 DateTime end = DateTime.Now;
 TimeSpan ts = end - start;
 message = string.Format("{0} messages sent in {1}", count, ts);
 LogMessage(message);
}

We just let the binary formatter do its job: it sends the correct information even though messageQueue.Send(payload); gets a reference to an object declared as a PayloadBase. As long as the class of the object is marked as [Serializable], the object will be serialized correctly. Note that the attribute is not inherited, so each derived class needs it too.
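
To see this behaviour without a queue, the round trip can be tried in isolation with a BinaryFormatter and a MemoryStream. This is only a sketch, using simplified, hypothetical versions of the payload classes. It targets the .NET Framework that System.Messaging belongs to; BinaryFormatter is obsolete in newer .NET versions.

```csharp
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;

// Simplified, hypothetical versions of the article's payload classes.
[Serializable]
public abstract class PayloadBase
{
    public Guid Id;
}

[Serializable]
public class PayloadTrade : PayloadBase
{
    public double Volume;
}

public static class Program
{
    public static void Main()
    {
        // The variable is declared as the base class, just like in SendMessages.
        PayloadBase original = new PayloadTrade { Id = Guid.NewGuid(), Volume = 100 };

        BinaryFormatter formatter = new BinaryFormatter();
        using (MemoryStream stream = new MemoryStream())
        {
            formatter.Serialize(stream, original);
            stream.Position = 0;

            object copy = formatter.Deserialize(stream);

            // The runtime type survives the round trip, not the declared type.
            Console.WriteLine(copy.GetType().Name);              // prints "PayloadTrade"
            Console.WriteLine(((PayloadBase)copy).Id == original.Id); // prints "True"
        }
    }
}
```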

Conclusion

With a more comprehensive example at your disposal, I hope I’ve succeeded in illustrating how to create a simple, yet efficient solution based on Microsoft Message Queuing capable of dealing with “real” data loads.

It still has ample room for optimization – the obvious one is to process more than one message at a time, and another is to use multiple paired MessageQueue components and SqlConnections to dequeue messages in parallel.
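
As a rough illustration of the first idea, the sketch below drains the messages that are already waiting on the queue and writes them inside a single SqlTransaction, so the commit cost is shared by the whole batch. BatchProcessor and the insert delegate are hypothetical. The Insert methods shown earlier would need an extra SqlTransaction parameter that is assigned to command.Transaction, and the queue is assumed to be configured with the BinaryMessageFormatter.

```csharp
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Messaging;

public static class BatchProcessor
{
    // Reads up to maxBatchSize message bodies that are already on the queue.
    public static List<object> ReceiveBatch(MessageQueue queue, int maxBatchSize)
    {
        List<object> bodies = new List<object>();
        while (bodies.Count < maxBatchSize)
        {
            try
            {
                // A zero timeout returns immediately when the queue is empty.
                Message message = queue.Receive(TimeSpan.Zero);
                bodies.Add(message.Body);
            }
            catch (MessageQueueException ex)
            {
                if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    break; // nothing more to read right now
                }
                throw;
            }
        }
        return bodies;
    }

    // Runs all inserts in one transaction. The insert delegate is expected
    // to assign the transaction to every SqlCommand it creates.
    public static void InsertBatch(
        SqlConnection connection,
        IEnumerable<object> payloads,
        Action<SqlConnection, SqlTransaction, object> insert)
    {
        using (SqlTransaction transaction = connection.BeginTransaction())
        {
            foreach (object payload in payloads)
            {
                insert(connection, transaction, payload);
            }
            // Disposing without reaching Commit rolls the batch back.
            transaction.Commit();
        }
    }
}
```

One caveat with this approach: the messages have already left the queue when the inserts run, so a failed batch is lost unless the queue is transactional or the batch is retried.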

The main purpose has been to keep the solution simple – while still doing something useful.

Best regards,
Espen Harlinn

History

  • 15th March, 2011: Initial posting

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect Sea Surveillance AS
Norway Norway
Chief Architect - Sea Surveillance AS.

Specializing in integrated operations and high performance computing solutions.

I’ve been fooling around with computers since the early eighties, I’ve even done work on CP/M and MP/M.

Wrote my first “real” program on a BBC micro model B based on a series in a magazine at that time. It was fun and I got hooked on this thing called programming ...

A few Highlights:

  • High performance application server development
  • Model Driven Architecture and Code generators
  • Real-Time Distributed Solutions
  • C, C++, C#, Java, TSQL, PL/SQL, Delphi, ActionScript, Perl, Rexx
  • Microsoft SQL Server, Oracle RDBMS, IBM DB2, PostGreSQL
  • AMQP, Apache qpid, RabbitMQ, Microsoft Message Queuing, IBM WebSphereMQ, Oracle TuxidoMQ
  • Oracle WebLogic, IBM WebSphere
  • Corba, COM, DCE, WCF
  • AspenTech InfoPlus.21(IP21), OsiSoft PI


More information about what I do for a living can be found at: harlinn.com or LinkedIn

You can contact me at espen@harlinn.no
