Click here to Skip to main content
15,885,757 members
Articles / Programming Languages / XML

A Full Library for a Socket Client/Server System

Rate me:
Please Sign up or sign in to vote.
4.81/5 (42 votes)
14 Jan 2015CPOL5 min read 160.9K   11.3K   149  
My article shows a library that everyone can use to create their socket communication. Also, it explains how the library is developed.
using System;
using System.Threading;
using System.Net.Sockets;
using System.Net.Security;
using System.Diagnostics;
using SocketServerLib.Message;
using MultiThreadQueueLib;

namespace SocketServerLib.SocketHandler
{

    #region Defines delegates

    /// <summary>
    /// Defines the delegate for a received message event
    /// </summary>
    /// <param name="handler">The socket client handler where the message has been received</param>
    /// <param name="message">The received message</param>
    public delegate void ReceiveMessageDelegate(AbstractTcpSocketClientHandler handler, AbstractMessage message);
   
    /// <summary>
    /// Defines the delegate for a connection event
    /// </summary>
    /// <param name="handler">The socket client handler that has been connected</param>
    public delegate void SocketConnectionDelegate(AbstractTcpSocketClientHandler handler);

    #endregion

    /// <summary>
    /// This abstract class represents a socket client handler with/without a receive queue and a send queue. 
    /// With the receive queue, each received message is saved in a queue and a dequeuer thread
    /// get the messages one by one from the queue and raise the receive message event.
    /// Without the receieve queue, each time you receive a message a thread is created to raise the receive event.
    /// You have to implement the method GetMessageInstance to define in your socket client handler the message class.
    /// </summary>
    public abstract class AbstractTcpSocketClientHandler : IDisposable
    {
        /// <summary>
        /// The socket
        /// </summary>
        protected TcpSocket socket = null;
        /// <summary>
        /// auto resent events to synch send
        /// </summary>
        private AutoResetEvent sendDone = new AutoResetEvent(true);
        private AutoResetEvent sendWaitHandle = new AutoResetEvent(true);
        /// <summary>
        /// Receive queue
        /// </summary>
        private MultiThreadQueue<ReceiveMessageStateObject> receiveQueue = null;
        /// <summary>
        /// Cancellation token for the dequeuer from the receive queue
        /// </summary>
        private CancellationToken cancellationToken = null;
        /// <summary>
        /// Flag to stop the dequeuer
        /// </summary>
        private bool flagShutdown = false;
        /// <summary>
        /// Dequeuer thread
        /// </summary>
        private Thread dequeuerThread = null;
        /// <summary>
        /// Delegate for received message event
        /// </summary>
        private ReceiveMessageDelegate receiveMessageEvent = null;
        /// <summary>
        /// Delegate for close connection event
        /// </summary>
        private SocketConnectionDelegate closeConnectionEvent = null;
        /// <summary>
        /// Delegate for incoming message event
        /// </summary>
        private SocketConnectionDelegate inReceivingEvent = null;
        // Lock object for raise event
        private readonly object raiseLock = new object();

        /// <summary>
        /// Constructor for a socket client handler on SSL
        /// </summary>
        /// <param name="handler">The socket handler</param>
        /// <param name="stream">The ssl stream</param>
        /// <param name="useReceiveQueue">If true the message receiving is throw a queue, otherwise each message is handle by a different thread</useReceiveQueue>
        public AbstractTcpSocketClientHandler(Socket handler, SslStream stream, bool useReceiveQueue)
            : base()
        {
            if (stream != null)
            {
                this.socket = new SSLSocket(handler, stream);
            }
            else
            {
                this.socket = new TcpSocket(handler);
            }
            // If the socket client handler has to use the receive queue, the necessary classes are created
            if (useReceiveQueue)
            {
                receiveQueue = new MultiThreadQueue<ReceiveMessageStateObject>();
                cancellationToken = new CancellationToken();
                // The dereive dequeuer is started
                this.dequeuerThread = new Thread(new ThreadStart(this.ReceiveDequeuer));
                this.dequeuerThread.Start();
            }
        }

