Click here to Skip to main content
15,886,578 members
Articles / Programming Languages / C#

The Super Pool Framework

Rate me:
Please Sign up or sign in to vote.
4.87/5 (53 votes)
31 Aug 2010CPOL26 min read 100.8K   1.5K   178  
The Super Pool is a framework for decoupled communication and management of components. The Super Pool introduces a natural asynchronous communication environment into your solution that can be fluently spread over different components, threads, processes, or even computers or networks.
// -----
// Copyright 2010 Deyan Timnev
// This file is part of the Matrix Platform (www.matrixplatform.com).
// The Matrix Platform is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, 
// either version 3 of the License, or (at your option) any later version. The Matrix Platform is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; 
// without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
// You should have received a copy of the GNU Lesser General Public License along with the Matrix Platform. If not, see http://www.gnu.org/licenses/lgpl.html
// -----
using System;
using System.Collections.Generic;
using System.Text;
using System.Net.Sockets;
using System.IO;
using System.Threading;
using System.Net;
using Matrix.Common.Core.Serialization;
using Matrix.Common.Core;

#if Matrix_Diagnostics
    using Matrix.Common.Diagnostics;
#endif

namespace Matrix.Common.Sockets.Common
{
    /// <summary>
    /// Class wraps the complexities of working with a async socket.
    /// </summary>
    public class SocketCommunicator : IDisposable
    {
        public const int InvalidSendIndex = -1;

        protected object _syncRoot = new object();

        int _id = -1;
        /// <summary>
        /// Applied in the server client connections management.
        /// </summary>
        public int Id
        {
            get { return _id; }
            set { _id = value; }
        }

        /// <summary>
        /// Establish the end point routine. Default behaviour serves server and client mode,
        /// but will only report properly when connected.
        /// </summary>
        public virtual EndPoint EndPoint
        {
            get 
            {
                System.Net.Sockets.Socket socket = _socket;
                if (socket != null && socket.Connected)
                {
                    try
                    {
                        return socket.RemoteEndPoint;
                    }
                    catch (Exception ex)
                    {
#if Matrix_Diagnostics
                        Monitor.OperationError("Failed to establish end point", ex);
#endif
                    }
                }

                return null;
            }
        }

        long _pendingSendId = 0;
        public long PendingSendId
        {
            get { return Interlocked.Increment(ref _pendingSendId); }
        }

#if Matrix_Diagnostics
        readonly InstanceMonitor _monitor;
        /// <summary>
        /// 
        /// </summary>
        public InstanceMonitor Monitor
        {
            get { return _monitor; }
        }
#endif

        ISerializer _serializer;

        /// <summary>
        /// Receive buffers.
        /// </summary>
        byte[] _receiveBuffer = new byte[32 * 1024];

        protected System.Net.Sockets.Socket _socket = null;
        
        public bool IsConnected
        {
            get
            {
                System.Net.Sockets.Socket socket = _socket;
                if (socket != null)
                {
                    return socket.Connected;
                }

                return false;
            }
        }

        /// <summary>
        /// We use these 2 streams and constantly swap between them, in order to reuse memory
        /// and not recreate a mem stream each time a buffer comes in.
        /// </summary>
        MemoryStream _pendingReceiveStream = new MemoryStream();
        MemoryStream _pendingReceiveStreamSecondary = new MemoryStream();

        /// <summary>
        /// We use this to evade any possible scenario, where 2 of those get assigned at the same
        /// time (since that causes total destruction, and buffer mixups).
        /// </summary>
        SocketAsyncEventArgs _lastReceiveArgs = null;

        /// <summary>
        /// Class stores information on sending a message.
        /// </summary>
        public class AsyncMessageSendInfo : EventArgs, IDisposable
        {
            public long Id { get; set; }
            public System.Net.Sockets.Socket Socket { get; set; }
            public object UserToken { get; set; }
            public object Message { get; set; }
            public MemoryStream Stream { get; set; }

            public ManualResetEvent ConfirmationEvent { get; set; }

