Click here to Skip to main content
Click here to Skip to main content
Technical Blog

Token Based Asynchronous Socket Server On .NET 4

, 19 Mar 2012 CPOL
Rate this:
Please Sign up or sign in to vote.
A token based server, which means that the server and the client will use token exchange as a handshake to confirm that all of the messages were correctly delivered.

Even though there are exist many technologies and approaches for network communication, such as Windows Communication Foundation or Web Servers with HTTP POST/GET requests, some tasks still require using of Network TCP Sockets. I’ve got a chance to work with sockets a lot recently and one of my tasks was intermediate socket service which  provides communication between automatic robot and client software. The first and naïve solution was the following:

  1. Client send command
  2. Socket server get command and start executing it
  3. Once command execution is  done, send response to client

There is nothing wrong with this solution except of just one thing: the time between steps 2 and 3 could be really big, up to several minutes. Socket server was written on .Net 4, client was written on Ruby 1.9. For unknown reasons (after some internet research it happened that this is Ruby bug) when waiting time exceeds ~1 minute, client never gets server response, even though server clearly has sent it.

Here I’ll show one of possible solutions which eliminates this problem and adds some flexibility to solution in general. We will use token based server, which means that the server and the client will use token exchange as a handshake to confirm that all of the messages were correctly delivered.

The basis for Server Socket could be found on the following MSDN articles:

  1. Using an Asynchronous Server Socket
  2. Asynchronous Server Socket Example

We will use that and will build up more functionality.

Our scenario will be the following:

  1. Client initiates connection by sending the command with token for that command
  2. Server gets the command, stores token for that command in local token cache and sends response immediately
  3. On a separate thread server executes the command
  4. Client sends requests with “check status” command and token
  5. If command has not finished yet, server replies that this token is currently “busy”
  6. Once command is finished, server replies “finished” for “check status” request
  7. Client should confirm that command is finished, server cleans up token cache after confirmation

image

The code that I’m providing here is just an example, prototype, and has some restrictions:

  1. Only one command could be set up in token cache
  2. If client will send another command without prior confirmation for previous command, it will be ignored by server and previous token status will be sent as a response
  3. It will support dummy commands, such as “say hi” or “say bye”, but it is sufficient for the demo purposes

We will use JSON as an exchange protocol and an awesome JSON library for .Net: http://json.codeplex.com/

Server code is written in C++/CLI and using CLR classes. Easily portable to C# (just a matter of syntax in this case)

I. Define the command:

Commands.h
/// <summary>
/// Enumeration with supported commands
/// </summary>
public enum class CommandsList
{
    NOT_INITIALIZED = 0,
    INIT,
    SAY_HI,
    SAY_BYE,
    CHECK_STATUS,
    CONFIRM,

    UNSUPPORTED = -1
};

/// <summary>
/// Serializable to JSON command with token
/// </summary>
public ref class Command
{
public:

    Command();

    #pragma region Utils

    /// <summary>
    /// Converts string to commands enum
    /// </summary>
    static CommandsList StringToCommand(String ^command);
    /// <summary>
    /// Checks if command is valid to be processed
    /// </summary>
    static bool IsValid(CommandsList cmd);
    
#pragma endregion

    #pragma region Public Properties

    /// <summary>
    /// Command name
    /// </summary>
    [JsonProperty]
    property String ^command;
    /// <summary>
    /// Token assosiated with the command
    /// </summary>
    [JsonProperty]
    property String ^token;

    #pragma endregion

private:

    #pragma region Private Members

    /// <summary>
    /// Stores assosiations string - commands enum
    /// </summary>
    static Hashtable ^commandsMap;

    #pragma endregion
};
Commands.cpp
/// Fill up the commands hashtable with name - enum assosiations
Command::Command()
{
    #pragma region Command name to enum mapping
    
    commandsMap = gcnew Hashtable();    
    commandsMap["init"] = CommandsList::INIT;
    commandsMap["say_hi"] = CommandsList::SAY_HI;
    commandsMap["say_bye"] = CommandsList::SAY_BYE;
    commandsMap["check_status"] = CommandsList::CHECK_STATUS;
    commandsMap["confirm"] = CommandsList::CONFIRM;

    #pragma endregion
}

