Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version
Go to top

The Super Pool Framework

Deyan Timnev, 31 Aug 2010
The Super Pool is a framework for decoupled communication and management of components. The Super Pool introduces a natural asynchronous communication environment into your solution that can be fluently spread over different components, threads, processes, or even computers or networks.
Matrix_SuperPool_Standalone.zip
Lib
PowerCollections.dll
Matrix.Framework.SuperPool.Standalone
bin
Debug
Release
Common.Sockets
Common
Core
Matrix.Common.Core
Collections
Identification
Results
Serialization
Matrix.Common.Extended
FastSerialization
Operationals
ThreadPools
Matrix.Common.Sockets
Common
Core
Matrix.Framework.MessageBus
Client
Clients
ExecutionStrategies
Core
Net
Messages
Matrix.Framework.SuperPool
Call
Clients
Core
DynamicProxy
Subscription
Matrix.Framework.SuperPool.Standalone.csproj.user
Properties
Tests
Lib
log4net.dll
Matrix.Common.Core.dll
Matrix.Common.Diagnostics.dll
Matrix.Common.Diagnostics.FrontEnd.dll
Matrix.Common.Extended.dll
Matrix.Common.FrontEnd.dll
Matrix.Common.Sockets.dll
Matrix.Framework.TestFramework.dll
nunit.framework.dll
Matrix.Framework.SuperPool.Demonstration
bin
Debug
Release
FormServer.cs.bak
Properties
Matrix.Framework.SuperPool.Test
bin
Debug
Release
Matrix.Framework.SuperPool.Test.vshost.exe.manifest
MultiThreadTests
Properties
Settings.settings
SpeedTests
Matrix.Framework.SuperPool.UnitTest
bin
Debug
Release
Matrix.Framework.SuperPool.UnitTest.csproj.user
Properties
// -----
// Copyright 2010 Deyan Timnev
// This file is part of the Matrix Platform (www.matrixplatform.com).
// The Matrix Platform is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, 
// either version 3 of the License, or (at your option) any later version. The Matrix Platform is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; 
// without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
// You should have received a copy of the GNU Lesser General Public License along with the Matrix Platform. If not, see http://www.gnu.org/licenses/lgpl.html
// -----
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Net;
using System.Threading;
using Matrix.Common.Core;
using Matrix.Common.Core.Collections;
using Matrix.Common.Core.Serialization;
using Matrix.Common.Sockets.Common;

#if Matrix_Diagnostics
    using Matrix.Common.Diagnostics;
#endif

namespace Matrix.Common.Sockets.Core
{
    /// <summary>
    /// TCP socket based message server implementation.
    /// Listens on an end point, wraps every accepted connection in a
    /// <see cref="SocketCommunicatorEx"/> (keyed by a server assigned integer id) and
    /// forwards the per-client events to the server level events.
    /// </summary>
    public class SocketMessageServer : IDisposable
    {
        /// <summary>
        /// Guards the start/stop transitions of the listen socket.
        /// </summary>
        readonly object _syncRoot = new object();

        /// <summary>
        /// Default port constant; not referenced inside this class (callers build their own end point).
        /// </summary>
        public const int DefaultPort = 11632;

        int _backlog = 128;
        /// <summary>
        /// Connections backlog, passed to Socket.Listen() when the server is started.
        /// Changing it after Start() has no effect on the already listening socket.
        /// </summary>
        public int Backlog
        {
            get { return _backlog; }
            set { _backlog = value; }
        }


        /// <summary>
        /// The listening socket; null while the server is stopped.
        /// </summary>
        volatile System.Net.Sockets.Socket _listenSocket;

        /// <summary>
        /// Is the server started and listening.
        /// </summary>
        public bool IsStarted
        {
            get 
            {
                // Read the volatile field once; Stop() may null it from another thread
                // between a null check and a second read.
                System.Net.Sockets.Socket listenSocket = _listenSocket;
                return listenSocket != null && listenSocket.IsBound; 
            }
        }

        // NOTE(review): entries are added on accept and only removed in Dispose();
        // disconnected clients appear to stay registered - confirm this is intended.
        readonly HotSwapDictionary<int, SocketCommunicatorEx> _clientsHotSwap = new HotSwapDictionary<int, SocketCommunicatorEx>();
        
        /// <summary>
        /// An enumerable containing all the clients pairs (client id, client instance).
        /// </summary>
        public IEnumerable<KeyValuePair<int, SocketCommunicatorEx>> Clients
        {
            get { return _clientsHotSwap; }
        }

