Click here to Skip to main content
15,886,026 members
Articles / Programming Languages / C#

The Super Pool Framework

Rate me:
Please Sign up or sign in to vote.
4.87/5 (53 votes)
31 Aug 2010CPOL26 min read 100.8K   1.5K   178  
The Super Pool is a framework for decoupled communication and management of components. The Super Pool introduces a natural asynchronous communication environment into your solution that can be fluently spread over different components, threads, processes, or even computers or networks.
// -----
// Copyright 2010 Deyan Timnev
// This file is part of the Matrix Platform (www.matrixplatform.com).
// The Matrix Platform is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, 
// either version 3 of the License, or (at your option) any later version. The Matrix Platform is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; 
// without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
// You should have received a copy of the GNU Lesser General Public License along with the Matrix Platform. If not, see http://www.gnu.org/licenses/lgpl.html
// -----
using System;
using System.Collections.Generic;
using Matrix.Common.Core;
using Matrix.Common.Core.Collections;
using Matrix.Common.Extended;
using Matrix.Common.Sockets.Common;
using Matrix.Common.Sockets.Core;
using Matrix.Framework.MessageBus.Core;
using Matrix.Framework.MessageBus.Net.Messages;
using System.Threading;
using System.Net;

#if Matrix_Diagnostics
using Matrix.Common.Diagnostics.TracerCore.Items;
#endif

namespace Matrix.Framework.MessageBus.Net
{
    /// <summary>
    /// Extends the default message bus functionality with networking capabilities.
    /// The message bus serves as a server for messages.
    /// </summary>
    public class ServerMessageBus : Matrix.Framework.MessageBus.Core.MessageBus
    {
        SocketMessageServer _server = null;

        /// <summary>
        /// What socketId has what clients.
        /// </summary>
        BiDictionary<int, ListUnique<ClientId>> _remoteClientsNetIds = new BiDictionary<int, ListUnique<ClientId>>();

        /// <summary>
        /// Access control information for each client. If null means client has not provided any.
        /// </summary>
        Dictionary<int, ClientAccessControl> _clientsAccessControl = new Dictionary<int, ClientAccessControl>();

        /// <summary>
        /// What client belongs to what socketId.
        /// </summary>
        Dictionary<ClientId, int> _remoteClientNetId = new Dictionary<ClientId, int>();

        /// <summary>
        /// What remote client implements what type.
        /// </summary>
        Dictionary<ClientId, Type> _remoteClientsTypes = new Dictionary<ClientId, Type>();

        /// <summary>
        /// What remove client source is of what type.
        /// </summary>
        Dictionary<ClientId, List<string>> _remoteClientsSourcesTypesNames = new Dictionary<ClientId, List<string>>();

        int _pendingMessageId = 0;
        protected int PendingMessageId
        {
            get { return Interlocked.Increment(ref _pendingMessageId); }
        }

        /// <summary>
        /// The default port of the server to accept incoming connections on.
        /// </summary>
        public const int DefaultPort = 18261;

        /// <summary>
        /// Access control rules.
        /// </summary>
        public ServerAccessControl AccessControl { get; protected set; }

        /// <summary>
        /// Constructor.
        /// </summary>
        /// <param name="port">The port for the server of the bus. Leave null for default port.</param>
        /// <param name="accessControl">Control requirements for user and password for connecting clients, pass null for no control (everyone can connect).</param>
        public ServerMessageBus(string name, int? port, ServerAccessControl accessControl)
            : base(name)
        {
            AccessControl = accessControl;
            if (port.HasValue == false)
            {
                port = DefaultPort;
            }

            _server = new SocketMessageServer(base.Serializer);
            _server.Start(new IPEndPoint(IPAddress.Any, port.Value));

            _server.ClientAsyncMessageSendEvent += new SocketMessageServer.AsyncMessageSendUpdateDelegate(_server_ClientAsyncMessageSendEvent);
            _server.ClientConnectedEvent += new SocketMessageServer.ServerClientUpdateDelegate(_server_ClientConnectedEvent);
            _server.ClientDisconnectedEvent += new SocketMessageServer.ServerClientUpdateDelegate(_server_ClientDisconnectedEvent);
            _server.ClientMessageReceivedEvent += new SocketMessageServer.MessageUpdateDelegate(_server_ClientMessageReceivedEvent);

            ApplicationLifetimeHelper.ApplicationClosingEvent += new CommonHelper.DefaultDelegate(ApplicationLifetimeHelper_ApplicationClosingEvent);
        }