        /// <summary>
        /// Constructor for a socket client handler on SSL without receive queue
        /// </summary>
        /// <param name="handler">The socket handler</param>
        /// <param name="stream">The ssl stream</param>
        public AbstractTcpSocketClientHandler(Socket handler, SslStream stream)
            : this(handler, stream, false)
        {
        }

        /// <summary>
        /// Constructor for a socket client handler (not SSL) without receive queue
        /// </summary>
        /// <param name="handler">The socket handler</param>
        public AbstractTcpSocketClientHandler(Socket handler)
            : this(handler, null, false)
        {
        }

        /// <summary>
        /// Implement this method to create a socket client handler class.
        /// </summary>
        /// <returns>Return an empty message instance</returns>
        protected abstract AbstractMessage GetMessageInstance();

        #region Events

        /// <summary>
        /// Event for a received message.
        /// </summary>
        public event ReceiveMessageDelegate ReceiveMessageEvent
        {
            add { lock (raiseLock) { receiveMessageEvent += value; } }
            remove { lock (raiseLock) { receiveMessageEvent -= value; } }
        }

        /// <summary>
        /// Event for a close connection.
        /// </summary>
        public event SocketConnectionDelegate CloseConnectionEvent
        {
            add { lock (raiseLock) { closeConnectionEvent += value; } }
            remove { lock (raiseLock) { closeConnectionEvent -= value; } }
        }

        /// <summary>
        /// Event for an incoming a message.
        /// </summary>
        public event SocketConnectionDelegate InReceivingEvent
        {
            add { lock (raiseLock) { inReceivingEvent += value; } }
            remove { lock (raiseLock) { inReceivingEvent -= value; } }
        }

        
        #endregion

        #region Properties

        /// <summary>
        /// Socket Keep Alive flag
        /// </summary>
        public bool KeepAlive
        {
            get
            {
                return this.socket.KeepAlive;
            }
            set
            {
                this.socket.KeepAlive = value;
            }
        }

        /// <summary>
        /// Socket Send Timeout
        /// </summary>
        public int SendTimeout
        {
            get
            {
                return this.socket.SendTimeout;
            }
            set
            {
                this.socket.SendTimeout = value;
            }
        }
        
        #endregion

        #region Methods to raise events

        /// <summary>
        /// Handle a received message. A received message event is raise but in a new thread in order to not block the main thread.
        /// Override this method to change the behavior.
        /// </summary>
        /// <param name="rcvObj">The received object</param>
        protected virtual void OnReceiveMessage(ReceiveMessageStateObject rcvObj)
        {
            if (this.receiveQueue != null)
            {
                Trace.WriteLine(string.Format("Add Message {0} from Client {1} in the receive queue", rcvObj.Message.MessageUID, rcvObj.Message.ClientUID));
                this.receiveQueue.Enqueue(rcvObj);
                return;
            }
            ThreadPool.QueueUserWorkItem(new WaitCallback(rcvObj.Handler.RaiseReceiveMessageEvent), rcvObj);
        }

        /// <summary>
        /// Raise a received message event. This method is running in the same thread of the caller. 
        /// </summary>
        /// <param name="stateObj">The receive message state object</param>
        protected void RaiseReceiveMessageEvent(object stateObj)
        {
            if (receiveMessageEvent != null)
            {
                ReceiveMessageStateObject rcvObj = (ReceiveMessageStateObject)stateObj;
                receiveMessageEvent(rcvObj.Handler, rcvObj.Message);
            }
        }

        /// <summary>
        /// Handle a close connection. A close connection event is raise but in a new thread in order to not block the main thread.
        /// Override this method to change the behavior.
        /// </summary>
        /// <param name="closedHandler">The socket client handler closed</param>
        protected virtual void OnCloseConnection(AbstractTcpSocketClientHandler closedHandler)
        {
            ThreadPool.QueueUserWorkItem(new WaitCallback(closedHandler.RaiseCloseConnectionEvent), closedHandler);
        }

