Click here to Skip to main content
15,886,518 members
Articles / Programming Languages / C# 4.0

An Introduction to Real-Time Stock Market Data Processing

Rate me:
Please Sign up or sign in to vote.
4.96/5 (64 votes)
20 May 2013 | CPOL | 24 min read | 332.6K | 57.7K | 202
Discusses how stock market trading works, the different types of market data available, and provides a code example with sample data that processes a market data feed.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace SparkAPI.Data
{

    /// <summary>
    /// Replays a recorded event file using two tasks: a reader task streams events from the file into a queue while a
    /// processor task dequeues them and raises each one. Supports pausing, resuming and stepping forward through an
    /// event replay.
    /// </summary>
    /// <remarks>
    /// The event file is streamed (read one line at a time) rather than read in its entirety to reduce the memory
    /// footprint and increase the speed at which the replay begins.
    /// 
    /// This class inherits the ApiEventFeedBase class and raises the events through its RaiseEvent() method.
    /// </remarks>
    public class ApiEventFeedThreadedReplay : ApiEventFeedBase
    {

        /// <summary>Interval (in milliseconds) at which the processor re-checks the pause/step flags while paused</summary>
        private const int PausePollIntervalMilliseconds = 100;

        /// <summary>Event feed file name</summary>
        internal string FileName { get; private set; }

        /// <summary>Specifies whether the replay is currently paused</summary>
        public bool IsPaused
        {
            get { return _isPaused; }
            private set { _isPaused = value; }
        }

        /// <summary>Maximum event queue size reached during processing</summary>
        public int MaxQueueDepthReached { get; private set; }

        /// <summary>
        /// Backing field for <see cref="IsPaused"/>. Volatile because it is written by the thread calling
        /// Pause()/Resume() and polled by the processor task.
        /// </summary>
        private volatile bool _isPaused;

        /// <summary>
        /// Specifies whether the replay should step forward one event. Volatile because it is written by the thread
        /// calling StepForward() and polled by the processor task.
        /// </summary>
        private volatile bool _isSteppingForward;

        /// <summary>Stores latest observed market time</summary>
        private DateTime _latestMarketTime = DateTime.MinValue;

        /// <summary>Concurrent event queue for transferring events between producer and consumer</summary>
        private BlockingCollection<Spark.Event> _eventQueue;

        /// <summary>
        /// Spark event received handler
        /// </summary>
        /// <param name="eventItem">Event that was received</param>
        internal delegate void EventReceivedHandler(Spark.Event eventItem);

        /// <summary>
        /// ApiEventFeedThreadedReplay constructor
        /// </summary>
        /// <param name="fileName">Event feed file name</param>
        public ApiEventFeedThreadedReplay(string fileName)
        {
            FileName = fileName;
            IgnoreQuoteEvents = false;
        }

        /// <summary>
        /// Initiate data feed. Starts the reader and processor tasks and blocks until both have finished.
        /// </summary>
        /// <exception cref="AggregateException">Thrown when the reader task and/or the processor task faulted</exception>
        public override void Execute()
        {

            //The queue is only needed for the duration of the replay; dispose it once both tasks have finished
            using (var queue = new BlockingCollection<Spark.Event>())
            {
                _eventQueue = queue;

                //Initiate reader task
                var readerTask = new Task(ExecuteEventReader);
                readerTask.Start();

                //Initiate processor task
                var processorTask = new Task(ExecuteEventProcessor);
                processorTask.Start();

                //Wait until both reading and processing are complete. The reader marks the queue as finished
                //itself (even on failure), so the processor always terminates and this wait cannot hang.
                Task.WaitAll(readerTask, processorTask);
            }

        }

        /// <summary>
        /// Initiate event reader. Streams the event file into the queue and marks the queue as complete when done.
        /// </summary>
        private void ExecuteEventReader()
        {
            try
            {
                ApiEventReaderWriter reader = new ApiEventReaderWriter();
                reader.StreamFromFile(FileName, EventRecieved);
            }
            finally
            {
                //Always signal completion, otherwise a reader failure would leave the processor blocked forever
                _eventQueue.CompleteAdding();
            }
        }

        /// <summary>
        /// Initiate event processor. Consumes queued events until the queue is marked complete and drained,
        /// tracking the largest queue depth observed after each processed event.
        /// </summary>
        private void ExecuteEventProcessor()
        {
            foreach (Spark.Event eventItem in _eventQueue.GetConsumingEnumerable())
            {
                ProcessEvent(eventItem);
                int currentQueueDepth = _eventQueue.Count;
                if (currentQueueDepth > MaxQueueDepthReached) MaxQueueDepthReached = currentQueueDepth;
            }
        }

        /// <summary>
        /// Pause replay
        /// </summary>
        public void Pause()
        {
            IsPaused = true;
        }

        /// <summary>
        /// Resume replay
        /// </summary>
        public void Resume()
        {
            IsPaused = false;
        }

        /// <summary>
        /// Step forward one event
        /// </summary>
        public void StepForward()
        {
            _isSteppingForward = true;
        }

        /// <summary>
        /// Fired each time an event is read from file; adds the event to the queue for the processor task
        /// </summary>
        /// <param name="eventItem">Event read from the file</param>
        private void EventRecieved(Spark.Event eventItem)
        {
            _eventQueue.Add(eventItem);
        }

        /// <summary>
        /// Raises a single dequeued event and then, if the replay is paused, waits until it is resumed or a
        /// step forward is requested.
        /// </summary>
        /// <param name="eventItem">Event taken from the queue</param>
        private void ProcessEvent(Spark.Event eventItem)
        {

            //Skip over raising and waiting logic if quote event and ignoring quotes
            if ((IgnoreQuoteEvents) && (eventItem.Type == Spark.EVENT_QUOTE)) return;

            //Raise event, reporting the latest market time seen so far (never moves backwards)
            DateTime sparkEventTime = ApiFunctions.DateTimeFromUnixTimestampSeconds(eventItem.Time);
            if (sparkEventTime > _latestMarketTime) _latestMarketTime = sparkEventTime;
            RaiseEvent(new EventFeedArgs(eventItem, _latestMarketTime));

            //Wait before playing next event while replay is paused
            while ((IsPaused) && (!_isSteppingForward))
            {
                System.Threading.Thread.Sleep(PausePollIntervalMilliseconds);
            }

            //A step request is consumed by exactly one event
            _isSteppingForward = false;

        }

    }

}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Australia
Paul Francis currently works as a senior engineer at The Trade Desk.

He holds an undergraduate Honours degree in Finance, and is near completion of a Ph.D. in Market Microstructure, specialising in order flow modelling, and market data processing, reconstruction and analytics.

He is also the creator of Sharp Spark (Spark API SDK), an open source component designed to facilitate the processing of real-time market data from the Spark API: http://sourceforge.net/projects/sparkapi

Paul lives in Sydney, Australia.

Comments and Discussions