        void ApplicationLifetimeHelper_ApplicationClosingEvent()
        {
            SocketMessageServer server = _server;
            if (server != null)
            {
                server.SendAsync(new StateUpdateMessage()
                                     {
                                         MessageId = PendingMessageId,
                                         State = StateUpdateMessage.StateEnum.Shutdown,
                                         RequestResponse = false
                                     }, null);

                // Allow a little time for the status update message(s) to travel.
                Thread.Sleep(150);
            }
        }

        public override void Dispose()
        {
            ApplicationLifetimeHelper.ApplicationClosingEvent -= new CommonHelper.DefaultDelegate(ApplicationLifetimeHelper_ApplicationClosingEvent);

            SocketMessageServer server = _server;
            _server = null;

            if (server != null)
            {
                server.ClientAsyncMessageSendEvent -= new SocketMessageServer.AsyncMessageSendUpdateDelegate(_server_ClientAsyncMessageSendEvent);
                server.ClientConnectedEvent -= new SocketMessageServer.ServerClientUpdateDelegate(_server_ClientConnectedEvent);
                server.ClientDisconnectedEvent -= new SocketMessageServer.ServerClientUpdateDelegate(_server_ClientDisconnectedEvent);
                server.ClientMessageReceivedEvent -= new SocketMessageServer.MessageUpdateDelegate(_server_ClientMessageReceivedEvent);

                server.Stop(TimeSpan.FromSeconds(2));
                server.Dispose();
            }

            base.Dispose();
        }

        bool ToClient(int clientSocketId, Message message, TimeSpan? requestConfirmTimeout)
        {
            SocketMessageServer server = _server;
            if (server == null)
            {
                return false;
            }

            ServerAccessControl accessControl = AccessControl;
            if (accessControl != null)
            {
                if (accessControl.IsAllowed(ObtainClientAccessControl(clientSocketId)) == false)
                {
#if Matrix_Diagnostics
                    InstanceMonitor.OperationWarning("Message [" + message.ToString() + "] was not sent to client [" + clientSocketId + "] due to access control.");
#endif
                    return false;
                }
            }

            message.MessageId = PendingMessageId;
            return server.SendAsync(clientSocketId, message, requestConfirmTimeout);
        }

        public override bool ContainsClient(ClientId clientId)
        {
            if (base.ContainsClient(clientId) == false)
            {
                lock (_syncRoot)
                {
                    return _remoteClientNetId.ContainsKey(clientId);
                }
            }

            return true;
        }

        public override Type GetClientType(ClientId clientId)
        {
            if (clientId.IsMessageBusIndexValid && clientId.MessageBus == this)
            {// Receiver seems to be a local item.
                return base.GetClientType(clientId);
            }

            lock (_syncRoot)
            {
                Type value;
                if (this._remoteClientsTypes.TryGetValue(clientId, out value))
                {
                    return value;
                }
            }

            return null;
        }

        public override List<ClientId> GetAllClientsIds()
        {
            List<ClientId> result = base.GetAllClientsIds();
            
            lock(_syncRoot)
            {
                foreach(ClientId id in _remoteClientNetId.Keys)
                {
                    result.Add(id);
                }
            }

            return result;
        }

        public override List<string> GetClientSourceTypes(ClientId clientId)
        {
            if (clientId.IsMessageBusIndexValid && clientId.MessageBus == this)
            {// Receiver seems to be a local item.
                return base.GetClientSourceTypes(clientId);
            }

            lock (_syncRoot)
            {
                List<string> names;
                if (this._remoteClientsSourcesTypesNames.TryGetValue(clientId, out names))
                {
                    return names;
                }
            }

            return null;
        }