            public void Dispose()
            {
                Message = null;
                UserToken = null;
                Socket = null;

                ManualResetEvent confirmationEvent = ConfirmationEvent;
                ConfirmationEvent = null;
                if (confirmationEvent != null)
                {
                    try
                    {
                        confirmationEvent.Close();
                    }
                    catch (Exception ex)
                    {
#if Matrix_Diagnostics
                        SystemMonitor.Error("Failed to close event", ex);
#endif
                    }
                }

                MemoryStream stream = Stream;
                if (stream != null)
                {
                    stream.Dispose();
                }
                Stream = null;
            }
        }

        public delegate void MessageUpdateDelegate(SocketCommunicator helper, object message);
        public delegate void HelperUpdateDelegate(SocketCommunicator helper);
        public delegate void AsyncMessageSendDelegate(SocketCommunicator helper, AsyncMessageSendInfo info);

        public event AsyncMessageSendDelegate SendAsyncCompleteEvent;
        public event MessageUpdateDelegate MessageReceivedEvent;

        public event HelperUpdateDelegate ConnectedEvent;
        public event HelperUpdateDelegate DisconnectedEvent;

        /// <summary>
        /// Constructor, when socket already connected.
        /// </summary>
        /// <param name="messageSerializer"></param>
        /// <param name="socket"></param>
        public SocketCommunicator(ISerializer serializer)
        {
#if Matrix_Diagnostics
            _monitor = new InstanceMonitor(this);
            Monitor.Info("Socket communicator created.");
#endif

            _serializer = serializer;
        }

        public virtual void Dispose()
        {
#if Matrix_Diagnostics
            Monitor.Info("Socket communicator diposed."); 
#endif
            System.Net.Sockets.Socket socket = _socket;
            _socket = null;
            if (socket != null)
            {
                socket.Close();
            }

            SendAsyncCompleteEvent = null;
            MessageReceivedEvent = null;

            ConnectedEvent = null;
            DisconnectedEvent = null;

            ReleaseAsyncReceiveArgs();

#if Matrix_Diagnostics
            _monitor.Dispose();
#endif
            _serializer = null;
        }

        public override string ToString()
        {
            System.Net.Sockets.Socket socket = _socket;
            if (socket != null)
            {
                EndPoint endPoint = EndPoint;
                if (endPoint != null)
                {
                    return string.Format("{0}, id [{1}], at {2}", this.GetType().Name, Id, endPoint.ToString());
                }
            }

            return string.Format("{0}, id [{1}], at NA", this.GetType().Name, Id);
        }

        /// <summary>
        /// Attach object to a new socket.
        /// </summary>
        /// <param name="socket"></param>
        /// <returns></returns>
        public void AssignSocket(System.Net.Sockets.Socket socket, bool disposeCurrentSocket)
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info(string.Format("Socket assigned, disposing current [{0}].", disposeCurrentSocket));
            }
#endif

            ReleaseSocket(disposeCurrentSocket);
            lock (_syncRoot)
            {
                _socket = socket;
            }

            AssignAsyncReceiveArgs(true);
        }

        /// <summary>
        /// Release the currently used socket.
        /// </summary>
        public void ReleaseSocket(bool disposeSocket)
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info(string.Format("Releasing socket, dipose [{0}].", disposeSocket));
            }
#endif

            lock (_syncRoot)
            {
                System.Net.Sockets.Socket socket = _socket;
                if (socket != null && disposeSocket)
                {
                    socket.Disconnect(false);
                    socket.Close();
                }
                _socket = null;
            }
        }

        protected void RaiseConnectedEvent()
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info("Raising connected event.");
            }
#endif

            SocketCommunicator.HelperUpdateDelegate delegateInstance = ConnectedEvent;
            if (delegateInstance != null)
            {
                delegateInstance(this);
            }
        }

        protected void RaiseDisconnectedEvent()
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info("Raising disconnected event.");
            }
#endif
            
            SocketCommunicator.HelperUpdateDelegate delegateInstance = DisconnectedEvent;
            if (delegateInstance != null)
            {
                delegateInstance(this);
            }
        }

        /// <summary>
        /// Helper, assign a new set of receive args.
        /// </summary>
        protected bool AssignAsyncReceiveArgs(bool releaseExisting)
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info(string.Format("AssignAsyncReceiveArgs, release existing [{0}].", releaseExisting));
            }