/// If command name was not found in the hashtable, return unsupported
CommandsList Command::StringToCommand(String ^command)
{
    if (commandsMap->ContainsKey(command))
    {
        return safe_cast<CommandsList>(commandsMap[command]);
    }
    else
    {
        return CommandsList::UNSUPPORTED;
    }
}

/// Command not valid if it is unsupported on not initialized
bool Command::IsValid(CommandsList cmd)
{
    if ((CommandsList::NOT_INITIALIZED != cmd) &&
        (CommandsList::UNSUPPORTED) != cmd)
    {
        return true;
    }
    return false;
}

II. Define the commands processor:

CommandsProcessor.h
/// <summary>
/// Class responsible for every command process
/// </summary>
public ref class CommandsProcessor
{
public:

    CommandsProcessor();

    /// <summary>
    /// Chooses which command to process
    /// </summary>
    /// <param name="command">Command object</param>
    /// <param name="tokenCache">Token cache object</param>
    void ProcessCommand(Command ^command, TokenCache^ tokenCache);

private:
    
    #pragma region Every Command Processor

    /// Individual processors for every command

    bool _isInitialized;
    void ProcessInit(Command ^command, TokenCache^ tokenCache);

    void ProcessHi(Command ^command, TokenCache^ tokenCache);
    void ProcessBye(Command ^command, TokenCache^ tokenCache);
    
    #pragma endregion

    #pragma region Exceptions Handlers

    /// Handlers for exceptions to store and to include in service reply

    void HandleManagedException(Exception^ exception);
    void EmptyExceptionMessage();

    ServiceException^ _exception;

    #pragma endregion
};
CommandsProcessor.cpp
/// Constructor
CommandsProcessor::CommandsProcessor()
{
    _isInitialized = false;
    _exception = gcnew ServiceException();
}

/// Process command, emulate processing for this demo
void CommandsProcessor::ProcessCommand(Command ^command, TokenCache ^tokenCache)
{
    //TODO: EMULATION ONLY
    Console::WriteLine("--SLEEP--\n");
    Thread::Sleep(10000);
    Console::WriteLine("--WAKE UP--\n");
    /////////////////

    /// Parse command and process it
    CommandsList cmd = command->StringToCommand(command->command);
    switch (cmd)
    {
    case CommandsList::INIT:
        {
            ProcessInit(command, tokenCache);
            break;
        }
    case CommandsList::SAY_HI:
        {
            ProcessHi(command, tokenCache);
            break;
        }
    case CommandsList::SAY_BYE:
        {
            ProcessBye(command, tokenCache);
            break;
        }
    case CommandsList::CHECK_STATUS:
    case CommandsList::CONFIRM:
    default:
        {
            break;
        }
    }

    /// We already caught all exceptions in every command processor
    EmptyExceptionMessage();
}

#pragma region Every Command Processor

    #pragma region Init
void CommandsProcessor::ProcessInit(Command ^command, TokenCache ^tokenCache)
{
    try
    {
        /// TODO: Insert specific processing here
        _isInitialized = true;
    }
    catch (Exception ^exception)
    {
        /// Store exception in order to include it in report
        HandleManagedException(exception);
    }

    /// Form JSON formatted reply
    JObject ^o = gcnew JObject(
        gcnew JProperty("command", command->command),
        gcnew JProperty("exception_message", _exception->_exceptionMessage),
        gcnew JProperty("is_init", _isInitialized));

    /// Set status and report in the token cache
    tokenCache->SetTokenResponse(command, o);
    tokenCache->SetTokenStatus(command, 
                               (_exception->_isException) ? 
                                   TOKEN_ABORTED : TOKEN_SUCCESS,
                               true);
    tokenCache->WriteTokenFile(tokenCache->GetToken(command));

}
#pragma endregion

    #pragma region Hi
