Hi,
I'm building a multi-threaded app that uses the producer/consumer queue pattern. I have a real-time stock market data feed that pushes data into the queue, and a worker thread then processes it (inserting into and updating database tables). The problem is that toward the end of the trading session the volume becomes VERY heavy, and the queue grows faster than the worker thread can drain it, so the inserts end up 2 or 3 minutes behind!

Does anyone have any solutions or suggestions on how to flush the entire queue into the table at once instead of doing individual inserts, or some other way to resolve this?

By the way, I'm using MySQL 5.5 with the MySQL .NET connector (Connector/NET) and C# 4.0.

Thanks,
Donald
Updated 25-Feb-11 18:20pm

MySQL allows a single INSERT statement to insert multiple rows:
INSERT INTO querty(field1, field2, field3) VALUES
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' '),
(null, ' ', ' ');
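When the number of rows changes from batch to batch, the statement text can be generated from the batch size instead of being written by hand. A minimal sketch, assuming the table and column names are trusted identifiers (identifiers cannot be passed as parameters); the `MultiRowInsert.Build` helper and the `@p{row}_{col}` placeholder scheme are hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: builds one multi-row INSERT with a named placeholder
// for every value, for however many rows the current batch holds.
public static class MultiRowInsert
{
    // table and columns must be trusted identifiers; only values are parameters.
    public static string Build(string table, IList<string> columns, int rowCount)
    {
        if (rowCount < 1) throw new ArgumentOutOfRangeException(nameof(rowCount));

        var sql = new StringBuilder();
        sql.Append("INSERT INTO ").Append(table)
           .Append(" (").Append(string.Join(", ", columns)).Append(") VALUES ");

        for (int row = 0; row < rowCount; row++)
        {
            if (row > 0) sql.Append(", ");
            sql.Append('(');
            for (int col = 0; col < columns.Count; col++)
            {
                if (col > 0) sql.Append(", ");
                sql.Append("@p").Append(row).Append('_').Append(col); // e.g. @p1_2
            }
            sql.Append(')');
        }
        return sql.Append(';').ToString();
    }
}
```

Each placeholder is then bound on the Connector/NET command, e.g. `cmd.Parameters.AddWithValue("@p" + row + "_" + col, value)`. Very large batches can exceed the server's `max_allowed_packet`, so capping the batch size is sensible.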


If you're smart you'll use a separate table for each ticker; it provides more efficient storage utilization and faster inserts. But don't use the ticker name as the table name: remember that both the name and the ISIN number are subject to change, which is something you usually need to keep track of.

Using separate tables for each security will also allow you to split the load between several computers.
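Keeping the ticker out of the table name could look like this: each security gets a permanent internal id, the table name is derived from that id, and the same id decides which database host the table lives on. A minimal sketch; `SecurityRegistry`, the `sec_NNNNNN` naming and the modulo placement are hypothetical choices, and in practice the mapping (and the history of ticker/ISIN changes) would itself be persisted:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical registry: table names and host placement are derived from a
// permanent internal id, so a ticker change only updates this mapping.
public sealed class SecurityRegistry
{
    private readonly Dictionary<string, long> idByTicker = new Dictionary<string, long>();
    private long nextId = 1;

    public long Register(string ticker)
    {
        long id = nextId++;
        idByTicker[ticker] = id;
        return id;
    }

    // A renamed security keeps its id, and therefore its table.
    public void Rename(string oldTicker, string newTicker)
    {
        long id = idByTicker[oldTicker];
        idByTicker.Remove(oldTicker);
        idByTicker[newTicker] = id;
    }

    public string TableFor(string ticker)
    {
        return "sec_" + idByTicker[ticker].ToString("D6"); // e.g. sec_000042
    }

    // Spread the securities (and their insert load) over hostCount servers.
    public int HostFor(string ticker, int hostCount)
    {
        return (int)(idByTicker[ticker] % hostCount);
    }
}
```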

I guess you need at least the following data in each table:

SQL
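CREATE TABLE sec_000001 /* hypothetical name: one table per security, named by an internal id rather than the ticker */
( 
/* bidder's id is included with the bid registration; this order may require several transactions */
BidId BIGINT, /* at least a 64 bit integer; MySQL INTEGER is only 32 bits */
/* seller's id is included with the offer registration; this order may require several transactions */
OfferId BIGINT, /* at least a 64 bit integer */
TradeType INTEGER, /* trade agreement type */
Price DECIMAL(18,6), /* precision/scale are assumptions; a bare DECIMAL is DECIMAL(10,0) in MySQL */
Volume DECIMAL(18,6)
);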


Well Donald, it's getting interesting. You're right, it's a lot of data, and if you are doing it right it's an astounding amount of data. Good luck :)

If you're not keeping track of bids and offers, their sizes and conditions, you are losing information that is often valuable to traders. Personally I'd like the whole order book for analysis, since it may be used to either implement or counter automatic trading systems, or to detect other interesting market conditions.

Update
The System.IO.Log namespace defines an interface for logging to a record-oriented sequential I/O system. Using the classes from this namespace, you can implement your own transaction processing system on top of the Common Log File System (CLFS) provided by Windows Server 2003 R2 and Windows Vista.

Another simple alternative is to use H2Sharp with H2, as I think you will find the performance more to your liking. Use memory tables for high performance, and use MSMQ to enqueue information to another process that will persist it in the RDBMS of your choice (MySQL).

I've used a similar approach, implemented in Java, and it works beautifully. Running H2 as an embedded engine lets you handle 50,000 statements per second quite easily, so I guess performance will not be an issue. In my solution, external .NET processes were able to use the PostgreSQL ODBC driver to communicate with the embedded server.

BTW: Serialization in .NET can be a real performance killer, but here is a very good solution.

Use a binary formatter with MSMQ, and use FastSerializer (or roll your own) to pass the information to MSMQ as a byte[].
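If you roll your own, writing the fields directly with `BinaryWriter` avoids the type metadata that a general-purpose formatter writes into every message. A minimal sketch, assuming a hypothetical `Quote` message with these fields (the real feed's fields will differ); it only produces and reads back the `byte[]`, and attaching it to an MSMQ message is left out:

```csharp
using System;
using System.IO;

// Hypothetical quote message; the field set is an assumption.
public struct Quote
{
    public string Symbol;
    public decimal Bid, Ask, Last;
    public long TimestampTicks;
}

public static class QuoteCodec
{
    // Writes the fields in a fixed order; no type names or metadata are stored.
    public static byte[] ToBytes(Quote q)
    {
        using (var ms = new MemoryStream())
        using (var w = new BinaryWriter(ms))
        {
            w.Write(q.Symbol);          // length-prefixed UTF-8 string
            w.Write(q.Bid);             // 16 bytes per decimal
            w.Write(q.Ask);
            w.Write(q.Last);
            w.Write(q.TimestampTicks);  // 8 bytes
            w.Flush();
            return ms.ToArray();
        }
    }

    // Reads the fields back in exactly the order ToBytes wrote them.
    public static Quote FromBytes(byte[] data)
    {
        using (var r = new BinaryReader(new MemoryStream(data)))
        {
            var q = new Quote();
            q.Symbol = r.ReadString();
            q.Bid = r.ReadDecimal();
            q.Ask = r.ReadDecimal();
            q.Last = r.ReadDecimal();
            q.TimestampTicks = r.ReadInt64();
            return q;
        }
    }
}
```

The price of this approach is that both ends must agree on the field order, so any change to the message layout has to be versioned by hand.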

If you are not comfortable with H2, there is always Oracle TimesTen. I've got no idea about its performance or price, but if it isn't way faster than the Oracle RDBMS, it's kind of pointless :)

Update 2
Microsoft Message Queuing – Log Trade information using Microsoft SQL Server: you may find this interesting. Given reasonable hardware, it should solve your problems :)

Regards
Espen Harlinn
Comments
d.allen101 25-Feb-11 18:40pm    
hey what's up Espen! I'm not clear on how to implement the solution you're suggesting. Do I write an arbitrary number of 'VALUES (null,'','')' rows? I wouldn't know how many items are in the queue, and it wouldn't be constant. Can you give me an example and a little more explanation on how to go about it? Thanks! Donald
fjdiewornncalwe 25-Feb-11 19:04pm    
Hey Donald, Espen is suggesting a "bulk insert", which is what I was going to suggest too. It means that your application caches 10, 20, or even 100 records coming in from your feeds and then writes them all to the database at once, when the cache is full or when the thread decides to flush its queue. That removes a lot of per-statement latency from the database writes, which lets you process more in a given period of time.
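The caching described here can be sketched with .NET 4's `BlockingCollection<T>`, swapped in for the hand-rolled `Queue` + `lock` + `AutoResetEvent` combination. This is a sketch under assumptions: `BatchingConsumer.Run`, `maxBatch` and the `flush` callback are hypothetical names, and `flush` is where the multi-row INSERT would go. Instead of waiting for a fixed count, it takes whatever is already queued (up to `maxBatch`) as soon as one item is available, so batches stay small when the feed is quiet and grow automatically when it is busy:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public static class BatchingConsumer
{
    // Runs on the worker thread until CompleteAdding() has been called
    // and the queue has been drained.
    public static void Run(BlockingCollection<string> queue, int maxBatch,
                           Action<List<string>> flush)
    {
        string item;
        // Blocks until an item arrives; returns false once the collection
        // is marked complete for adding and is empty.
        while (queue.TryTake(out item, Timeout.Infinite))
        {
            var batch = new List<string>(maxBatch) { item };

            // Grab whatever else is already waiting, without blocking.
            while (batch.Count < maxBatch && queue.TryTake(out item))
                batch.Add(item);

            flush(batch); // e.g. one multi-row INSERT for the whole batch
        }
    }
}
```

The feed handler just calls `queue.Add(quoteString)`, the worker thread calls `Run(queue, 100, batch => ...)`, and `queue.CompleteAdding()` at shutdown lets `Run` return once everything queued has been flushed.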
I'm not sure how good this article is, but check out http://metabetageek.com/2010/02/04/learning-mysql-find-in-set-and-bulk-insert-options/.
Espen Harlinn 26-Feb-11 4:51am    
Donald, are you working on a real trading system, or just distributing information on a “best effort” basis where nobody is going to get pissed if data gets lost?

The requirements for a real trading system are vastly more complicated, as your system *must* be able to recover from failure at all times. In that case you need both redundancy and transacted persistence for just about everything.
d.allen101 25-Feb-11 21:20pm    
thanks marcus, I'm going to check out the article...
d.allen101 26-Feb-11 6:49am    
hey Espen, yes, I'm working on a real-time trading system that's going to be used by traders... but at this point it's nowhere near ready for distribution
C#
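using System;
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using System.Threading;
using System.Windows.Forms;
using MySql.Data.MySqlClient; // MySQL Connector/NET

public partial class Form1 : Form // hypothetical form name
{
  Thread thrd;
  EventWaitHandle wh = new AutoResetEvent(false);
  Queue<string> dataQueue = new Queue<string>();
  object locker = new object();
  MySqlConnection con; // Connector/NET spells it MySqlConnection, not MySQLConnection

  private void button1_Click(object o, EventArgs e)
  {
    con = new MySqlConnection(conString); // conString is defined elsewhere
    con.Open();

    thrd = new Thread(new ThreadStart(worker));
    thrd.IsBackground = true; // don't keep the process alive when the form closes
    thrd.Start();
  }

  // data feed event handler ("OnQuote" is a hypothetical name; the original omitted it)
  private void OnQuote(object o, QuoteFeedArg quoteArg)
  {
    lock(locker)
    {
      dataQueue.Enqueue(quoteArg.QuoteString); // the producer enqueues; only the worker dequeues
    }
    wh.Set();
  }

  private void worker()
  {
    while(true)
    {
      wh.WaitOne(); // sleep until the feed signals that data has arrived

      List<string> data;
      lock(locker)
      {
        // take everything queued so far in one go and release the lock quickly
        data = new List<string>(dataQueue);
        dataQueue.Clear();
      }
      if(data.Count == 0) continue;

      // group the rows by symbol so each table_<symbol> gets one multi-row INSERT
      var bySymbol = new Dictionary<string, List<string[]>>();
      foreach(string line in data)
      {
        string[] quotes = line.Split(','); // QuoteString is in comma delimited format
        List<string[]> rows;
        if(!bySymbol.TryGetValue(quotes[1], out rows))
        {
          rows = new List<string[]>();
          bySymbol[quotes[1]] = rows;
        }
        rows.Add(quotes);
      }

      // one transaction per batch: either all rows of the batch are stored or none
      using(MySqlTransaction tx = con.BeginTransaction())
      {
        foreach(KeyValuePair<string, List<string[]>> kv in bySymbol)
        {
          string stkSymbol = kv.Key;
          // table names can't be parameters, so skip anything that isn't a plain identifier
          if(!IsSafeIdentifier(stkSymbol)) continue;

          using(MySqlCommand cmd = new MySqlCommand())
          {
            cmd.Connection = con;
            cmd.Transaction = tx;
            // very large batches may need splitting to stay under max_allowed_packet
            StringBuilder sql = new StringBuilder(
              "INSERT INTO table_" + stkSymbol + " (symbol,bid,ask,last) VALUES ");
            for(int i = 0; i < kv.Value.Count; i++)
            {
              string[] q = kv.Value[i];
              if(i > 0) sql.Append(',');
              sql.AppendFormat("(@s{0},@b{0},@a{0},@l{0})", i);
              cmd.Parameters.AddWithValue("@s" + i, stkSymbol);
              cmd.Parameters.AddWithValue("@b" + i, double.Parse(q[3], CultureInfo.InvariantCulture));
              cmd.Parameters.AddWithValue("@a" + i, double.Parse(q[4], CultureInfo.InvariantCulture));
              cmd.Parameters.AddWithValue("@l" + i, double.Parse(q[5], CultureInfo.InvariantCulture));
              // etc...
            }
            cmd.CommandText = sql.ToString();
            cmd.ExecuteNonQuery();
          }
        }
        tx.Commit();
      }
    }
  }

  // true if s contains only letters, digits and underscores
  private static bool IsSafeIdentifier(string s)
  {
    if(string.IsNullOrEmpty(s)) return false;
    foreach(char c in s)
      if(!char.IsLetterOrDigit(c) && c != '_') return false;
    return true;
  }
}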

This content, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)