#endif

            System.Net.Sockets.Socket socket = _socket;
            if (socket == null)
            {
                return false;
            }

            if (releaseExisting)
            {
                ReleaseAsyncReceiveArgs();
            }

            SocketAsyncEventArgs args;
            lock (_syncRoot)
            {
                if (_lastReceiveArgs != null)
                {
                    _lastReceiveArgs.Dispose();
                    _lastReceiveArgs = null;
#if Matrix_Diagnostics
                    Monitor.Fatal("Assign async receive args logic error.");
#endif
                    return false;
                }

                _lastReceiveArgs = new SocketAsyncEventArgs();
                _lastReceiveArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_Received);

                _lastReceiveArgs.SetBuffer(_receiveBuffer, 0, _receiveBuffer.Length);
                args = _lastReceiveArgs;
            }

            if (socket.ReceiveAsync(args) == false)
            {
                if (args.SocketError == SocketError.Success)
                {
                    SocketAsyncEventArgs_Received(socket, args);
                }
                else
                {
                    ReleaseAsyncReceiveArgs();
                    return false;
                }
            }

            return true;    
        }


        void SocketAsyncEventArgs_Received(object sender, SocketAsyncEventArgs e)
        {

#if Matrix_Diagnostics
            if (Monitor.IsReportAllowed)
            {
                Monitor.Info("Socket received: " + e.LastOperation.ToString() + " was " + e.SocketError + " data [" + e.BytesTransferred + "]");
            }
#endif

//            // Do nothing here, since all other places for this cause various problems!
//            if (e.BytesTransferred == 0)
//            {
//#if Matrix_Diagnostics
//                InstanceMonitor monitor = Monitor;
//                if (monitor != null && monitor.IsReportAllowed)
//                {
//                    monitor.Info("Zero bytes transferred.");
//                }
//#endif
//                return;
//            }

            bool resetReceiveArgs = true;
            try
            {
                if (e != _lastReceiveArgs)
                {// We must make sure that we only handle these one at a time.
#if Matrix_Diagnostics
                    Monitor.Fatal("Operation logic error in receive args.");
#endif

                    e.Completed -= new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_Received);
                    e.Dispose();

                    resetReceiveArgs = false;
                    return;
                }

                if (e.SocketError == SocketError.ConnectionReset
                    /* TODO: THIS WAS ACTIVE, BUT WAS REMOVED SINCE IT CAUSED A PROBLEM IN VALID CONDITIONS
                     * TEST TO SEE IF LINE STILL NEEDED
                     * SO NOW THIS IS MOVED DOWNWARDS.
                     * || e.BytesTransferred == 0*/
                    )
                {// Connection was reset.
#if Matrix_Diagnostics
                    Monitor.ReportImportant("Socket connection reset.");
#endif
                    RaiseDisconnectedEvent();
                    resetReceiveArgs = false;
                    return;
                }

                System.Net.Sockets.Socket socket = _socket;
                if (socket == null)
                {
                    resetReceiveArgs = false;
                    return;
                }

                if (e.SocketError != SocketError.Success || socket.Connected == false)
                {
                    resetReceiveArgs = false;
                    return;
                }

                if (e.BytesTransferred == 0)
                {
                    resetReceiveArgs = false;
                    return;
                }

                lock (_syncRoot)
                {// Start the stream operations.

                    long streamStartPosition = _pendingReceiveStream.Position;

                    _pendingReceiveStream.Seek(0, SeekOrigin.End);
                    _pendingReceiveStream.Write(e.Buffer, 0, e.BytesTransferred);

                    _pendingReceiveStream.Seek(streamStartPosition, SeekOrigin.Begin);
                }

                ISerializer serializer = _serializer;
                if (serializer == null)
                {
                    return;
                }

                object message = null;
                do
                {
                    lock (_syncRoot)
                    {
                        if (_pendingReceiveStream.Length <= _pendingReceiveStream.Position)
                        {// Already read to the end of stream.
                            break;
                        }

                        long startPosition = _pendingReceiveStream.Position;
                        try
                        {
                            message = serializer.Deserialize(_pendingReceiveStream);
                            if (message == null && _pendingReceiveStream.Position != startPosition)
                            {// No message was retrieved, and stream was corrupted.
                                throw new InvalidDataException();
                            }
                        }
                        catch (InvalidDataException ex)
                        {   
                            // The serialization routine has failed, or the stream is corrupt;
                            // clear everything and try to start over (error recovery).
                            message = null;
                            _pendingReceiveStream.SetLength(0);
#if Matrix_Diagnostics
                            Monitor.Error("Possible invalid position calculation, data lost.", ex);
#endif
                        }
                    }

                    if (message != null)
                    {
                        if (message is SystemMessage)
                        {// System message received.
                            ProcessSystemMessage(message as SystemMessage);
                        }
                        else
                        {// Custom user message.
                            MessageUpdateDelegate delegateInstance = MessageReceivedEvent;
                            if (delegateInstance != null)
                            {
                                delegateInstance(this, message);
                            }
                        }
                    }


                } 
                while (message != null);


                lock (_syncRoot)
                {
                    if (_pendingReceiveStream.Position == _pendingReceiveStream.Length)
                    {// Reset the receive stream.
                        _pendingReceiveStream.SetLength(0);
                    }
                    else
                    {
                        // Swap primary and secondary streams, copying over the remaining data.
                        MemoryStream existingStream = _pendingReceiveStream;
                        MemoryStream newStream = _pendingReceiveStreamSecondary;

                        newStream.SetLength(existingStream.Length - existingStream.Position);
                        // Copy the left over data.
                        existingStream.Read(newStream.GetBuffer(), 0, (int)newStream.Length);
                        newStream.Seek(0, SeekOrigin.Begin);

                        CommonHelper.Swap<MemoryStream>(ref _pendingReceiveStream, ref _pendingReceiveStreamSecondary);
                        _pendingReceiveStreamSecondary.SetLength(0);

                        if (existingStream.Position != existingStream.Length)
                        {
#if Matrix_Diagnostics
                            Monitor.Error("Data propagation error.");
#endif
                            throw new SystemException("Data propagation error.");
                        }
                    }
                }

            }
            finally
            {
                // This is crucial, otherwise there is memory leaks.
                //e.Dispose();

                if (resetReceiveArgs)
                {
                    AssignAsyncReceiveArgs(true);
                }
                else
                {
                    ReleaseAsyncReceiveArgs();
                }
            }
        }

        /// <summary>
        /// Allows to process system messages in a different way, should there be one.
        /// </summary>
        /// <param name="message"></param>
        internal virtual void ProcessSystemMessage(SystemMessage message)
        {
        }

        /// <summary>
        /// Begin an asynchronous disconnect.
        /// </summary>
        /// <returns></returns>
        public bool DisconnectAsync()
        {

#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info("Async disconnecting started.");
            }
