Click here to Skip to main content
15,891,248 members
Articles / Programming Languages / C#

A Complete TCP Server/Client Communication and RMI Framework - Usage

Rate me:
Please Sign up or sign in to vote.
4.94/5 (102 votes)
12 Jun 2011CPOL22 min read 339.1K   26.1K   247  
An Open Source lightweight framework (named Simple Client Server Library (SCS)) that is delevoped to create Server/Client applications using the simple Remote Method Invocation mechanism.
using System;
using System.Collections.Generic;
using System.Threading;
using Hik.Communication.Scs.Communication.Messages;
using Hik.Communication.Scs.Communication.Protocols;
using Hik.Threading;

namespace Hik.Communication.Scs.Communication.Messengers
{
    /// <summary>
    /// This class adds SendMessageAndWaitForResponse(...) and SendAndReceiveMessage methods
    /// to a IMessenger for synchronous request/response style messaging.
    /// It also adds queued processing of incoming messages.
    /// </summary>
    /// <typeparam name="T">Type of IMessenger object to use as underlying communication</typeparam>
    public class RequestReplyMessenger<T> : IMessenger, IDisposable where T : IMessenger
    {
        #region Public events

        /// <summary>
        /// This event is raised when a new message is received from underlying messenger.
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageReceived;

        /// <summary>
        /// This event is raised when a new message is sent without any error.
        /// It does not guaranties that message is properly handled and processed by remote application.
        /// </summary>
        public event EventHandler<MessageEventArgs> MessageSent;

        #endregion

        #region Public properties

        /// <summary>
        /// Gets/sets wire protocol that is used while reading and writing messages.
        /// </summary>
        public IScsWireProtocol WireProtocol
        {
            get { return Messenger.WireProtocol; }
            set { Messenger.WireProtocol = value; }
        }

        /// <summary>
        /// Gets the time of the last succesfully received message.
        /// </summary>
        public DateTime LastReceivedMessageTime
        {
            get
            {
                return Messenger.LastReceivedMessageTime;
            }
        }

        /// <summary>
        /// Gets the time of the last succesfully received message.
        /// </summary>
        public DateTime LastSentMessageTime
        {
            get
            {
                return Messenger.LastSentMessageTime;
            }
        }

        /// <summary>
        /// Gets the underlying IMessenger object.
        /// </summary>
        public T Messenger { get; private set; }

        /// <summary>
        /// Timeout value as milliseconds to wait for a receiving message on 
        /// SendMessageAndWaitForResponse and SendAndReceiveMessage methods.
        /// Default value: 60000 (1 minute).
        /// </summary>
        public int Timeout { get; set; }

        #endregion

        #region Private fields

        /// <summary>
        /// Default Timeout value.
        /// </summary>
        private const int DefaultTimeout = 60000;

        /// <summary>
        /// This messages are waiting for a response those are used when 
        /// SendMessageAndWaitForResponse is called.
        /// Key: MessageID of waiting request message.
        /// Value: A WaitingMessage instance.
        /// </summary>
        private readonly SortedList<string, WaitingMessage> _waitingMessages;

        /// <summary>
        /// This object is used to process incoming messages sequentially.
        /// </summary>
        private readonly SequentialItemProcessor<IScsMessage> _incomingMessageProcessor;

        /// <summary>
        /// This object is used for thread synchronization.
        /// </summary>
        private readonly object _syncObj = new object();

        #endregion

        #region Constructor

        /// <summary>
        /// Creates a new RequestReplyMessenger.
        /// </summary>
        /// <param name="messenger">IMessenger object to use as underlying communication</param>
        public RequestReplyMessenger(T messenger)
        {
            Messenger = messenger;
            messenger.MessageReceived += Messenger_MessageReceived;
            messenger.MessageSent += Messenger_MessageSent;
            _incomingMessageProcessor = new SequentialItemProcessor<IScsMessage>(OnMessageReceived);
            _waitingMessages = new SortedList<string, WaitingMessage>();
            Timeout = DefaultTimeout;
        }