        /// <summary>
        /// Raise a received message event. This method is running in the same thread of the caller. 
        /// </summary>
        /// <param name="stateObj">The socket client handler closed</param>
        private void RaiseCloseConnectionEvent(object stateObj)
        {
            if (closeConnectionEvent != null)
            {
                AbstractTcpSocketClientHandler closedHandler = (AbstractTcpSocketClientHandler)stateObj;
                closeConnectionEvent(closedHandler);
            }
        }

        /// <summary>
        /// Raise an incoming message event. This method is running in the same thread of the caller.
        /// </summary>
        /// <param name="handler">The socket client handler that is receiving an incoming message</param>
        protected virtual void OnReceivingMessage(object handler)
        {
            if (inReceivingEvent != null)
            {
                inReceivingEvent((AbstractTcpSocketClientHandler)handler);
            }
        }

        #endregion

        #region Asinchronous receiving

        /// <summary>
        /// Start asynchronous receiving.
        /// </summary>
        internal virtual void StartReceive()
        {
            SocketStateObject state = new SocketStateObject();
            state.workHandler = this;
            socket.BeginReceive(state, new AsyncCallback(ReadCallback));
        }

        /// <summary>
        /// Stop asynchronous receiving.
        /// </summary>
        /// <param name="result"></param>
        /// <returns></returns>
        private int EndReceive(IAsyncResult result)
        {
            if (socket == null)
            {
                return 0;
            }
            return socket.EndReceive(result);
        }

        /// <summary>
        /// Callback for asynchronous receiving.
        /// </summary>
        /// <param name="ar">The socket state object for receiving data</param>
        protected static void ReadCallback(IAsyncResult ar)
        {
            SocketStateObject state = (SocketStateObject)ar.AsyncState;
            AbstractTcpSocketClientHandler handler = state.workHandler;

            try
            {
                // Read data from the client socket.
                int read = handler.EndReceive(ar);
                Trace.WriteLine(string.Format("Receive {0} bytes", read));

                // Data was read from the client socket.
                if (read > 0)
                {
                    // Fire event for incoming message
                    handler.OnReceivingMessage(handler);
                    while (true)
                    {
                        AbstractMessage message = AbstractMessage.TryReadMessage(handler.GetMessageInstance(), state, read);
                        // Fire event for received message
                        if (message != null)
                        {
                            ReceiveMessageStateObject rcvObj = new ReceiveMessageStateObject() { Handler = handler, Message = message };
                            handler.OnReceiveMessage(rcvObj);
                        }
                        if (state.pendingBuffer == null)
                        {
                            break;
                        }
                        read = 0;
                    }
                    handler.socket.BeginReceive(state, new AsyncCallback(ReadCallback));
                }
                else
                {
                    handler.Close();
                }
            }
            catch (MessageException mex)
            {
                // Invalid message
                Trace.WriteLine(string.Format("Exception {0}. Connection will be closed", mex));
                state.message = null;
                state.pendingBuffer = null;
                handler.Close();
            }
            catch (SocketException sex)
            {
                if (sex.SocketErrorCode == SocketError.ConnectionReset || sex.SocketErrorCode == SocketError.ConnectionAborted)
                {
                    Trace.WriteLine(string.Format("Socket error for disconnection {0} : {1} : {2}. Client will be disconnected", sex.ErrorCode, sex.SocketErrorCode, sex.Message));
                    state.message = null;
                    state.pendingBuffer = null;
                    handler.Close();
                }
                else
                {
                    Trace.WriteLine(string.Format("Exception {0}. Connection will be closed", sex));
                    state.message = null;
                    state.pendingBuffer = null;
                    handler.Close();
                }
            }
            catch (Exception ex)
            {
                Trace.WriteLine(string.Format("Exception {0}. Connection will be closed", ex));
                state.message = null;
                state.pendingBuffer = null;
                handler.Close();
            }
        }
        
        #endregion

        #region Synchronous send methods