#endif

            SocketAsyncEventArgs asyncDisconnectArgs = new SocketAsyncEventArgs();
            asyncDisconnectArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_DisconnectComplete);

            if (_socket.DisconnectAsync(asyncDisconnectArgs) == false)
            {

#if Matrix_Diagnostics
                monitor = Monitor;
                if (monitor != null && monitor.IsReportAllowed)
                {
                    monitor.Info("Disconnecting already completed.");
                }
#endif

                SocketAsyncEventArgs_DisconnectComplete(this, asyncDisconnectArgs);
            }

            return true;
        }

        /// <summary>
        /// Disconnect was completed.
        /// </summary>
        void SocketAsyncEventArgs_DisconnectComplete(object sender, SocketAsyncEventArgs e)
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info("Disconnecting completed.");
            }
#endif

            if (e.SocketError == SocketError.Success)
            {
                e.Completed -= new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_DisconnectComplete);
                e.Dispose();

#if Matrix_Diagnostics
                Monitor.ReportImportant("Disconnected.");
#endif
                RaiseDisconnectedEvent();
            }

            ReleaseAsyncReceiveArgs();
        }

        void ReleaseAsyncReceiveArgs()
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info("Releasing async receive arguments.");
            }
#endif

            lock (_syncRoot)
            {
                if (_lastReceiveArgs != null)
                {// Release the existing receive args.
                    _lastReceiveArgs.Completed -= new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_Received);
                    _lastReceiveArgs.Dispose();
                    _lastReceiveArgs = null;
                }
            }
        }

        /// <summary>
        /// Send a message, asynchronously.
        /// </summary>
        /// <param name="message">The message being sent.</param>
        /// <param name="requestConfirm">Should we wait for a confirmation, the </param>
        /// <returns>The id of the send message, or negative value (InvalidSendIndex / -1) if send failed.</returns>
        public long SendAsync(object message, TimeSpan? requestConfirmTimeout)
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info(string.Format("Async sending message [{0}].", message.ToString()));
            }
