Click here to Skip to main content
15,896,207 members
Articles / Programming Languages / XML

DotNetMQ: A Complete Message Queue System for .NET

Rate me:
Please Sign up or sign in to vote.
4.94/5 (190 votes)
23 May 2011LGPL331 min read 1.1M   53.8K   489  
A new and independent Open Source Message Queue system that is entirely built in C# and .NET framework 3.5.
/*
DotNetMQ - A Complete Message Broker For .NET
Copyright (C) 2011 Halil ibrahim KALKAN

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 */

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Threading;
using log4net;
using MDS.Communication.Events;
using MDS.Communication.TCPCommunication;
using MDS.Communication.Messages;
using MDS.Exceptions;
using MDS.Settings;
using MDS.Threading;

namespace MDS.Communication
{
    /// <summary>
    /// Represents communication layer of MDS server. This class represents communicators as servers and applications
    /// to upper layers.
    /// </summary>
    public class CommunicationLayer : IRunnable
    {
        #region Private fields

        /// <summary>
        /// Reference to logger.
        /// </summary>
        private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

        /// <summary>
        /// Reference to the settings.
        /// </summary>
        private readonly MDSSettings _settings;

        /// <summary>
        /// A collection that stores all remote applications.
        /// key: ApplicationId
        /// Total MDSRemoteApplication objects count is equal to
        /// (MDSClientApplication count + MDSAdjacentServer count + 1 MDSController).
        /// </summary>
        private readonly SortedList<int, MDSRemoteApplication> _remoteApplications;

        /// <summary>
        /// A collection that stores communication managers.
        /// </summary>
        private readonly List<ICommunicationManager> _communicationManagers;

        /// <summary>
        /// Temporary Communicator List. This list store a communicator until that communicator
        /// registers to MDS server. After registration, it is removed from list.
        /// </summary>
        private readonly SortedList<long, ICommunicator> _communicators;

        /// <summary>
        /// Last generated application ID. This is used to get Unique ID for a RemoteApplication.
        /// It is used by CreateApplicationId method.
        /// </summary>
        private static int _lastApplicationId;

        /// <summary>
        /// Last generated communicator ID. When a new communicator builded, it gets 
        /// _lastCommunicatorId+1 by calling CreateCommunicatorId() method.
        /// </summary>
        private static long _lastCommunicatorId;

        #endregion

        #region Constructors