        /// <summary>
        /// Send a buffer. It's a synchronous operation. The previous send has to be completed.
        /// You can define a timeout on the previous send and skip this one in case of time out.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <param name="previousSendTimeout">Timeout on the previous send in ms</param>
        /// <returns>True if the message has been sent, otherwise false</returns>
        public virtual bool Send(AbstractMessage message, int previousSendTimeout)
        {
            return SendMessage(message, previousSendTimeout);
        }

        /// <summary>
        /// Send a buffer. It's a synchronous operation. The previous send has to be completed.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <returns>True if the message has been sent, otherwise false</returns>
        public virtual bool Send(AbstractMessage message)
        {
            return Send(message, 0);
        }

        /// <summary>
        /// Send a buffer. It's a synchronous operation. The previous send has to be completed.
        /// You can define a timeout on the previous send and skip this one in case of time out.
        /// </summary>
        /// <param name="message">The message to send</param>
        /// <param name="previousSendTimeout">Timeout on the previous send in ms</param>
        /// <returns>True if the message has been sent, otherwise false</returns>
        internal bool SendMessage(AbstractMessage message, int previousSendTimeout)
        {
            byte[] buffer = message.GetEnvelope();
            Trace.WriteLine(string.Format("Sending message {0} len {1}", message.MessageUID, message.MessageLength));
            bool r = SendBuffer(this, buffer, previousSendTimeout);
            return r;
        }

        /// <summary>
        /// Send a buffer. It's a synchronous operation. The previous send has to be completed.
        /// You can define a timeout on the previous send and skip this one in case of time out.
        /// </summary>
        /// <param name="clientHandler">The socket client handler for send</param>
        /// <param name="byteData">The buffer to send</param>
        /// <param name="previousSendTimeout">Timeout on the previous send in ms</param>
        /// <returns>True if the message has been sent, otherwise false</returns>
        private static bool SendBuffer(AbstractTcpSocketClientHandler clientHandler, byte[] byteData, int previousSendTimeout)
        {
            bool ret = false;
            bool flagRaiseDisconnection = false;
            bool flagReset = true;
            try
            {
                // Check if client is connected
                if (!clientHandler.socket.Connected)
                {
                    throw new TcpSocketException("Client disconnected", true);
                }
                // Wait for previous send
                if (previousSendTimeout != 0)
                {
                    if (!clientHandler.sendWaitHandle.WaitOne(previousSendTimeout))
                    {
                        throw new TcpSocketException("Send skip for timeout");
                    }
                }
                else
                {
                    clientHandler.sendWaitHandle.WaitOne();
                }
                // Sending the data to the remote device.
                Trace.WriteLine(string.Format("Sending {0} bytes", byteData.Length));
                int n = clientHandler.socket.Send(byteData);
                if (n != byteData.Length)
                {
                    Trace.WriteLine(string.Format("Invalid send: sent {0} bytes on {1} bytes", n, byteData.Length));
                    throw new TcpSocketException("Invalid send");
                }
                ret = true;
            }
            catch (SocketException sex)
            {
                Trace.WriteLine(string.Format("Exception {0}. Connection will be closed", sex));
                flagRaiseDisconnection = true;
            }
            catch (TcpSocketException netEx)
            {
                Trace.WriteLine(string.Format("Exception {0}. Connection will be closed", netEx));
                if (netEx.DisconnectionRequired)
                {
                    flagRaiseDisconnection = true;
                }
                // Not release the sendwaithandle in case of send timeout
                else
                {
                    flagReset = false;
                }
            }
            catch (ObjectDisposedException dex)
            {
                Trace.WriteLine(string.Format("Exception {0}. Connection will be closed", dex));
                flagRaiseDisconnection = true;
            }
            catch (Exception ex)
            {
                Trace.WriteLine(string.Format("Exception {0}.", ex));
            }
            finally
            {
                if (flagReset)
                {
                    Trace.WriteLine("release sendwaithandle");
                    clientHandler.sendWaitHandle.Set();
                }
                if (flagRaiseDisconnection)
                {
                    Trace.WriteLine("Client will be disconnected.");
                    clientHandler.OnCloseConnection(clientHandler);
                }
            }
            return ret;
        }

        #endregion

        #region Asinchronous send methods