        #endregion

        #region Public methods

        /// <summary>
        /// Starts the messenger.
        /// </summary>
        public virtual void Start()
        {
            _incomingMessageProcessor.Start();
        }

        /// <summary>
        /// Stops the messenger.
        /// Cancels all waiting threads in SendMessageAndWaitForResponse method and stops message queue.
        /// SendMessageAndWaitForResponse method throws exception if there is a thread that is waiting for response message.
        /// Also stops incoming message processing and deletes all messages in incoming message queue.
        /// </summary>
        public virtual void Stop()
        {
            _incomingMessageProcessor.Stop();

            //Pulse waiting threads for incoming messages, since underlying messenger is disconnected
            //and can not receive messages anymore.
            lock (_syncObj)
            {
                foreach (var waitingMessage in _waitingMessages.Values)
                {
                    waitingMessage.State = WaitingMessageStates.Cancelled;
                    waitingMessage.WaitEvent.Set();
                }

                _waitingMessages.Clear();
            }
        }

        /// <summary>
        /// Calls Stop method of this object.
        /// </summary>
        public void Dispose()
        {
            Stop();
        }

        /// <summary>
        /// Sends a message.
        /// </summary>
        /// <param name="message">Message to be sent</param>
        public void SendMessage(IScsMessage message)
        {
            Messenger.SendMessage(message);
        }

        /// <summary>
        /// Sends a message and waits a response for that message.
        /// </summary>
        /// <remarks>
        /// Response message is matched with RepliedMessageId property, so if
        /// any other message (that is not reply for sent message) is received
        /// from remote application, it is not considered as a reply and is not
        /// returned as return value of this method.
        /// 
        /// MessageReceived event is not raised for response messages.
        /// </remarks>
        /// <param name="message">message to send</param>
        /// <returns>Response message</returns>
        public IScsMessage SendMessageAndWaitForResponse(IScsMessage message)
        {
            return SendMessageAndWaitForResponse(message, Timeout);
        }

        /// <summary>
        /// Sends a message and waits a response for that message.
        /// </summary>
        /// <remarks>
        /// Response message is matched with RepliedMessageId property, so if
        /// any other message (that is not reply for sent message) is received
        /// from remote application, it is not considered as a reply and is not
        /// returned as return value of this method.
        /// 
        /// MessageReceived event is not raised for response messages.
        /// </remarks>
        /// <param name="message">message to send</param>
        /// <param name="timeoutMilliseconds">Timeout duration as milliseconds.</param>
        /// <returns>Response message</returns>
        /// <exception cref="TimeoutException">Throws TimeoutException if can not receive reply message in timeout value</exception>
        /// <exception cref="CommunicationException">Throws CommunicationException if communication fails before reply message.</exception>
        public IScsMessage SendMessageAndWaitForResponse(IScsMessage message, int timeoutMilliseconds)
        {
            //Create a waiting message record and add to list
            var waitingMessage = new WaitingMessage();
            lock (_syncObj)
            {
                _waitingMessages[message.MessageId] = waitingMessage;
            }

            try
            {
                //Send message
                Messenger.SendMessage(message);

                //Wait for response
                waitingMessage.WaitEvent.Wait(timeoutMilliseconds);

                //Check for exceptions
                switch (waitingMessage.State)
                {
                    case WaitingMessageStates.WaitingForResponse:
                        throw new TimeoutException("Timeout occured. Can not received response.");
                    case WaitingMessageStates.Cancelled:
                        throw new CommunicationException("Disconnected before response received.");
                }

                //return response message
                return waitingMessage.ResponseMessage;
            }
            finally
            {
                //Remove message from waiting messages
                lock (_syncObj)
                {
                    if (_waitingMessages.ContainsKey(message.MessageId))
                    {
                        _waitingMessages.Remove(message.MessageId);
                    }
                }
            }
        }

        #endregion

        #region Private methods