        /// <summary>
        /// Constructor.
        /// </summary>
        public CommunicationLayer()
        {
            _settings = MDSSettings.Instance;
            _remoteApplications = new SortedList<int, MDSRemoteApplication>();
            _communicators = new SortedList<long, ICommunicator>();
            _communicationManagers =
                new List<ICommunicationManager>
                    {
                        new TCPCommunicationManager(Convert.ToInt32(_settings["__ThisServerTCPPort"].Trim()))
                    };

            foreach (var manager in _communicationManagers)
            {
                manager.CommunicatorConnected += Manager_CommunicatorConnected;
            }
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Generates a Unique ID for Remote Applications.
        /// </summary>
        /// <returns>Unique ID</returns>
        public static int CreateApplicationId()
        {
            return Interlocked.Increment(ref _lastApplicationId);
        }

        /// <summary>
        /// Generates a Unique ID for communicators.
        /// </summary>
        /// <returns>Unique ID</returns>
        public static long CreateCommunicatorId()
        {
            return Interlocked.Increment(ref _lastCommunicatorId);
        }

        /// <summary>
        /// Starts the communication layer and all subsytems.
        /// </summary>
        public void Start()
        {
            foreach (var manager in _communicationManagers)
            {
                manager.Start();
            }
        }

        /// <summary>
        /// Stops the communication layer and all subsytems.
        /// </summary>
        /// <param name="waitToStop">Indicates that caller thread must wait
        /// until communication layer stops</param>
        public void Stop(bool waitToStop)
        {
            foreach (var manager in _communicationManagers)
            {
                manager.Stop(waitToStop);
            }

            StopCommunicators(waitToStop);
            ClearCommunicators(waitToStop);
        }

        /// <summary>
        /// Waits until communication layer stops.
        /// </summary>
        public void WaitToStop()
        {
            foreach (var manager in _communicationManagers)
            {
                manager.WaitToStop();
            }

            WaitToStopOfCommunicators();
            ClearCommunicators(true);
        }

        /// <summary>
        /// Adds a remote application to communication layer.
        /// </summary>
        /// <param name="application">Remote application to add</param>
        public void AddRemoteApplication(MDSRemoteApplication application)
        {
            lock (_remoteApplications)
            {
                if (!_remoteApplications.ContainsKey(application.ApplicationId))
                {
                    _remoteApplications.Add(application.ApplicationId, application);
                }
            }
        }

        /// <summary>
        /// Removes a remote application from communication layer.
        /// </summary>
        /// <param name="application">Remote application to remove</param>
        public void RemoveRemoteApplication(MDSRemoteApplication application)
        {
            lock (_remoteApplications)
            {
                if (_remoteApplications.ContainsKey(application.ApplicationId))
                {
                    if(application.ConnectedCommunicatorCount > 0)
                    {
                        throw new MDSException("Remote application can not be removed. It has " +
                                               application.ConnectedCommunicatorCount + " communicators connected.");
                    }

                    _remoteApplications.Remove(application.ApplicationId);
                }
            }
        }

        #endregion

        #region Private methods

        /// <summary>
        /// When a communicator connects to server, this method is called.
        /// </summary>
        /// <param name="sender">Sender</param>
        /// <param name="e">Event args</param>
        private void Manager_CommunicatorConnected(object sender, CommunicatorConnectedEventArgs e)
        {
            e.Communicator.StateChanged += Communicator_StateChanged;
            AddToCommunicators(e.Communicator);
            e.Communicator.MessageReceived += Communicator_MessageReceived;
            e.Communicator.Start();
        }

        #region Register message handling and processing methods

        /// <summary>
        /// When a message received from a communicator, this method is called.
        /// This method just process Register messages. After a register message received
        /// from cummunicator, stops listen to events from this communicator anymore.
        /// </summary>
        /// <param name="sender">Sender (ICommunicator)</param>
        /// <param name="e">Event args</param>
        private void Communicator_MessageReceived(object sender, MessageReceivedFromCommunicatorEventArgs e)
        {
            if (e.Message.MessageTypeId != MDSMessageFactory.MessageTypeIdMDSRegisterMessage)
            {
                return;
            }
            
            try
            {
                ProcessRegisterMessage(e.Communicator, e.Message as MDSRegisterMessage);
            }
            catch (Exception ex)
            {
                Logger.Warn(ex.Message, ex);
            }
            finally
            {
                e.Communicator.MessageReceived -= Communicator_MessageReceived;                
            }
        }

        /// <summary>
        /// Processes MDSRegisterMessage objects.
        /// </summary>
        /// <param name="communicator">Sender communicator of message</param>
        /// <param name="message">Message</param>
        private void ProcessRegisterMessage(ICommunicator communicator, MDSRegisterMessage message)
        {
            //Set the communicator properties
            communicator.CommunicationWay = message.CommunicationWay;

            MDSRemoteApplication remoteApplication = null;
            //Find remote application
            lock (_remoteApplications)
            {
                foreach (var app in _remoteApplications.Values)
                {
                    if (app.Name == message.Name && message.CommunicatorType == app.CommunicatorType)
                    {
                        remoteApplication = app;
                        break;
                    }
                }
            }

            //If application is found...
            if (remoteApplication != null)
            {
                try
                {
                    //Add communicator to communicator list of remote application
                    remoteApplication.AddCommunicator(communicator);
                    //Remove communicator from tempoary communicators list.
                    RemoveFromCommunicators(communicator.ComminicatorId);
                    //Send success message to remote application
                    SendOperationResultMessage(communicator, true, communicator.ComminicatorId.ToString(), message.MessageId);
                }
                catch (Exception ex)
                {
                    Logger.Warn(ex.Message, ex);
                    //An error occured, send failed message to remote application
                    SendOperationResultMessage(communicator, false, ex.Message, message.MessageId);
                    communicator.Stop(false);
                }
            }
            else //application == null
            {
                //Stop communicator, because a remote application can not connect this server that is not defined in settings file
                SendOperationResultMessage(communicator, false, "No remote application found with name: " + message.Name, message.MessageId);
                communicator.Stop(false);
            }
        }

        /// <summary>
        /// Sends a MDSOperationResultMessage message to a communicator.
        /// </summary>
        /// <param name="communicator">Communicator object</param>
        /// <param name="success">Operation result</param>
        /// <param name="resultText">Detailed result/error text</param>
        /// <param name="repliedMessageId">The message id of request message</param>
        private static void SendOperationResultMessage(ICommunicator communicator, bool success, string resultText, string repliedMessageId)
        {
            communicator.SendMessage(
                new MDSOperationResultMessage
                    {
                        Success = success,
                        ResultText = resultText,
                        RepliedMessageId = repliedMessageId
                    });
        }

        #endregion

        #region Communicator add/remove and stop methods

        /// <summary>
        /// When state of a communicator changes, this method handles event.
        /// It is used to remove a communicator from list when it is closed.
        /// </summary>
        /// <param name="sender">Sender (ICommunicationManager)</param>
        /// <param name="e">Event arguments</param>
        private void Communicator_StateChanged(object sender, CommunicatorStateChangedEventArgs e)
        {
            switch (e.Communicator.State)
            {
                case CommunicationStates.Closed:
                    RemoveFromCommunicators(e.Communicator.ComminicatorId);
                    break;
            }
        }

        /// <summary>
        /// Adds a TCPCommunicator object to _communicators list.
        /// </summary>
        /// <param name="communicator">TCPCommunicator to be added</param>
        private void AddToCommunicators(ICommunicator communicator)
        {
            lock (_communicators)
            {
                _communicators[communicator.ComminicatorId] = communicator;
            }
        }

        /// <summary>
        /// Removes a TCPCommunicator object from _communicators list.
        /// </summary>
        /// <param name="comminicatorId">Id of TCPCommunicator to be removed</param>
        private void RemoveFromCommunicators(long comminicatorId)
        {
            lock (_communicators)
            {
                if (_communicators.ContainsKey(comminicatorId))
                {
                    _communicators.Remove(comminicatorId);
                }
            }
        }

        /// <summary>
        /// Stops all communicator connections.
        /// </summary>
        /// <param name="waitToStop">Indicates that caller thread waits stopping of communicators</param>
        private void StopCommunicators(bool waitToStop)
        {
            lock (_communicators)
            {
                var communicatorIds = _communicators.Keys.ToArray();
                foreach (var communicatorId in communicatorIds)
                {
                    try
                    {
                        _communicators[communicatorId].Stop(waitToStop);
                    }
                    catch (Exception ex)
                    {
                        Logger.Error(ex.Message, ex);
                    }
                }
            }
        }

        /// <summary>
        /// Removes all TCPCommunicator objects from _communicators list.
        /// </summary>
        /// <param name="waitToStop">Indicates that caller thread waits stop</param>
        private void ClearCommunicators(bool waitToStop)
        {
            if (!waitToStop)
            {
                return;
            }

            lock (_communicators)
            {
                _communicators.Clear();
            }
        }

        /// <summary>
        /// Waits all communicators to stop.
        /// </summary>
        private void WaitToStopOfCommunicators()
        {
            lock (_communicators)
            {
                var communicatorIds = _communicators.Keys.ToArray();
                foreach (var communicatorId in communicatorIds)
                {
                    try
                    {
                        _communicators[communicatorId].WaitToStop();
                    }
                    catch (Exception ex)
                    {
                        Logger.Error(ex.Message, ex);
                    }
                }
            }
        }

        #endregion

        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The GNU Lesser General Public License (LGPLv3)


Written By
Founder Volosoft
Turkey Turkey
I have started programming at 14 years old using Pascal as hobby. Then I interested in web development (HTML, JavaScript, ASP...) before university.

I graduated from Sakarya University Computer Engineering. At university, I learned C++, Visual Basic.NET, C#, ASP.NET and Java. I partly implemented ARP, IP and TCP protocols in Java as my final term project.

Now, I am working on Windows and web based software development mostly using Microsoft technologies in my own company.

My open source projects:

* ABP Framework: https://abp.io
* jTable: http://jtable.org
* Others: https://github.com/hikalkan

My personal web site:

https://halilibrahimkalkan.com

Comments and Discussions