void CommandsProcessor::ProcessHi(Command ^command, TokenCache ^tokenCache)
{
    try
    {
        /// TODO: Insert specific processing here
    }
    catch (Exception ^exception)
    {
        /// Store exception in order to include it in report
        HandleManagedException(exception);
    }

    /// Form JSON formatted reply
    JObject ^o = gcnew JObject(
        gcnew JProperty("command", command->command),
        gcnew JProperty("exception_message", _exception->_exceptionMessage),
        gcnew JProperty("message", "Hi!"));

    /// Set status and report in the token cache
    tokenCache->SetTokenResponse(command, o);
    tokenCache->SetTokenStatus(command, 
                               (_exception->_isException) ? 
                                   TOKEN_ABORTED : TOKEN_SUCCESS,
                               true);
    tokenCache->WriteTokenFile(tokenCache->GetToken(command));

}
#pragma endregion

    #pragma region Bye
void CommandsProcessor::ProcessBye(Command ^command, TokenCache ^tokenCache)
{
    try
    {
        /// TODO: Insert specific processing here
    }
    catch (Exception ^exception)
    {
        /// Store exception in order to include it in report
        HandleManagedException(exception);
    }

    /// Form JSON formatted reply
    JObject ^o = gcnew JObject(
        gcnew JProperty("command", command->command),
        gcnew JProperty("exception_message", _exception->_exceptionMessage),
        gcnew JProperty("message", "Bye!"));

    /// Set status and report in the token cache
    tokenCache->SetTokenResponse(command, o);
    tokenCache->SetTokenStatus(command, 
                               (_exception->_isException) ? 
                                   TOKEN_ABORTED : TOKEN_SUCCESS,
                               true);
    tokenCache->WriteTokenFile(tokenCache->GetToken(command));

}
#pragma endregion

#pragma endregion

#pragma region Exceptions Handlers

/// Store exception in order to include it in report
void CommandsProcessor::HandleManagedException(Exception^ exception)
{
    _exception->_isException = true;
    _exception->_exceptionMessage = gcnew String(exception->Message);
}

/// Clean up stored exception
void CommandsProcessor::EmptyExceptionMessage()
{
    _exception->_isException = false;
    _exception->_exceptionMessage = String::Empty;
}

#pragma endregion

Here we use helper class to store exceptions:

Exceptions.h
/// <summary>
/// Exceptions helper: stores exception to include in reply
/// </summary>
public ref class ServiceException
{
public:
    ServiceException() : 
        _isException(false),
        _exceptionMessage(nullptr) 
    { }

    bool _isException;
    String^ _exceptionMessage;
};

III. Define the local Token Cache for storing current command’s status and info.

This class stores token status in memory and on file system for emergency cases (power off computer without prior confirmation for token for example).

TokenCache.h
/// <summary>
/// Enumeration with token statuses
/// </summary>
public enum TokenStatuses
{
    TOKEN_BUSY = 0,
    TOKEN_SUCCESS,
    TOKEN_ABORTED,
    TOKEN_BAD_COMMAND = -1
};

/// <summary>
/// Helper class: converts token status from enum to string
/// </summary>
public ref class TokenConverter
{
public:
    static String^ TokenStatusToString(TokenStatuses status);
};

/// <summary>
/// Serializable to JSON token status
/// </summary>
public ref class TokenStatus
{
public:

    #pragma region Public Properties

    /// <summary>
    /// Token assosiated with the command
    /// </summary>
    [JsonProperty]
    property String ^token;
    /// <summary>
    /// Token status
    /// </summary>
    [JsonProperty]
    property String ^status;
    /// <summary>
    /// Is command processed?
    /// </summary>
    [JsonProperty]
    property bool finished;
    /// <summary>
    /// JSON response for current command
    /// </summary>
    [JsonProperty]
    property JObject ^response;

    #pragma endregion

    TokenStatus(String ^_token);
};