        /// <summary>
        /// An enumerable containing only the clients instances.
        /// </summary>
        public IEnumerable<SocketCommunicatorEx> ClientsOnly
        {
            get { return _clientsHotSwap.Values; }
        }

        /// <summary>
        /// The serializer used to serialize and deserialize messages to byte[].
        /// Handed to every client helper created on accept; set to null on Dispose().
        /// </summary>
        ISerializer _serializer;

        int _pendingClientId = 0;
        /// <summary>
        /// Id for the next client. Note that every read atomically increments the
        /// underlying counter, so each read yields a new value.
        /// </summary>
        protected int PendingClientId
        {
            get { return Interlocked.Increment(ref _pendingClientId); }
        }

#if Matrix_Diagnostics
        InstanceMonitor _monitor;
        public InstanceMonitor Monitor
        {
            get { return _monitor; }
        }
#endif

        public delegate void ServerClientUpdateDelegate(SocketMessageServer server, SocketCommunicatorEx client);
        public delegate void MessageUpdateDelegate(SocketMessageServer server, SocketCommunicatorEx client, object message);
        public delegate void AsyncMessageSendUpdateDelegate(SocketMessageServer server, SocketCommunicatorEx client, SocketCommunicator.AsyncMessageSendInfo info);

        /// <summary>
        /// Raised after a connection was accepted and its client helper registered.
        /// </summary>
        public event ServerClientUpdateDelegate ClientConnectedEvent;

        /// <summary>
        /// Raised when a client helper reports its DisconnectedEvent.
        /// </summary>
        public event ServerClientUpdateDelegate ClientDisconnectedEvent;

        /// <summary>
        /// Raised when a client helper reports a received message.
        /// </summary>
        public event MessageUpdateDelegate ClientMessageReceivedEvent;

        /// <summary>
        /// Raised when a client helper reports its SendAsyncCompleteEvent.
        /// </summary>
        public event AsyncMessageSendUpdateDelegate ClientAsyncMessageSendEvent;

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="serializer">Serializer handed to every accepted client helper.</param>
        public SocketMessageServer(ISerializer serializer)
        {
#if Matrix_Diagnostics
            _monitor = new InstanceMonitor(this);
#endif
            _serializer = serializer;
        }

        /// <summary>
        /// Dispose: stops listening, drops all event subscribers and disposes every registered client.
        /// </summary>
        public void Dispose()
        {
            // Stop the main accept socket.
            Stop(null);

            ClientConnectedEvent = null;
            ClientDisconnectedEvent = null;

            ClientMessageReceivedEvent = null;
            ClientAsyncMessageSendEvent = null;

            // Dispose all clients; snapshot to an array first, since the collection is cleared
            // before the clients are disposed.
            SocketCommunicatorEx[] clients = CommonHelper.EnumerableToArray<SocketCommunicatorEx>(_clientsHotSwap.Values);
            _clientsHotSwap.Clear();

            _serializer = null;

            foreach (SocketCommunicatorEx client in clients)
            {
                client.Dispose();
            }

        }

        /// <summary>
        /// Start the server.
        /// </summary>
        /// <param name="endPoint">Local end point to bind and listen on.</param>
        /// <returns>
        /// True if the server is listening (including the case it was already started),
        /// false if creating, binding or listening on the socket failed with a SocketException.
        /// </returns>
        /// <exception cref="ArgumentNullException">endPoint is null and the server is not yet started.</exception>
        public bool Start(IPEndPoint endPoint)
        {
            lock (_syncRoot)
            {
                if (_listenSocket != null)
                {// Already started.
#if Matrix_Diagnostics
                    Monitor.OperationWarning("Server already started.");
#endif
                    return true;
                }

                if (endPoint == null)
                {
                    throw new ArgumentNullException("endPoint");
                }

                // Build the socket in a local and publish it only once it is listening.
                // This way a failure in the Socket constructor itself does not leave the
                // catch block dereferencing a null field.
                System.Net.Sockets.Socket listenSocket = null;
                try
                {
                    listenSocket = new System.Net.Sockets.Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                    listenSocket.Bind(endPoint);
                    listenSocket.Listen(_backlog);
                }
                catch (SocketException ex)
                {
#if Matrix_Diagnostics
                    Monitor.OperationError("Failed to start server.", ex);
#endif
                    if (listenSocket != null)
                    {
                        listenSocket.Close();
                    }

                    return false;
                }

                _listenSocket = listenSocket;
            }

#if Matrix_Diagnostics
            Monitor.Info("Message server started, at [" + endPoint.ToString() + "].");
#endif

            AssignAsyncAcceptArgs();
            return true;
        }