        /// <summary>
        /// Handles MessageReceived event of Messenger object.
        /// </summary>
        /// <param name="sender">Source of event</param>
        /// <param name="e">Event arguments</param>
        private void Messenger_MessageReceived(object sender, MessageEventArgs e)
        {
            //Check if there is a waiting thread for this message in SendMessageAndWaitForResponse method
            if (!string.IsNullOrEmpty(e.Message.RepliedMessageId))
            {
                WaitingMessage waitingMessage = null;
                lock (_syncObj)
                {
                    if (_waitingMessages.ContainsKey(e.Message.RepliedMessageId))
                    {
                        waitingMessage = _waitingMessages[e.Message.RepliedMessageId];
                    }
                }

                //If there is a thread waiting for this response message, pulse it
                if (waitingMessage != null)
                {
                    waitingMessage.ResponseMessage = e.Message;
                    waitingMessage.State = WaitingMessageStates.ResponseReceived;
                    waitingMessage.WaitEvent.Set();
                    return;
                }
            }

            _incomingMessageProcessor.EnqueueMessage(e.Message);
        }

        /// <summary>
        /// Handles MessageSent event of Messenger object.
        /// </summary>
        /// <param name="sender">Source of event</param>
        /// <param name="e">Event arguments</param>
        private void Messenger_MessageSent(object sender, MessageEventArgs e)
        {
            OnMessageSent(e.Message);
        }

        #endregion

        #region Event raising methods

        /// <summary>
        /// Raises MessageReceived event.
        /// </summary>
        /// <param name="message">Received message</param>
        protected virtual void OnMessageReceived(IScsMessage message)
        {
            var handler = MessageReceived;
            if (handler != null)
            {
                handler(this, new MessageEventArgs(message));
            }
        }

        /// <summary>
        /// Raises MessageSent event.
        /// </summary>
        /// <param name="message">Received message</param>
        protected virtual void OnMessageSent(IScsMessage message)
        {
            var handler = MessageSent;
            if (handler != null)
            {
                handler(this, new MessageEventArgs(message));
            }
        }

        #endregion

        #region WaitingMessage class

        /// <summary>
        /// This class is used to store messaging context for a request message
        /// until response is received.
        /// </summary>
        private sealed class WaitingMessage
        {
            /// <summary>
            /// Response message for request message 
            /// (null if response is not received yet).
            /// </summary>
            public IScsMessage ResponseMessage { get; set; }

            /// <summary>
            /// ManualResetEvent to block thread until response is received.
            /// </summary>
            public ManualResetEventSlim WaitEvent { get; private set; }

            /// <summary>
            /// State of the request message.
            /// </summary>
            public WaitingMessageStates State { get; set; }

            /// <summary>
            /// Creates a new WaitingMessage object.
            /// </summary>
            public WaitingMessage()
            {
                WaitEvent = new ManualResetEventSlim(false);
                State = WaitingMessageStates.WaitingForResponse;
            }
        }

        /// <summary>
        /// This enum is used to store the state of a waiting message.
        /// </summary>
        private enum WaitingMessageStates
        {
            /// <summary>
            /// Still waiting for response.
            /// </summary>
            WaitingForResponse,

            /// <summary>
            /// Message sending is cancelled.
            /// </summary>
            Cancelled,

            /// <summary>
            /// Response is properly received.
            /// </summary>
            ResponseReceived
        }

        #endregion
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Founder Volosoft
Turkey Turkey
I have started programming at 14 years old using Pascal as hobby. Then I interested in web development (HTML, JavaScript, ASP...) before university.

I graduated from Sakarya University Computer Engineering. At university, I learned C++, Visual Basic.NET, C#, ASP.NET and Java. I partly implemented ARP, IP and TCP protocols in Java as my final term project.

Now, I am working on Windows and web based software development mostly using Microsoft technologies in my own company.

My open source projects:

* ABP Framework: https://abp.io
* jTable: http://jtable.org
* Others: https://github.com/hikalkan

My personal web site:

https://halilibrahimkalkan.com

Comments and Discussions