/// <summary>
/// Token Cache class responsible for Set/Get/Remove current token
/// Limitations of this version: only one token could be set up
/// But easily extendable to unlimited nuber of tokens
/// </summary>
public ref class TokenCache
{
public:

    #pragma region Token Operations
    
    /// Token operations: Set/Get/Remove

    static TokenStatus ^GetToken(Command^ cmd);
    static bool SetToken(Command^ cmd, TokenStatus^ token);
    static bool SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished);
    static bool SetTokenResponse(Command^ cmd, JObject ^response);
    static bool RemoveToken(Command^ cmd);

    /// Store last operation in a token file for emergency cases

    static bool CheckTokenFile([Out] TokenStatus ^%token);
    static void WriteTokenFile(TokenStatus ^token);
    static void DeleteTokenFile();

    #pragma endregion

private:

    #pragma region Private Members

    /// <summary>
    /// Stores assosiations token - status
    /// </summary>
    static Dictionary<String^, TokenStatus^> ^_tokens 
        = gcnew Dictionary <String^, TokenStatus^>();

    /// If new token came, check for previous token
    static String^ prevToken = String::Empty;

    #pragma endregion
};
TokenCache.cpp
/// Token status constructor, token is busy by default
TokenStatus::TokenStatus(String^ _token)
{
    token = _token;
    status = TokenConverter::TokenStatusToString(TOKEN_BUSY);
    finished = false;
    response = nullptr;
}