        /// <summary>
        /// Stop the server. Only the listen socket is closed; already registered clients are not touched.
        /// </summary>
        /// <param name="timeOut">
        /// Optional close timeout; when supplied its TotalMilliseconds (truncated to int)
        /// is passed to Socket.Close(int), otherwise the socket is closed immediately.
        /// </param>
        public void Stop(TimeSpan? timeOut)
        {
            // Detach the socket under the lock, close it outside of it.
            System.Net.Sockets.Socket listenSocket;
            lock (_syncRoot)
            {
                listenSocket = _listenSocket;
                _listenSocket = null;
            }

            if (listenSocket != null)
            {
                if (timeOut.HasValue)
                {
                    listenSocket.Close((int)timeOut.Value.TotalMilliseconds);
                }
                else
                {
                    listenSocket.Close();
                }
            }
        }

        /// <summary>
        /// Start an asynchronous disconnect of a client.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <returns>False if the client is not registered, otherwise the result of its DisconnectAsync().</returns>
        public bool DisconnectClient(int clientId)
        {
            SocketCommunicatorEx client;
            if (_clientsHotSwap.TryGetValue(clientId, out client) == false)
            {
                return false;
            }

            return client.DisconnectAsync();
        }


        /// <summary>
        /// Send to all registered clients.
        /// </summary>
        /// <param name="message">Message to send.</param>
        /// <param name="requestConfirmTimeout">Passed through to each client's SendAsync().</param>
        public void SendAsync(object message, TimeSpan? requestConfirmTimeout)
        {
            foreach (SocketCommunicatorEx client in _clientsHotSwap.Values)
            {
                client.SendAsync(message, requestConfirmTimeout);
            }
        }

        /// <summary>
        /// Send a message to a client.
        /// </summary>
        /// <param name="clientId">Id of the client.</param>
        /// <param name="message">Message to send.</param>
        /// <param name="requestConfirmTimeout">Passed through to the client's SendAsync().</param>
        /// <returns>True if send has started successfully.</returns>
        public bool SendAsync(int clientId, object message, TimeSpan? requestConfirmTimeout)
        {
            SocketCommunicatorEx client;
            if (_clientsHotSwap.TryGetValue(clientId, out client) == false)
            {
#if Matrix_Diagnostics
                Monitor.OperationError("Client [" + clientId + "] not found.");
#endif
                return false;
            }

            return client.SendAsync(message, requestConfirmTimeout) != SocketCommunicator.InvalidSendIndex;
        }

        /// <summary>
        /// Helper, assign the pending async accept args.
        /// Accepts that complete synchronously are processed right here in a loop
        /// (instead of recursing through the completion handler), until an accept
        /// goes pending, fails, or the server is stopped.
        /// </summary>
        /// <returns>The args of the pending accept, or null if no accept is pending.</returns>
        SocketAsyncEventArgs AssignAsyncAcceptArgs()
        {
            while (true)
            {
                System.Net.Sockets.Socket listenSocket = _listenSocket;
                if (listenSocket == null)
                {
                    return null;
                }

                SocketAsyncEventArgs e = new SocketAsyncEventArgs();
                // Remember which listen socket issued this accept, so the completion
                // handler can tell whether the server was stopped/restarted meanwhile.
                e.UserToken = listenSocket;
                e.Completed += new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_Completed);

                bool isPending;
                try
                {
                    isPending = listenSocket.AcceptAsync(e);
                }
                catch (ObjectDisposedException)
                {// Stop() closed the socket after we read the field above.
                    ReleaseAcceptArgs(e);
                    return null;
                }

                if (isPending)
                {// Completion will be reported trough the Completed event.
                    return e;
                }

                if (e.SocketError != SocketError.Success)
                {// Accept failed.
#if Matrix_Diagnostics
                    Monitor.Fatal("Async accept failed.");
#endif
                    ReleaseAcceptArgs(e);
                    return null;
                }

                // Completed synchronously with success; the Completed event is not raised
                // in this case, so process the connection here and issue the next accept.
                ProcessAccept(e);
            }
        }

        /// <summary>
        /// Helper, detach the completion handler from the accept args and dispose them.
        /// </summary>
        void ReleaseAcceptArgs(SocketAsyncEventArgs e)
        {
            e.Completed -= new EventHandler<SocketAsyncEventArgs>(SocketAsyncEventArgs_Completed);
            e.UserToken = null;
            e.Dispose();
        }

