|
using System;
using System.Collections.Generic;
using System.Linq;
using System.Diagnostics;
using System.Text;
using System.Threading;
namespace WCFMarketDataServer
{
//----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
/// <summary>
/// General queue object. It contains its own worker thread.
///
/// NOTE: We really need to use a priority queue.
/// </summary>
class CMDMsgQueue
{
    //------------------------------------------------------------
    /// <summary>
    /// Pending messages, oldest first. Created with an initial capacity of
    /// CMarkerConfig.g_MaxCMDMsgQueueSz. The queue instance doubles as the
    /// lock object that guards every access to it.
    /// </summary>
    private readonly Queue<IMDMessage> m_CMDMsQueue = new Queue<IMDMessage>(CMarkerConfig.g_MaxCMDMsgQueueSz);

    //------------------------------------------------------------
    /// <summary>
    /// Instead of using inheritance, a single delegate processes each IMDMessage
    /// taken from the queue. It is invoked on the worker thread started by StartQueue.
    /// </summary>
    public delegate void QueueCallback(IMDMessage pIMDMessage);
    private QueueCallback m_QueueCallback = null;

    //------------------------------------------------------------
    /// <summary>
    /// Gates used to avoid flooding the log/event viewer with repeated messages:
    /// m_CMsgTimerQueue guards the "queue full" warning, m_CMsgTimer guards the
    /// periodic activity line. A message is only logged when Signal() returns true.
    /// NOTE(review): presumably Signal() returns true at most once per
    /// CMarkerConfig.g_MinLogMsgThresholdSec seconds - confirm against CMsgTimer.
    /// </summary>
    protected CMsgTimer m_CMsgTimerQueue = new CMsgTimer(CMarkerConfig.g_MinLogMsgThresholdSec);
    protected CMsgTimer m_CMsgTimer = new CMsgTimer(CMarkerConfig.g_MinLogMsgThresholdSec);

    //------------------------------------------------------------
    /// <summary>
    /// Registers the callback that processes messages from this queue and starts
    /// the worker thread that drains it. The worker thread is only started here,
    /// and every call starts an additional thread, so call this once per queue.
    /// </summary>
    /// <param name="pQueueCallback">
    /// Delegate invoked for each dequeued message. Must not be null
    /// (the worker asserts this in debug builds).
    /// </param>
    public void StartQueue(QueueCallback pQueueCallback)
    {
        m_QueueCallback = pQueueCallback;

        // Start processor
        new Thread(new ThreadStart(QueueProcessor)).Start();
    }

    //------------------------------------------------------------
    /// <summary>
    /// Appends a message to the queue. When the queue already holds
    /// CMarkerConfig.g_MaxCMDMsgQueueSz entries, the oldest entry is discarded
    /// first so the queue never grows beyond the configured maximum.
    /// </summary>
    /// <param name="pIMDMessage">Message (tick) received from a data source.</param>
    public void PushUpdate(IMDMessage pIMDMessage)
    {
        lock (m_CMDMsQueue)
        {
            int iQueueSize = m_CMDMsQueue.Count;

            // Drop the oldest entry once the limit is reached. The original test
            // (max < count) only fired after the limit had already been exceeded,
            // leaving max + 1 entries queued. The "> 0" guard avoids dequeuing
            // from an empty queue if the configured maximum is zero or negative.
            if (iQueueSize > 0 && CMarkerConfig.g_MaxCMDMsgQueueSz <= iQueueSize)
            {
                // Remove the top -- too old
                m_CMDMsQueue.Dequeue();

                if (m_CMsgTimerQueue.Signal())
                {
                    // Report the size observed when the queue was found full.
                    CMarkerConfig.LogWarning(string.Format("Queue for is full. Size is {0}", iQueueSize));
                }
            }

            // Queue new tick
            m_CMDMsQueue.Enqueue(pIMDMessage);
        } // lock ...
    }

    //------------------------------------------------------------
    /// <summary>
    /// Worker thread body: repeatedly removes the oldest message from the queue
    /// and forwards it to the registered callback, until
    /// Program.g_TerminatingExecutableEvent is signalled. When the queue is
    /// empty the thread sleeps for CMarkerConfig.g_MinCMDMsgQueuewaitMs before
    /// polling again. Exceptions raised while logging or inside the callback
    /// are logged and do not stop the thread.
    /// </summary>
    private void QueueProcessor()
    {
        try
        {
            // Run until we are stopping the process
            while (!Program.g_TerminatingExecutableEvent.WaitOne(0, false))
            {
                // Loop-scoped so the previous message is not kept referenced while idle.
                int iQueueSize;
                IMDMessage pIMDMessage = null;

                lock (m_CMDMsQueue)
                {
                    // Size is sampled before the dequeue, so it includes the
                    // message about to be processed.
                    iQueueSize = m_CMDMsQueue.Count;
                    if (0 != iQueueSize)
                    {
                        // Dequeue the next available update
                        pIMDMessage = m_CMDMsQueue.Dequeue();
                    }
                }

                // If queue is empty, just pause.
                // The data feed is expected to be busy 99.99% of the time
                if (0 == iQueueSize)
                {
                    Thread.Sleep(CMarkerConfig.g_MinCMDMsgQueuewaitMs);
                    continue;
                }

                // Display limited log message to show some activity.
                // Guarded separately: a null message or a failing ToString()/logger
                // must not reach the outer catch and terminate the worker thread.
                try
                {
                    if (m_CMsgTimer.Signal())
                    {
                        CMarkerConfig.LogInfo(string.Format("QSz: {0} - {1}", iQueueSize.ToString("000"), pIMDMessage.ToString()));
                    }
                }
                catch (Exception ex)
                {
                    CMarkerConfig.LogExcptn(ex);
                }

                // Forward to registered client
                Debug.Assert(m_QueueCallback != null);
                try
                {
                    m_QueueCallback(pIMDMessage);
                }
                catch (Exception ex)
                {
                    // A failing callback only loses this one message.
                    CMarkerConfig.LogExcptn(ex);
                }
            }
        }
        catch (Exception ex)
        {
            CMarkerConfig.LogExcptn(ex);
        }

        CMarkerConfig.LogInfo("Terminating CMDMsgQueue::QueueProcessor() thread");
    }
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
20+ yrs Leading and Developing Microsoft products in the Financial Industry.
My main background is VC++, server and client development.
Currently focused on WPF/XAML, Windows 8 and Windows Azure Server technologies.