Click here to Skip to main content
15,881,882 members
Articles / Programming Languages / C#

The Super Pool Framework

Rate me:
Please Sign up or sign in to vote.
4.87/5 (53 votes)
31 Aug 2010CPOL26 min read 100.6K   1.5K   178  
The Super Pool is a framework for decoupled communication and management of components. The Super Pool introduces a natural asynchronous communication environment into your solution that can be fluently spread over different components, threads, processes, or even computers or networks.
// -----
// Copyright 2010 Deyan Timnev
// This file is part of the Matrix Platform (www.matrixplatform.com).
// The Matrix Platform is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, 
// either version 3 of the License, or (at your option) any later version. The Matrix Platform is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; 
// without even the implied warranty of  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
// You should have received a copy of the GNU Lesser General Public License along with the Matrix Platform. If not, see http://www.gnu.org/licenses/lgpl.html
// -----
using System;
using System.Collections.Generic;
using System.Text;
using Matrix.Common.Extended.ThreadPools;
using Matrix.Framework.MessageBus.Net;
using Matrix.Framework.MessageBus.Clients.ExecutionStrategies;

#if Matrix_Diagnostics
    using Matrix.Common.Diagnostics;
#endif

namespace Matrix.Framework.MessageBus.Core
{
    /// <summary>
    /// Base class for implementing a message bus.
    /// </summary>
    public abstract class MessageBusBase : IMessageBus
    {
        volatile bool _isDisposed = false;

        protected bool IsDisposed
        {
            get { return _isDisposed; }
        }

#if Matrix_Diagnostics
        private InstanceMonitor _monitor;
        protected InstanceMonitor InstanceMonitor
        {
            get { return _monitor; }
        }
#endif

        volatile string _name = string.Empty;
        /// <summary>
        /// Name of the bus.
        /// </summary>
        public string Name
        {
            get { return _name; }
        }

        volatile ThreadPoolFastEx _defaultThreadPool;
        /// <summary>
        /// The pool this bus uses by default.
        /// </summary>
        public ThreadPoolFastEx DefaultThreadPool
        {
            get { return _defaultThreadPool; }
        }

        protected enum SendToClientResultEnum
        {
            ClientNotFound,
            Failure,
            Success
        }

        public event MessageBusClientUpdateDelegate ClientAddedEvent;
        public event MessageBusClientRemovedDelegate ClientRemovedEvent;
        public event MessageBusClientUpdateDelegate ClientUpdateEvent;
        public event MessageBusCounterPartyUpdateDelegate CounterPartyUpdateEvent;

        /// <summary>
        /// Constructor.
        /// </summary>
        public MessageBusBase(string name)
        {
            _name = name;

#if Matrix_Diagnostics
            _monitor = new InstanceMonitor(this);
#endif
            
            _defaultThreadPool = new ThreadPoolFastEx(name + ".DefaultThreadPool");

            _defaultThreadPool.MinimumThreadsCount = 5;
            _defaultThreadPool.MaximumThreadsCount = 10;
        }

        #region IDisposable Members

        public virtual void Dispose()
        {
            _isDisposed = true;

            ThreadPoolFastEx threadPool = _defaultThreadPool;
            if (threadPool != null)
            {
                threadPool.Dispose();
                _defaultThreadPool = null;
            }
        }

        #endregion

        public abstract bool AddClient(MessageBusClient client);
        public abstract bool RemoveClient(MessageBusClient client, bool isPermanent);
        public abstract bool ContainsClient(ClientId client);

        public abstract Type GetClientType(ClientId clientId);
        public abstract List<string> GetClientSourceTypes(ClientId clientId);

        public abstract List<ClientId> GetAllClientsIds();

        /// <summary>
        /// Obtain an instance of the client, in case it is a local client
        /// and the instance is accessible. This will not work for remote
        /// (network) associated clients.
        /// </summary>
        public abstract MessageBusClient GetLocalClientInstance(ClientId clientId);

        protected void RaiseClientAddedEvent(ClientId clientId)
        {
            MessageBusClientUpdateDelegate delegateInstance = ClientAddedEvent;
            if (delegateInstance != null)
            {
                delegateInstance(this, clientId);
            }
        }

        protected void RaiseClientRemovedEvent(ClientId clientId, bool isPermanent)
        {
            MessageBusClientRemovedDelegate delegateInstance = ClientRemovedEvent;
            if (delegateInstance != null)
            {
                delegateInstance(this, clientId, isPermanent);
            }
        }

        protected void RaiseClientUpdateEvent(ClientId clientId)
        {
            MessageBusClientUpdateDelegate delegateInstance = ClientUpdateEvent;
            if (delegateInstance != null)
            {
                delegateInstance(this, clientId);
            }
        }

        protected void RaiseCounterPartyUpdateEvent(string partyId, string state)
        {
            MessageBusCounterPartyUpdateDelegate del = CounterPartyUpdateEvent;
            if (del != null)
            {
                del(this, partyId, state);
            }
        }

        
        /// <summary>
        /// Send a responce to an envelope received.
        /// </summary>
        /// <param name="receivedEnvelope">The enveloped received, we shall respond to it.</param>
        /// <param name="envelope">The new envelope, we are sending as responce.</param>
        /// <returns></returns>
        public Outcomes Respond(Envelope receivedEnvelope, Envelope envelope)
        {
            envelope.Address = EnvelopeTransportation.CreateResponseTransport(receivedEnvelope.History);
            return Send(envelope);
        }

        /// <summary>
        /// Addressed envelope.
        /// </summary>
        protected Outcomes Send(Envelope envelope)
        {
            if (envelope.Address != null)
            {
                EnvelopeStamp stamp = envelope.Address.PopStamp();
                if (stamp != null)
                {
                    return Send(stamp.SenderIndex, stamp.ReceiverIndex, envelope, null, false);
                }
            }

            return Outcomes.Failure;
        }

        /// <summary>
        /// Send an item to a single repicient.
        /// </summary>
        /// <returns></returns>
        public Outcomes Send(ClientId senderId, ClientId receiverId, Envelope envelope, TimeSpan? requestConfirmTimeout, bool showErrorsDiagnostics)
        {
            return Send(senderId, new ClientId[] { receiverId }, envelope, requestConfirmTimeout, showErrorsDiagnostics);
        }

        /// <summary>
        /// Send an item to multiple recipients.
        /// </summary>
        public Outcomes Send(ClientId senderId, IEnumerable<ClientId> receiversIds, Envelope envelope, TimeSpan? requestConfirmTimeout, bool showErrorsDiagnostics)
        {
            return DoSend(senderId, receiversIds, envelope, requestConfirmTimeout, showErrorsDiagnostics);
        }

        protected abstract Outcomes DoSend(ClientId senderIndex, IEnumerable<ClientId> receiversIndeces, Envelope envelope, TimeSpan? requestConfirmTimeout, bool showErrorsDiagnostics);

    
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Product Manager Ingenious Ltd, Bulgaria
Bulgaria Bulgaria
I worked for a few years as a C++/Win32 developer and software architect, and then moved on to the .NET environment where I was able to discover the beauty of managed programming.

I am currently involved in the development and management of Open Forex Platform (www.openforexplatform.com) and the Matrix Platform (www.matrixplatform.com).

Comments and Discussions