#endif

            System.Net.Sockets.Socket socket = _socket;
            if (IsConnected == false || socket == null)
            {
#if Matrix_Diagnostics
                Monitor.OperationError("Communicator can not send message [" + message.ToString() + "] since not connected.");
#endif
                return InvalidSendIndex;
            }

            ISerializer serializer = _serializer;
            if (serializer == null)
            {
                return InvalidSendIndex;
            }

            // Event used for confirmed calls
            ManualResetEvent sendCompleteEvent = null;
            if (requestConfirmTimeout.HasValue)
            {
                sendCompleteEvent = new ManualResetEvent(false);
            }

            AsyncMessageSendInfo messageSendInfo = new AsyncMessageSendInfo() {
                                                                                  Id = PendingSendId,
                                                                                  Socket = socket,
                                                                                  Message = message,
                                                                                  ConfirmationEvent = sendCompleteEvent,
                                                                              };

            SocketAsyncEventArgs e = new SocketAsyncEventArgs();
            e.UserToken = messageSendInfo;
            e.Completed += new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_SendComplete);
            
            messageSendInfo.Stream = new MemoryStream();
            if (serializer.Serialize(messageSendInfo.Stream, message) == false)
            {
                messageSendInfo.Dispose();
                return InvalidSendIndex;
            }

            e.SetBuffer(messageSendInfo.Stream.GetBuffer(), 0, (int)messageSendInfo.Stream.Length);
            if (messageSendInfo.Socket.SendAsync(e) == false)
            {
                messageSendInfo.Dispose();
            }

            // Reaquire the event, to lessen the chance of [ObjectDisposedException]
            // when the connection is not established and we get errors on complete instantly.
            sendCompleteEvent = messageSendInfo.ConfirmationEvent;
            if (sendCompleteEvent != null)
            {
                try
                {
                    if (sendCompleteEvent.WaitOne(requestConfirmTimeout.Value) == false)
                    {
#if Matrix_Diagnostics
                        Monitor.OperationError("Communicator could not confirm message sent in assigned timeout.");
#endif
                        return InvalidSendIndex;
                    }
                }
                catch (ObjectDisposedException)
                {
#if Matrix_Diagnostics
                    Monitor.OperationError("Communicator could not confirm message sent in assigned timeout, due to event disposed.");
#endif
                    return InvalidSendIndex;
                }
            }

            return messageSendInfo.Id;
        }

        private void SocketAsyncEventArgs_SendComplete(object sender, SocketAsyncEventArgs e)
        {
#if Matrix_Diagnostics
            InstanceMonitor monitor = Monitor;
            if (monitor != null && monitor.IsReportAllowed)
            {
                monitor.Info(string.Format("Send complete, sender [{0}].", sender != null ? sender.ToString() : string.Empty));
            }
#endif

            e.Completed -= new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_SendComplete);
            
            AsyncMessageSendInfo sendInfo = (AsyncMessageSendInfo)e.UserToken;
            if (sendInfo.ConfirmationEvent != null)
            {// Signal any waiter, we sent the message.
                sendInfo.ConfirmationEvent.Set();
            }

            e.Dispose();

#if Matrix_Diagnostics
            if (Monitor.IsReportAllowed)
            {
                Monitor.Info(this.ToString() + " message send complete [" + sendInfo.Message.ToString() + "]");
            }
#endif

            AsyncMessageSendDelegate delegateInstance = SendAsyncCompleteEvent;
            if (delegateInstance != null)
            {
                delegateInstance(this, sendInfo);
            }
        }

    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Product Manager Ingenious Ltd, Bulgaria
Bulgaria Bulgaria
I worked for a few years as a C++/Win32 developer and software architect, and then moved on to the .NET environment where I was able to discover the beauty of managed programming.

I am currently involved in the development and management of Open Forex Platform (www.openforexplatform.com) and the Matrix Platform (www.matrixplatform.com).

Comments and Discussions