/// Gets token
/// Locks tokens dictionary
/// If previous token is not empty, use it and ignore new token
TokenStatus^ TokenCache::GetToken(Command^ cmd)
{
    TokenStatus ^t = nullptr;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        if (String::Empty == prevToken)
        {
            _tokens->TryGetValue(cmd->token, t);
        }
        else
        {
            _tokens->TryGetValue(prevToken, t);
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return t;
}

/// Removes token
/// Locks tokens dictionary
/// Removes current token from cache if it finished
bool TokenCache::RemoveToken(Command^ cmd)
{
    bool res = false;
    TokenStatus ^t = nullptr;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        if (String::Empty == prevToken)
        {
            if (_tokens->TryGetValue(cmd->token, t))
            {
                if (t->finished)
                {
                    _tokens->Remove(cmd->token);
                    res = true;
                }
            }
        }
        else
        {
            if (_tokens->TryGetValue(prevToken, t))
            {
                if (t->finished)
                {
                    _tokens->Remove(prevToken);
                    prevToken = String::Empty;
                    res = true;
                }
            }
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Sets token
/// Locks tokens dictionary
/// If no current tokens in progress, sets new token
bool TokenCache::SetToken(Command^ cmd, TokenStatus^ token)
{
    bool res = false;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        if (String::Empty == prevToken)
        {
            if (!_tokens->ContainsKey(cmd->token))
            {
                _tokens[cmd->token] = token;
                prevToken = cmd->token;
                res = true;
            }
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Sets status for current token
/// Locks tokens dictionary
bool TokenCache::SetTokenStatus(Command^ cmd, TokenStatuses status, bool finished)
{
    bool res = false;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        TokenStatus ^t = nullptr;
        if (_tokens->TryGetValue(cmd->token, t))
        {
            t->status = TokenConverter::TokenStatusToString(status);
            t->finished = finished;
            _tokens[cmd->token] = t;
            res = true;
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Sets response for current token
/// Locks tokens dictionary
bool TokenCache::SetTokenResponse(Command^ cmd, JObject ^response)
{
    bool res = false;

    System::Threading::Monitor::Enter(_tokens);
    try
    {
        TokenStatus ^t = nullptr;
        if (_tokens->TryGetValue(cmd->token, t))
        {
            t->response = response;
            res = true;
        }
    }
    finally
    {
        System::Threading::Monitor::Exit(_tokens);
    }

    return res;
}

/// Recovers token from isolated file cache
bool TokenCache::CheckTokenFile([Out] TokenStatus ^%token)
{
    bool fileExists = false;
    token = gcnew TokenStatus(String::Empty);

    try
    {
        IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
        IsolatedStorageFileStream ^isoStream = 
           gcnew IsolatedStorageFileStream("token.last", FileMode::Open, isoFile);

        String ^tokenData = nullptr;
        try
        {
            StreamReader ^sr = gcnew StreamReader(isoStream);
            try
            {
                tokenData = sr->ReadToEnd();
            }
            finally
            {
                delete sr;
            }
            fileExists = true;
        }
        catch (...) { }
        if (!String::IsNullOrEmpty(tokenData))
        {
            try
            {
                token = JsonConvert::DeserializeObject<TokenStatus^>(tokenData);
            }
            catch (...) { }
        }
        isoFile->Close();
    }
    catch (...) { }

    return fileExists;
}

/// Backups token to isolated file cache
void TokenCache::WriteTokenFile(TokenStatus ^token)
{
    try
    {
        IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
        IsolatedStorageFileStream ^isoStream = gcnew IsolatedStorageFileStream(
                  "token.last", FileMode::Create, FileAccess::Write, isoFile);

        try
        {
            JsonSerializer ^serializer = gcnew JsonSerializer();
            StreamWriter ^sw = gcnew StreamWriter(isoStream);
            try
            {
                JsonWriter ^writer = gcnew JsonTextWriter(sw);
                try
                {
                    serializer->Serialize(writer, token);
                }
                finally
                {
                    delete writer;
                }
            }
            finally
            {
                delete sw;
            }
        }
        catch (...) { }
        delete isoFile;
        isoFile->Close();
    }
    catch (...) { }
}

/// Cleanups isolated tokens file
void TokenCache::DeleteTokenFile()
{
    try
    {
        IsolatedStorageFile ^isoFile = IsolatedStorageFile::GetMachineStoreForAssembly();
        isoFile->DeleteFile("token.last");
        delete isoFile;
        isoFile->Close();
    }
    catch(...) { }

}

/// Converts token status to string
String^ TokenConverter::TokenStatusToString(TokenStatuses status)
{
    String^ retStr = String::Empty;

    switch(status)
    {
    case TOKEN_BUSY:
        retStr = "busy";
        break;
    case TOKEN_SUCCESS:
        retStr = "success";
        break;
    case TOKEN_ABORTED:
        retStr = "aborted";
        break;
    case TOKEN_BAD_COMMAND:
    default:
        retStr = "bad_command";
        break;
    }

    return retStr;
}

IV. Finally, define threaded commands processor and Socket Server itself:

SoketServer.h
/// <summary>
/// Threaded object responsible for just one command processing
/// </summary>
public ref class SingleCommandProcessor
{
public:

    /// <summary>
    /// Constructor
    /// </summary>
    /// <param name="currCmd">Command to process</param>
    /// <param name="tokenCache">Token cache object</param>
    /// <param name="commandsProcessor">Commands processor object</param>
    SingleCommandProcessor(
        Command^ currCmd,
        TokenCache ^tokenCache, 
        CommandsProcessor ^commandsProcessor) :    
            _currCmd(currCmd),
            _tokenCache(tokenCache),
            _commandsProcessor(commandsProcessor)
    {}

    /// <summary>
    /// Thread function to process current command
    /// </summary>
    void DoProcess();

private:
    Command ^_currCmd;
    TokenCache ^_tokenCache;
    CommandsProcessor ^_commandsProcessor;
};

/// <summary>
/// State object for reading client data asynchronously
/// </summary>
public ref class StateObject
{
public:
    StateObject();

    Socket ^workSocket;
    literal int BufferSize = 1024;
    array<Byte> ^buffer;
    StringBuilder ^sb;

private:
    bool _initialized;
    void InitializeInstanceFields();
};

/// <summary>
/// Socket Server exactly as described on MSDN articles:
///
/// Using an Asynchronous Server Socket: 
/// http://msdn.microsoft.com/en-us/library/5w7b7x5f.aspx
///
/// Asynchronous Server Socket Example
/// http://msdn.microsoft.com/en-us/library/fx6588te.aspx
/// </summary>
public ref class SocketServer
{
public:
    SocketServer(String ^ipAddress, int port);

    void Run(String ^ipAddress, int port, int backlog);
    void Stop();

private:
    void AcceptCallback(IAsyncResult ^ar);
    void ReceiveCallback(IAsyncResult ^ar);
    void SendCallback(IAsyncResult ^ar);

    SocketPermission ^_permission;
    Socket ^_sListener;
    static ManualResetEvent ^allDone = gcnew ManualResetEvent(false);

    CommandsProcessor ^_commandsProcessor;
    TokenCache ^_tokenCache;
};

The code for Asynchronous Callbacks is the same here as in MSDN examples (links are above) except of Receive Callback part. The logic for receive callback is the following:

  1. Parse command
  2. If valid command check for tokens
  3. If token not finished => reply with current token
  4. If no current tokens => reply with new token
SocketServer.cpp
/// State object
StateObject::StateObject()
{
    InitializeInstanceFields();
}

/// Initialize state object
void StateObject::InitializeInstanceFields()
{
    if ( !_initialized)
    {
        buffer = gcnew array<Byte>(BufferSize);
        sb = gcnew StringBuilder();

        _initialized = true;
    }
}

/// Thread function for single command processing
void SingleCommandProcessor::DoProcess()
{
    _commandsProcessor->ProcessCommand(_currCmd, _tokenCache);
}

/// Socket Server constructor
SocketServer::SocketServer(String ^ipAddress, int port)
{
    _permission = gcnew SocketPermission(PermissionState::Unrestricted);
    _commandsProcessor = gcnew CommandsProcessor();
    _tokenCache = gcnew TokenCache();

    _sListener = nullptr;
}

/// Run Socket Server
void SocketServer::Run(String ^ipAddress, int port, int backlog)
{
    _permission->Demand();

    IPAddress ^ipAddr = IPAddress::Parse(ipAddress);
    IPEndPoint ^ipEndPoint = gcnew IPEndPoint(ipAddr, port);

    _sListener = gcnew Socket(ipAddr->AddressFamily, SocketType::Stream, ProtocolType::Tcp);
    _sListener->Bind(ipEndPoint);
    _sListener->Listen(backlog);

    while (true) {
        allDone->Reset();

        Console::WriteLine("Waiting for a connection on port "+ 
            Convert::ToString(ipEndPoint->Address) + ":" + 
            Convert::ToString(ipEndPoint->Port));

        AsyncCallback ^aCallback = gcnew AsyncCallback(this, &SocketServer::AcceptCallback);
        _sListener->BeginAccept(aCallback, _sListener);

        allDone->WaitOne();
    }
}

/// Stop Socket Server
void SocketServer::Stop()
{
    if (_sListener->Connected)
    {
        _sListener->Shutdown(SocketShutdown::Both);
        _sListener->Close();
    }
}

/// Accept incoming socket connection
void SocketServer::AcceptCallback(IAsyncResult ^ar)
{
    Socket ^listener = nullptr;
    Socket ^handler = nullptr;

    allDone->Set();

    listener = safe_cast<Socket^>(ar->AsyncState);
    handler = listener->EndAccept(ar);

    StateObject ^state = gcnew StateObject();
    state->workSocket = handler;
    handler->BeginReceive(
        state->buffer, 
        0, 
        StateObject::BufferSize, 
        SocketFlags::None, 
        gcnew AsyncCallback(this, &SocketServer::ReceiveCallback), 
        state);
}

/// Recieve incoming socket data
void SocketServer::ReceiveCallback(IAsyncResult ^ar)
{
    bool startNewProcessingThread = false;

    StateObject ^state = safe_cast<StateObject^>(ar->AsyncState);
    Socket ^handler = state->workSocket;

    String ^content = String::Empty;
    String ^response = String::Empty;

    int bytesRead = handler->EndReceive(ar);
    String ^newLine = Encoding::ASCII->GetString(state->buffer,0,bytesRead);
    if (bytesRead > 0)
    {
        state->sb->Append(newLine);
        handler->BeginReceive(
            state->buffer,
            0,
            StateObject::BufferSize, 
            SocketFlags::None, 
            gcnew AsyncCallback(this, &SocketServer::ReceiveCallback), 
            state);
    }
    if (newLine->Contains("<end_of_message>"))
    {
        if (state->sb->Length > 0)
        {
            Command ^command;
            content = state->sb->ToString();
            
            Console::WriteLine("Read "+ 
                Convert::ToString(content->Length) + 
                "bytes from client.\n Data: " + 
                content + "\n");
            
            /*
                1) Parse command
                2) If valid command check for tokens
                3) If token not finished => reply with current token
                4) If no current tokens => reply with new token
            */

            CommandsList cmd = CommandsList::NOT_INITIALIZED;
            try
            {
                /// Get JSON string from received message
                String ^json = content->Substring(0, content->LastIndexOf("}") + 1);
                /// Deserialize to command
                command = JsonConvert::DeserializeObject<Command^>(json);
                /// Convert command name to commands enum
                cmd = command->StringToCommand(command->command);
            }
            catch (...) { }

            /// Check if received command is valid
            if (Command::IsValid(cmd))
            {
                TokenStatus ^t;

                /// Check if we have file-cached tokens
                if(!_tokenCache->CheckTokenFile(t))
                {
                    /// Get token from token cache if no file-cached tokens
                    t = _tokenCache->GetToken(command);
                }

                // If we have cached token
                if (nullptr != t)
                {
                    /// If command is confirmation, remove token from cache
                    if (CommandsList::CONFIRM == cmd)
                    {
                        bool tokenDone = _tokenCache->RemoveToken(command);
                        if (tokenDone)
                        {
                            _tokenCache->DeleteTokenFile();
                        }                            

                        /// Reply with status of confirmation
                        response = (gcnew JObject(
                            gcnew JProperty("status", 
                                TokenConverter::TokenStatusToString(
                                    (tokenDone ? TOKEN_SUCCESS : TOKEN_ABORTED)))))->ToString();
                    }
                    /// Serialize response from token cache
                    else
                    {
                        response = 
                          JsonConvert::SerializeObject(t, 
                          Newtonsoft::Json::Formatting::Indented);
                    }
                }
                /// Create new token if we don't have any
                else
                {
                    t = gcnew TokenStatus(command->token);
                    _tokenCache->SetToken(command, t);
                    startNewProcessingThread = true;
                    response = JsonConvert::SerializeObject(t, Newtonsoft::Json::Formatting::Indented);
                }
            }
            else
            {
                /// Got unsupported command, reply
                JObject ^o = gcnew JObject(
                    gcnew JProperty("status", TokenConverter::TokenStatusToString(TOKEN_BAD_COMMAND)));

                response = o->ToString();
            }

            array<Byte> ^byteData = Encoding::Unicode->GetBytes(response);

            Console::WriteLine("Sending: \n" + 
                response + "\n");
            
            handler->BeginSend(
                byteData, 
                0, 
                byteData->Length, 
                SocketFlags::None, 
                gcnew AsyncCallback(this, &SocketServer::SendCallback), 
                handler);
            
            state->sb->Clear();

            /// If we didn't have active tokens and got a new command
            /// Process it in a separate thread
            if (startNewProcessingThread)
            {
                SingleCommandProcessor^ threadWork = 
                    gcnew SingleCommandProcessor(command, _tokenCache, _commandsProcessor);
                Thread^ newThread = gcnew Thread( gcnew ThreadStart( 
                            threadWork, &SingleCommandProcessor::DoProcess ) );
                newThread->Start();
            }
        }
    }
}

/// Send data to client
void SocketServer::SendCallback(IAsyncResult ^ar)
{
    Socket ^handler = safe_cast<Socket^>(ar->AsyncState);
    int bytesSend = handler->EndSend(ar);

    Console::WriteLine("Sent "+ 
        Convert::ToString(bytesSend) + 
        "bytes to client." + "\n");
    
    handler->Disconnect(true);
}

V. To test server functionality there is a simplest client written in C#:

Client.cs
/// <summary>
/// Serializable to JSON command with token
/// </summary>
public class Command
{
    [JsonProperty]
    public String command;

    [JsonProperty]
    public String token;
};

class Client
{
    static void Main(string[] args)
    {
        PrintHelp();

        byte[] bytes = new byte[1024];
        string line = String.Empty;

        Command cmd = new Command();
        
        /// Read user command to send to server
        while (true)
        {
            line = Console.ReadLine();
            if (line == "exit")
            {
                break;
            }

            int option = Int32.Parse(line);
            switch (option)
            {
                case 0:
                    cmd.command = "check_status";
                    break;
                case 1:
                    cmd.command = "init";
                    cmd.token = GenerateToken();
                    break;
                case 2:
                    cmd.command = "say_hi";
                    cmd.token = GenerateToken();
                    break;
                case 3:
                    cmd.command = "say_bye";
                    cmd.token = GenerateToken();
                    break;
                case 4:
                    cmd.command = "confirm";
                    break;
                default:
                    PrintHelp();
                    cmd.command = "echo";
                    break;
            }

            try
            {
                SocketPermission permission = new SocketPermission(PermissionState.Unrestricted);
                permission.Demand();
                IPAddress ipAddr = IPAddress.Parse("127.0.0.1");
                IPEndPoint ipEndPoint = new IPEndPoint(ipAddr, 8081);
                Socket sender = new Socket(
                    ipAddr.AddressFamily,
                    SocketType.Stream,
                    ProtocolType.Tcp
                    );

                sender.Connect(ipEndPoint);
                Console.WriteLine("Socket connected to {0}",
                    sender.RemoteEndPoint.ToString());

                /// Serialize command
                string theMessage = JsonConvert.SerializeObject(cmd, Formatting.Indented);
                Console.WriteLine(theMessage + "\n");

                /// Add end of message and send
                byte[] msg = Encoding.ASCII.GetBytes(theMessage + "<end_of_message>");
                int bytesSend = sender.Send(msg);

                /// Get response
                int bytesRec = sender.Receive(bytes);
                theMessage = Encoding.Unicode.GetString(bytes, 0, bytesRec);

                while (sender.Available > 0)
                {
                    bytesRec = sender.Receive(bytes);
                    theMessage += Encoding.Unicode.GetString(bytes, 0, bytesRec);
                }

                Console.WriteLine("The server reply: {0}\n", theMessage);

                sender.Shutdown(SocketShutdown.Both);
                sender.Close();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception: {0}", ex.Message);
            }
        }
    }

    /// <summary>
    /// Generates dummy token
    /// </summary>
    /// <returns>Token string</returns>
    private static string GenerateToken()
    {
        ASCIIEncoding encoding = new ASCIIEncoding();
        byte[] token = encoding.GetBytes(
            DateTime.Now.Hour.ToString() +
            DateTime.Now.Minute.ToString() +
            DateTime.Now.Second.ToString() +
            DateTime.Now.Millisecond.ToString());
        return Convert.ToBase64String(token, 0, token.Length);
    }

    /// <summary>
    /// Prints console help
    /// </summary>
    private static void PrintHelp()
    {
        Console.WriteLine("1 : init");
        Console.WriteLine("2 : say_hi");
        Console.WriteLine("3 : say_bye");
        Console.WriteLine("4 : confirm");
        Console.WriteLine("0 : check_status");
        Console.WriteLine();
    }
}

The advantages of the following approach::

  1. We can guarantee that both client and server always get responses from each other
  2. Server is designed with potential support to process several commands simultaneously: each command has it’s own thread and main thread is processing incoming connections
  3. Using token-based approach we can guarantee that server is answering to required command – we cannot mess up responses
  4. Again thanks to token-based approach we can implement server polling from client side in order to track last command status
  5. None of responses if lost

As usual the Visual Studio solution and code files could be found on my public SVN: http://subversion.assembla.com/svn/max-s-blog-posts/ (SocketServerWithTokens folder).

Hope somebody will find this useful. Thanks and happy socket coding. Max.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

Share

About the Author

bovykinmaxim

United States United States
No Biography provided

Comments and Discussions

 
-- There are no messages in this forum --
| Advertise | Privacy | Terms of Use | Mobile
Web03 | 2.8.150123.1 | Last Updated 19 Mar 2012
Article Copyright 2012 by bovykinmaxim
Everything else Copyright © CodeProject, 1999-2015
Layout: fixed | fluid