        protected override void client_UpdateEvent(MessageBusClient client)
        {
            base.client_UpdateEvent(client);

            // Also send this notification to clients.
            int[] keys;
            lock (_syncRoot)
            {
                keys = GeneralHelper.EnumerableToArray<int>(_remoteClientsNetIds.Keys);
            }

            ClientUpdateMessage message = new ClientUpdateMessage() { 
                                                                        ClientId = client.Id, MessageId = PendingMessageId, RequestResponse = false };

            foreach (int key in keys)
            {
                ToClient(key, message, null);
            }
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="senderId"></param>
        /// <param name="receiverId"></param>
        /// <param name="envelope"></param>
        /// <param name="requestConfirm">Only valid for remote clients, since all local calls are confirmed or denied by default.</param>
        /// <returns></returns>
        protected override SendToClientResultEnum DoSendToClient(ClientId senderId, ClientId receiverId, 
                                                                 Envelope envelope, TimeSpan? requestConfirmTimeout)
        {
            if (receiverId.IsMessageBusIndexValid && (receiverId.MessageBus == this))
                //|| receiverId.MessageBus == null)) // This allows for "lost" ids.
            {// Receiver seems to be a local item.
                SendToClientResultEnum result = base.DoSendToClient(senderId, receiverId, envelope, requestConfirmTimeout);
                if (result != SendToClientResultEnum.ClientNotFound)
                {
                    return result;
                }
            }

            int clientSocketId = 0;
            lock (_syncRoot)
            {
                if (_remoteClientNetId.TryGetValue(receiverId, out clientSocketId) == false)
                {
                    return SendToClientResultEnum.ClientNotFound;
                }
            }

            // Send message.
            EnvelopeMessage message = new EnvelopeMessage() { Envelope = envelope,
                                                              Receivers = new ClientId[] { receiverId },
                                                              Sender = senderId,
                                                              RequestResponse = false
                                                            };

            if (ToClient(clientSocketId, message, requestConfirmTimeout) == false)
            {
                return SendToClientResultEnum.Failure;
            }
            else
            {
                return SendToClientResultEnum.Success;
            }
        }

        void RegisterClientId(int socketClientId, ClientId id, Type type, List<string> sourceTypeNames)
        {
            // Remove the index, that was valid for the remote bus, we use this as pure Id only.
            id.LocalMessageBusIndex = ClientId.InvalidMessageBusClientIndex;

            bool newlyAdded = false;
            lock (_syncRoot)
            {
                // Update all of the remote client state collections.
                newlyAdded = (_remoteClientNetId.ContainsKey(id) == false);
                
                _remoteClientNetId[id] = socketClientId;
                _remoteClientsTypes[id] = type;
                _remoteClientsSourcesTypesNames[id] = sourceTypeNames;

                if (_remoteClientsNetIds.ContainsKey(socketClientId) == false)
                {
                    _remoteClientsNetIds.Add(socketClientId, new ListUnique<ClientId>());
                }

                _remoteClientsNetIds[socketClientId].Add(id);
            }

            if (newlyAdded)
            {
                RaiseClientAddedEvent(id);
            }
        }

        ClientAccessControl ObtainClientAccessControl(int clientId)
        {
            if (clientId < 0)
            {
                return null;
            }

            ClientAccessControl result;
            lock (_syncRoot)
            {
                if (_clientsAccessControl.TryGetValue(clientId, out result) == false)
                {
                    result = new ClientAccessControl();
                    _clientsAccessControl.Add(clientId, result);
                }
            }

            return result;
        }

        void _server_ClientMessageReceivedEvent(SocketMessageServer server, SocketCommunicatorEx client, object message)
        {
            ServerAccessControl accessControl = AccessControl;
            // Check security first.
            if (accessControl != null && message is AccessMessage == false)
            {
                if (accessControl.IsAllowed(ObtainClientAccessControl(client.Id)) == false)
                {
#if Matrix_Diagnostics
                    InstanceMonitor.Info("Message [" + message.ToString()  +"] from client [" + client.ToString() + "] not allowed due to access control.", TracerItem.PriorityEnum.Medium);
#endif
                    return;
                }
            }

            if (message is EnvelopeMessage)
            {// Envelope user message.
                
                EnvelopeMessage envelopeMessage = (EnvelopeMessage)message;

                // Remove the remote message bus index association.
                envelopeMessage.Sender.LocalMessageBusIndex = ClientId.InvalidMessageBusClientIndex;

                foreach (ClientId id in envelopeMessage.Receivers)
                {
                    // Assign the id as local id, if it is, otherwise skip it.
                    id.LocalMessageBusIndex = base.GetClientIndexByGuid(id.Guid);
                    if (id.IsMessageBusIndexValid)
                    {
                        id.MessageBus = this;
                        if (DoSendToClient(envelopeMessage.Sender, id, envelopeMessage.Envelope, null) != SendToClientResultEnum.Success)
                        {
#if Matrix_Diagnostics
                            InstanceMonitor.OperationError(string.Format("Failed to accept envelope message [{0}].", envelopeMessage.ToString()));
#endif
                        }
                    }
                    else
                    {
#if Matrix_Diagnostics
                        InstanceMonitor.OperationError(string.Format("Failed to accept envelope message [{0}] due unrecognized receiver id.", envelopeMessage.ToString()));
#endif
                    }

                }
            }
            else if (message is ClientsListMessage)
            {// Message bus system message.

                ClientsListMessage updateMessage = (ClientsListMessage)message;
                for (int i = 0; i < updateMessage.Ids.Count; i++)
                {
                    RegisterClientId(client.Id, updateMessage.Ids[i], updateMessage.Types[i], updateMessage.SourcesTypes[i]);
                }

            }
            else if (message is RequestClientListUpdateMessage)
            {
                SendClientsUpdate(client.Id);
            }
            else if (message is ClientUpdateMessage)
            {
                ClientUpdateMessage updateMessage = (ClientUpdateMessage)message;
                
                bool validClient;
                lock (_syncRoot)
                {
                    validClient = _remoteClientNetId.ContainsKey(updateMessage.ClientId);
                }

                if (validClient)
                {
                    RaiseClientAddedEvent(updateMessage.ClientId);
                }
                else
                {
#if Matrix_Diagnostics
                    InstanceMonitor.OperationError(string.Format("Failed to raise update event for client [{0}], since client not found.", updateMessage.ClientId.ToString()));
#endif
                }
            }
            else if (message is AccessMessage)
            {
                ClientAccessControl control = ObtainClientAccessControl(client.Id);
                if (control != null)
                {
                    control.Update(message as AccessMessage);
                }
            }
            else if (message is StateUpdateMessage)
            {
                RaiseCounterPartyUpdateEvent("Client:" + client.Id.ToString(), ((StateUpdateMessage)message).State.ToString());
            }
            else
            {
#if Matrix_Diagnostics
                InstanceMonitor.Warning(string.Format("Message [{0}] not recognized.", message.GetType().Name));
#endif
            }
        }

        void _server_ClientDisconnectedEvent(SocketMessageServer server, SocketCommunicatorEx client)
        {
            // Clear all clients hooked on this connection.
            ListUnique<ClientId> clientsIds = null;
            lock (_syncRoot)
            {
                if (_remoteClientsNetIds.TryGetByKey(client.Id, ref clientsIds) == false)
                {
                    return;
                }

                _clientsAccessControl.Remove(client.Id);

                _remoteClientsNetIds.RemoveByKey(client.Id);
                foreach (ClientId id in clientsIds)
                {
                    _remoteClientNetId.Remove(id);
                    _remoteClientsTypes.Remove(id);
                    _remoteClientsSourcesTypesNames.Remove(id);
                }
            }

            // Raise event to notify of the disconnection of all these Ids.
            foreach (ClientId id in clientsIds)
            {
                // Notify of clients removal, with non permanent remove, since they may later be restored.
                RaiseClientRemovedEvent(id, false);
            }
        }

        /// <summary>
        /// Helper, sends an update with all the *local* clients ids to the server.
        /// </summary>
        bool SendClientsUpdate(int socketId)
        {
            ClientsListMessage message = new ClientsListMessage();
            foreach (MessageBusClient client in Clients)
            {
                message.Ids.Add(client.Id);
                message.AddType(client.GetType(), client.OptionalSourceType);
            }

            return ToClient(socketId, message, null);
        }

        void _server_ClientConnectedEvent(SocketMessageServer server, SocketCommunicatorEx client)
        {
            // The client expects is, so activate keep alive.
            client.KeepAlive = true;

            // Send local bus clients info to connected element.
            // Important - this will also be sent even to clients that do not have verified access control.
            if (SendClientsUpdate(client.Id) == false)
            {
#if Matrix_Diagnostics
                InstanceMonitor.OperationError("Failed to send clients update to client [" + client.ToString() + ", " + client.Id + "].");
#endif
            }
        }

        void _server_ClientAsyncMessageSendEvent(SocketMessageServer server, SocketCommunicatorEx client, SocketCommunicator.AsyncMessageSendInfo info)
        {
            // A message has been successfully sent to client.
        }

    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Product Manager Ingenious Ltd, Bulgaria
Bulgaria Bulgaria
I worked for a few years as a C++/Win32 developer and software architect, and then moved on to the .NET environment where I was able to discover the beauty of managed programming.

I am currently involved in the development and management of Open Forex Platform (www.openforexplatform.com) and the Matrix Platform (www.matrixplatform.com).

Comments and Discussions