        /// <summary>
        /// Helper, handle one completed accept operation: on success wrap the accepted
        /// socket in a client helper, register it and raise ClientConnectedEvent.
        /// The args are always released before returning; issuing the next accept is
        /// the responsibility of the caller.
        /// </summary>
        void ProcessAccept(SocketAsyncEventArgs e)
        {
            try
            {
                if (e.SocketError != SocketError.Success)
                {
                    return;
                }

                if (e.LastOperation != SocketAsyncOperation.Accept)
                {
#if Matrix_Diagnostics
                    Monitor.NotImplementedWarning(e.ToString());
#endif
                    return;
                }

                SocketCommunicatorEx helper = new SocketCommunicatorEx(_serializer);
                helper.AssignSocket(e.AcceptSocket, true);
                helper.Id = PendingClientId;

#if Matrix_Diagnostics
                helper.Monitor.MinimumTracePriority = Monitor.MinimumTracePriority;
#endif

                helper.ConnectedEvent += new SocketCommunicator.HelperUpdateDelegate(helper_ConnectedEvent);
                helper.DisconnectedEvent += new SocketCommunicator.HelperUpdateDelegate(helper_DisconnectedEvent);

                helper.MessageReceivedEvent += new SocketCommunicator.MessageUpdateDelegate(helper_MessageReceivedEvent);
                helper.SendAsyncCompleteEvent += new SocketCommunicator.AsyncMessageSendDelegate(helper_SendAsyncCompleteEvent);

                _clientsHotSwap[(int)helper.Id] = helper;

#if Matrix_Diagnostics
                Monitor.ReportImportant("Client [" + helper.Id + "] connected.");
#endif

                ServerClientUpdateDelegate delegateInstance = ClientConnectedEvent;
                if (delegateInstance != null)
                {
                    delegateInstance(this, helper);
                }
            }
            finally
            {
                ReleaseAcceptArgs(e);
            }
        }

        /// <summary>
        /// Asynchronous accept completed (client connected, or the accept failed).
        /// </summary>
        private void SocketAsyncEventArgs_Completed(object sender, SocketAsyncEventArgs e)
        {
            // Capture before processing, since ProcessAccept() disposes the args.
            object issuingSocket = e.UserToken;
            SocketError socketError = e.SocketError;

            ProcessAccept(e);

            if (socketError == SocketError.OperationAborted)
            {// The listen socket was closed, do not issue another accept.
                return;
            }

            if (object.ReferenceEquals(issuingSocket, _listenSocket) == false)
            {// Server was stopped (or restarted, and Start() has issued its own accept).
                return;
            }

            // Keep accepting; a single failed accept must not silently end the accept cycle
            // while the server still reports it is started.
            AssignAsyncAcceptArgs();
        }

        #region Helper Instances Events

        /// <summary>
        /// Forward a client's send-complete notification to ClientAsyncMessageSendEvent.
        /// </summary>
        void helper_SendAsyncCompleteEvent(SocketCommunicator helper, SocketCommunicator.AsyncMessageSendInfo info)
        {
            AsyncMessageSendUpdateDelegate del = ClientAsyncMessageSendEvent;
            if (del != null)
            {
                del(this, (SocketCommunicatorEx)helper, info);
            }
        }

        /// <summary>
        /// Forward a client's received message to ClientMessageReceivedEvent.
        /// </summary>
        void helper_MessageReceivedEvent(SocketCommunicator helper, object message)
        {
            MessageUpdateDelegate del = ClientMessageReceivedEvent;
            if (del != null)
            {
                del(this, (SocketCommunicatorEx)helper, message);
            }
        }

        void helper_ConnectedEvent(SocketCommunicator helper)
        {
            // This is never invoked, since we create the helpers directly on the connected sockets.
        }

        /// <summary>
        /// Forward a client's disconnect notification to ClientDisconnectedEvent.
        /// </summary>
        void helper_DisconnectedEvent(SocketCommunicator client)
        {
            //Monitor.ReportImportant("Client [" + client.Id + "] disconnected.");

            ServerClientUpdateDelegate delegateInstance = ClientDisconnectedEvent;
            if (delegateInstance != null)
            {
                delegateInstance(this, (SocketCommunicatorEx)client);
            }
        }

        #endregion

    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

Deyan Timnev
Product Manager Ingenious Ltd, Bulgaria
Bulgaria Bulgaria
I worked for a few years as a C++/Win32 developer and software architect, and then moved on to the .NET environment where I was able to discover the beauty of managed programming.
 
I am currently involved in the development and management of Open Forex Platform (www.openforexplatform.com) and the Matrix Platform (www.matrixplatform.com).

| Advertise | Privacy | Mobile
Web01 | 2.8.140916.1 | Last Updated 31 Aug 2010
Article Copyright 2010 by Deyan Timnev
Everything else Copyright © CodeProject, 1999-2014
Terms of Service
Layout: fixed | fluid