        public virtual void SendAsync(AbstractMessage message)
        {
            this.sendDone.WaitOne();
            byte[] buffer = message.GetEnvelope();
            Trace.WriteLine(string.Format("Sending asynch message {0} len {1}", message.MessageUID, message.MessageLength));
            this.socket.BeginSend(buffer, this, SendCallback);
        }

        protected static void SendCallback(IAsyncResult ar)
        {
            AbstractTcpSocketClientHandler handler = null;
            try
            {
                SocketError errorCode = SocketError.Success;
                // Retrieve the socket from the state object.
                handler = (AbstractTcpSocketClientHandler)ar.AsyncState;
                // Complete sending the data to the remote device.
                int bytesSent = handler.socket.EndSend(ar, out errorCode);
                Trace.WriteLine(string.Format("Complete asynch send. Byte {0} sent.", bytesSent));
            }
            catch (SocketException sex)
            {
                Trace.WriteLine(string.Format("Socket error, the connection will be closed. Error {0} : {1} : {2}", sex.ErrorCode, sex.SocketErrorCode, sex.Message));
                handler.OnCloseConnection(handler);
            }
            catch (Exception ex)
            {
                Trace.WriteLine(string.Format("Exception during asynch send {0}", ex));
            }
            finally
            {
                // Signal that all bytes have been sent.
                handler.sendDone.Set();
            }
        }

        #endregion

        #region Receive Dequeuer

        /// <summary>
        /// Dequeuer. Read the receive queue and raise the receive event for each message
        /// </summary>
        /// <param name="stateObj">The received message</param>
        protected virtual void ReceiveDequeuer()
        {
            while (!this.flagShutdown)
            {
                ReceiveMessageStateObject message = null;
                try
                {
                    message = (ReceiveMessageStateObject)receiveQueue.Unqueue(this.cancellationToken);
                }
                catch (OperationCanceledException ex)
                {
                    Trace.WriteLine("A cancel operation was request. A shutdown will be done.");
                    this.flagShutdown = true;
                }
                if (flagShutdown)
                {
                    break;
                }
                if (message != null)
                {
                    Trace.WriteLine(string.Format("Remove Message {0} Client {1} from the receive queue and raise event", message.Message.MessageUID, message.Message.ClientUID));
                    message.Handler.RaiseReceiveMessageEvent(message);
                }
            }
        }

        #endregion

        /// <summary>
        /// Close the socket.
        /// </summary>
        public virtual void Close()
        {
            if (this.receiveQueue != null)
            {
                this.flagShutdown = true;
                this.cancellationToken.Cancel();
                this.dequeuerThread.Join();
            }
            if (this.socket != null && this.socket.Connected)
            {
                this.socket.Close();
                this.OnCloseConnection(this);
            }
        }

        #region IDisposable Members

        /// <summary>
        /// Close and dispose the socket client handler.
        /// </summary>
        public void Dispose()
        {
            this.Close();
            if (this.receiveQueue != null)
            {
                this.receiveQueue.Dispose();
                this.receiveQueue = null;
            }
            if (this.cancellationToken != null)
            {
                this.cancellationToken.Dispose();
                this.cancellationToken = null;
            }
            if (this.socket != null)
            {
                socket.Dispose();
                socket = null;
            }
        }

        #endregion

        /// <summary>
        /// Print on a string the basic information of the socket client handler - the remote end point.
        /// </summary>
        /// <returns>The string with the information</returns>
        public override string ToString()
        {
            if (this.socket == null)
            {
                return string.Format("{0} with IPEndPoint = none", this.GetType().Name);
            }
            return string.Format("{0} with IPEndPoint = {1}", this.GetType().Name, this.socket.IPEndPoint.ToString());
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Team Leader Mediatech Solutions
Italy Italy
I’m an IT Project Manager for an Italian Betting Company and over the last 2 years I acquired experience in Betting area.
I have developed code in different object oriented languages (C#, C++, Java) for more than 10 years using a set of technology such as .Net, J2EE, multithreading, etc…

Comments and Discussions