
Using Asynchronous Sockets for Peer-to-Peer - CollaborativeNotes

Filipe Pereira, 15 Dec 2007
This article presents a sample peer-to-peer application built with asynchronous sockets.
Screenshot - Collaborative Notes Repository Server and Clients

Introduction

This project grew out of the wish to let multiple client applications communicate with each other on a network without a relay server such as the one IRC uses.
The goal was a collaborative workgroup environment built around a shared white board where every member can place notes and contributions.
As a result, the only server is one that acts as a peer repository, so that new peers can join a given workgroup and learn who is already online; all data is then communicated directly from client to client.

Background

To follow this article, the reader should know at least some basic concepts about network/internet connections, either from seeing how other networking programs work and testing their connectivity or from other references.
Readers who already know the basics of sockets can skip right down to the “Using the Code” section.

First of all, every machine on a network is identified by an address. Like most networks today, including the internet, this article assumes a TCP/IP network. Each machine physically connected to the network has a network address, and this article uses the IPv4 address format: four numbers between 0 and 255 separated by dots, as in “192.168.2.45”.
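As a small illustration of the dotted format, the sketch below turns an address string into its four numbers using the .NET IPAddress type. The class and method names (AddressSketch, ToIPv4Bytes) are hypothetical and are not part of the article's download.

```csharp
using System.Net;
using System.Net.Sockets;

public static class AddressSketch
{
    // Returns the four bytes of a dotted IPv4 string, or null when the text
    // is not an address at all or is an address of another family (IPv6).
    public static byte[] ToIPv4Bytes(string text)
    {
        IPAddress address;
        if (!IPAddress.TryParse(text, out address))
            return null;
        if (address.AddressFamily != AddressFamily.InterNetwork)
            return null;
        return address.GetAddressBytes();
    }
}
```

Checking the address family matters because the same parser also accepts IPv6 text, which the rest of the article does not handle.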

Now let's take a look at how these connections work. Basically there are two important steps to communicate data using a network connection:

  1. Connect to the target machine
  2. Send the data through the open channel

It's somewhat similar to writing to a file: you open the file first and then write the data.

With network connections you can do a lot more than just “connect and send”: you can broadcast data or target multiple machines at a time. You can also use different ports and standard protocols, or create your own protocol and use a port of your choice. Each machine has 65536 ports (numbered 0 to 65535), which are used in combination with the network address. The code in this article uses ports in the 468xx range (for example 46800 and 46801).
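The pairing of an address with a port can be sketched with the .NET IPEndPoint type, which also exposes the valid port range as constants. EndPointSketch and Create are hypothetical names used only for this illustration.

```csharp
using System;
using System.Net;

public static class EndPointSketch
{
    // Combines an IPv4 address and a port into an endpoint, rejecting
    // ports outside the valid range 0..65535.
    public static IPEndPoint Create(string ipv4, int port)
    {
        if (port < IPEndPoint.MinPort || port > IPEndPoint.MaxPort)
            throw new ArgumentOutOfRangeException("port");
        return new IPEndPoint(IPAddress.Parse(ipv4), port);
    }
}
```

For example, `EndPointSketch.Create("192.168.2.45", 46800)` describes the first of the two ports mentioned above on that machine.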

Whatever configuration you choose, you have to pick the type of connection to use when communicating, which usually means choosing between TCP and UDP. Keeping it simple, TCP delivers the data reliably and in order (or reports an error when it cannot), and UDP makes no such promise. Using UDP is like dumping the packets on the line and hoping the client gets them; a packet can be lost along the way, for example when its TTL (time to live) runs out before it reaches the target.
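In .NET the choice between the two shows up in the arguments passed to the Socket constructor. The sketch below only creates the sockets; SocketKinds is a hypothetical helper name.

```csharp
using System.Net.Sockets;

public static class SocketKinds
{
    // TCP: a connection-oriented byte stream.
    public static Socket CreateTcp()
    {
        return new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp);
    }

    // UDP: individual datagrams with no delivery guarantee.
    public static Socket CreateUdp()
    {
        return new Socket(AddressFamily.InterNetwork,
            SocketType.Dgram, ProtocolType.Udp);
    }
}
```

The article's code uses only the first form, since the white board messages must arrive complete.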

Reviewing so far: to make a simple connection between two machines, you provide the address and port in the connect step and then send the data. The connection uses either TCP or UDP, and the packets that carry the data have a TTL (time to live).

Now those two steps listed earlier have corresponding steps on the target machine:

  1. Listen for connections
  2. Receive the data from an active connection

It's simple enough. There is an address for each machine; one machine listens for connections on a port and the other connects to it using that port. After the connection is established, data can be sent from each machine to the other.
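The four steps (listen, connect, send, receive) can be tried on a single machine with the blocking Socket calls. This is a minimal sketch rather than the article's code: LoopbackSketch is a hypothetical name, both ends live in one method, and the loopback address with port 0 (let the system pick a free port) is an assumption made so the example runs anywhere.

```csharp
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;

public static class LoopbackSketch
{
    public static string SendAndReceive(string text)
    {
        using (Socket listener = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp))
        using (Socket sender = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp))
        {
            // Target step 1: listen for connections.
            listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
            listener.Listen(1);

            // Source step 1: connect to the target machine.
            sender.Connect(listener.LocalEndPoint);
            using (Socket worker = listener.Accept())
            {
                // Source step 2: send the data, then signal the end of it.
                sender.Send(Encoding.UTF8.GetBytes(text));
                sender.Shutdown(SocketShutdown.Send);

                // Target step 2: receive until the sender has finished.
                MemoryStream received = new MemoryStream();
                byte[] buffer = new byte[256];
                int count;
                while ((count = worker.Receive(buffer)) > 0)
                    received.Write(buffer, 0, count);
                return Encoding.UTF8.GetString(received.ToArray());
            }
        }
    }
}
```

Receive is called in a loop because a single call may return only part of what was sent; a return value of 0 means the other side has shut down its sending half.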

Finally, a Socket is what represents the connection between the machines.

It is also recommended that the reader understands what delegates are in C#. Put simply, a delegate is a type that holds a reference to a method with a specific signature, so code can invoke the method without knowing at compile time which one it is. Running this console application illustrates it:

using System;

namespace ConsoleTestLab
{
    public delegate void myDelegate();

    class Program
    {
        static void SayHello()
        {
            System.Console.Write("Hello");
        }
        static void SayWorld()
        {
            System.Console.WriteLine("World!");
        }

        static void Main(string[] args)
        {
            myDelegate function1 = SayHello;
            myDelegate function2 = SayWorld;
            function1.Invoke();
            System.Console.Write(" ");
            function2.Invoke();
        }
    }
}

See the references for more information on delegates.

Using the Code

The basic idea behind this example, as with many peer-to-peer applications, is simple. The network consists of peers that communicate with each other, and there has to be at least one starting peer or server for all the peers to connect to. The approach chosen here is a server: a Socket listener which keeps a list of workgroups.

public class CollaborativeNotesRepositoryServer : ICollaborativeNotesServer
{
    ...
    public Dictionary<string, Collection<ICollaborativeClientDetails>> ClientDetailsGroups;
    ...
    public InitState Initialize()
    protected void OnHandleClientConnection(IAsyncResult asyncResult)
    protected void OnHandleClientData(IAsyncResult asyncResult)
    ...
}

Each workgroup represents a peer-to-peer network with its own set of peers. The example assumes that a peer belongs to only one group at a time, though the architecture does not make membership in several groups impossible.

Since the communication is asynchronous, it happens in several steps, each completed by a callback method that runs on a separate thread. First the Socket is initialized. Then BeginAccept is called with an AsyncCallback naming the method that will handle the connection without blocking the caller. Inside that method, EndAccept must be called to obtain the connected socket, after which data can be received with BeginReceive. BeginReceive takes another AsyncCallback, whose method runs on yet another thread and calls EndReceive in turn. BeginReceive and EndReceive use a byte array as a buffer for the received data and can be called as many times as needed until the whole message is received. The steps can be seen in the following code:

public InitState Initialize()
{
    ...
    IPHostEntry hostEntry = Dns.GetHostEntry(Dns.GetHostName());
    ...
    IPAddress localAddress = null;
    for (int i = 0; i < hostEntry.AddressList.Length; ++i)
    {
        //IPv4 address
        if (hostEntry.AddressList[i].AddressFamily==AddressFamily.InterNetwork)
            localAddress = hostEntry.AddressList[i];
    }
    ...
    try
    {
        if(mListenerSocket==null)
            mListenerSocket=new Socket(AddressFamily.InterNetwork, 
                SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint localIP = new IPEndPoint(localAddress, mListenPort);
        mListenerSocket.Bind(localIP);
        //the best value to use here depends on each person alone
        mListenerSocket.Listen(50); 
        mListenerSocket.BeginAccept(new AsyncCallback(OnHandleClientConnection), null);
    }
    catch ...
    ...
}

protected void OnHandleClientConnection(IAsyncResult asyncResult)
{
    try
    {
        Socket workerSocket = mListenerSocket.EndAccept(asyncResult);
        try
        {
            TxRxPacket dataStatus = new TxRxPacket(workerSocket);
            workerSocket.BeginReceive(dataStatus.mDataBuffer, 0, 
                dataStatus.mDataBuffer.Length, SocketFlags.None, 
                new AsyncCallback(OnHandleClientData), dataStatus);
        }
        catch ...
        ...

        mListenerSocket.BeginAccept(new AsyncCallback(OnHandleClientConnection), null);
    }
    catch ...
    ...
}

protected void OnHandleClientData(IAsyncResult asyncResult)
{
    try
    {
        TxRxPacket dataStatus = (TxRxPacket)asyncResult.AsyncState;
        int countRx = dataStatus.mCurrentSocket.EndReceive(asyncResult);

        IMessage rxMessage = mMessageParser.ParseMessage(dataStatus.mDataBuffer);

        //handle the message (which can either be register or unregister)
        //send response message if needed : workerSocket.Send(byData);
        switch(rxMessage.Type)
        {
            case ((int)MessageType.RegisterMessage):
            {
                RegisterMessage msg=((RegisterMessage)rxMessage);
                if (!mClientDetailsGroups.ContainsKey(msg.Group))
                    mClientDetailsGroups[msg.Group] = 
                    new Collection< ICollaborativeClientDetails>();

                if(mClientDetailsGroups[msg.Group].IndexOf(msg.Client)>=0)
                    mClientDetailsGroups[msg.Group].Remove(msg.Client);

                //respond with the current group in the message
                RegisteredClientsListMessage response = 
                        new RegisteredClientsListMessage();
                response.Clients = mClientDetailsGroups[msg.Group];
                response.Group = msg.Group;

                //create a socket connection to the newly added client listener port
                Socket clientSocket = new Socket (AddressFamily.InterNetwork, 
                    SocketType.Stream, ProtocolType.Tcp);
                IPAddress remoteMachine = IPAddress.Parse(msg.Client.ClientIPAddress);
                IPEndPoint remoteEndpoint = new IPEndPoint
                    ( remoteMachine, msg.Client.ClientListenPort);
                clientSocket.Connect(remoteEndpoint);
                clientSocket.Send(response.GetMessagePacket());
                //just a minor timeout to be sure the message got there
                clientSocket.Close(1); 
            
                //the socket just lost the purpose of ever existing

                mClientDetailsGroups[msg.Group].Add(msg.Client);

                if (mOnRegisterClient!=null)
                    mOnRegisterClient.Invoke(this, 
                    new ServerRegisterEventArgs(msg.Client));
                break;
            }
            case ((int)MessageType.UnregisterMessage):
            {
                UnregisterMessage  msg=((UnregisterMessage)rxMessage);
                mClientDetailsGroups[msg.Group].Remove(msg.Client);
                //do not contact all others in the same group... 
                //have the client take care of that
                if (mOnRegisterClient != null)
                    mOnRegisterClient.Invoke(this, 
                    new ServerRegisterEventArgs(msg.Client));
                break;
            }
        }
    }
    catch ...
    ...
}
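The Begin/End sequence used by the server can also be tried in isolation. The following is a reduced sketch, not the repository server itself: AsyncAcceptSketch and its members are hypothetical names, the listener runs on the loopback address with a system-chosen port, and it assumes the short test message arrives in a single receive (the article's client code shows how to continue receiving when it does not).

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

public class AsyncAcceptSketch
{
    private readonly Socket mListener = new Socket(AddressFamily.InterNetwork,
        SocketType.Stream, ProtocolType.Tcp);
    private readonly ManualResetEvent mDone = new ManualResetEvent(false);
    private readonly byte[] mBuffer = new byte[256];
    private Socket mWorker;
    private string mReceived;

    public static string Run(string text)
    {
        return new AsyncAcceptSketch().ReceiveOnce(text);
    }

    private string ReceiveOnce(string text)
    {
        mListener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        mListener.Listen(1);
        // Returns immediately; OnAccept runs when a client connects.
        mListener.BeginAccept(new AsyncCallback(OnAccept), null);

        using (Socket client = new Socket(AddressFamily.InterNetwork,
            SocketType.Stream, ProtocolType.Tcp))
        {
            client.Connect(mListener.LocalEndPoint);
            client.Send(Encoding.UTF8.GetBytes(text));
            // Wait (at most five seconds) for the callbacks to finish.
            mDone.WaitOne(5000);
        }
        mListener.Close();
        return mReceived;
    }

    private void OnAccept(IAsyncResult asyncResult)
    {
        // EndAccept hands over the socket for this particular client.
        mWorker = mListener.EndAccept(asyncResult);
        mWorker.BeginReceive(mBuffer, 0, mBuffer.Length, SocketFlags.None,
            new AsyncCallback(OnReceive), null);
    }

    private void OnReceive(IAsyncResult asyncResult)
    {
        // EndReceive reports how many bytes of the buffer are valid.
        int count = mWorker.EndReceive(asyncResult);
        mReceived = Encoding.UTF8.GetString(mBuffer, 0, count);
        mWorker.Close();
        mDone.Set();
    }
}
```

Note that only the first `count` bytes of the buffer are decoded; the rest of the array holds whatever was there before the call.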

The construction of the peer network begins when the repository server receives a RegisterMessage. As the last segment of code shows, the repository replies to the new peer with a RegisteredClientsListMessage that lists the peers already in the same workgroup. After receiving this list, the client registers itself with each of those peers, so that all of them become aware of the new peer on the network.

public void OnHandleClientData(IAsyncResult asyncResult)
{
    try
    {
        TxRxPacket dataStatus = (TxRxPacket)asyncResult.AsyncState;

        dataStatus.mCurrentSocket.EndReceive(asyncResult);
        dataStatus.StoreCurrentData();

        IMessage rxMessage = mMessageParser.ParseMessage(dataStatus.mStoredBuffer);

        if (rxMessage == null)
        {
            //receive the rest of the message
            dataStatus.mCurrentSocket.BeginReceive(dataStatus.mDataBuffer, 0, 
                dataStatus.mDataBuffer.Length, SocketFlags.None, 
                new AsyncCallback(OnHandleClientData), dataStatus);
            return;
        }

        //handle the message
        switch (rxMessage.Type)
        {
            case ((int)MessageType.ResgisteredClientsListMessage):
            {
                Socket workerSocket = (Socket)dataStatus.mCurrentSocket;

                RegisteredClientsListMessage rxClientList = 
                (RegisteredClientsListMessage)rxMessage;
                if (rxClientList.Clients!=null)
                {
                    //register on each of them
                    for (int i = 0; i < rxClientList.Clients.Count; ++i)
                    {
                        mGroupClientsDetails.Add(rxClientList.Clients[i]);
                        ICollaborativeNotesClient client = new CollaborativeNotesClient
                            (mListenPort, rxClientList.Clients[i].ClientIPAddress, 
                            rxClientList.Clients[i].ClientListenPort, mGroup);
                        client.Initialize(); //this will trigger a register message
                        client = null;
                    }
                }
                break;
            }
            …
        }
    }
    catch ...
    ...
}

Thus far, the network starts with the repository server waiting for messages. When the first client shows up, the group and the client are registered on the server for the first time, so there are no peers to send back. When the second peer shows up, the first peer is already registered, so the server sends the first peer's details and the second peer registers on the first. The third peer receives the two peers already registered on the server and performs the same registration step on each of them. The process continues in the same way for any number of peers that join the peer network.

Fig. 1: Activity diagram for the peer registration step.
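The bookkeeping behind this sequence can be modelled without any sockets. The sketch below is an in-memory stand-in for the repository: RepositorySketch is a hypothetical name and peers are reduced to plain strings instead of the article's client details objects.

```csharp
using System.Collections.Generic;

public class RepositorySketch
{
    private readonly Dictionary<string, List<string>> mGroups =
        new Dictionary<string, List<string>>();

    // Returns the peers already in the group (the list the server would
    // send back) and then records the newcomer.
    public List<string> Register(string group, string peer)
    {
        List<string> peers;
        if (!mGroups.TryGetValue(group, out peers))
        {
            peers = new List<string>();
            mGroups[group] = peers;
        }

        // A peer that registers again replaces its old entry.
        peers.Remove(peer);

        List<string> reply = new List<string>(peers);
        peers.Add(peer);
        return reply;
    }
}
```

Registering A, B and C in the same group yields replies of zero, one and two peers respectively, which matches the walk-through above.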

One weak point of this approach is that the client list message grows with the group: for groups of hundreds or even thousands of peers it goes from less than a kilobyte to several kilobytes. This is one reason, but not the only one, for calling BeginReceive again from within the receive callback. The main reason to call BeginReceive repeatedly is in fact to allow peers to send each other messages of any size, as happens when communicating images (such as BMP, GIF, JPEG). The code has a very dangerous flaw, though: it is quite simple to trigger infinite loops in the peer applications by injecting messages that are never correctly terminated yet are interpreted as valid. So always be careful when implementing communication protocols and parsing mechanisms.
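One possible guard against never-terminated messages is to cap how much data is stored while waiting for the end of a message. The sketch below is an assumption-laden illustration, not the article's TxRxPacket: BoundedMessageBuffer is a hypothetical class, and it assumes a message ends with the closing tag `</message>`, which is inferred from the root element name used by the parser.

```csharp
using System;
using System.IO;
using System.Text;

public class BoundedMessageBuffer
{
    // Assumption: every message ends with this closing tag.
    private const string Terminator = "</message>";

    private readonly MemoryStream mStored = new MemoryStream();
    private readonly int mMaxBytes;

    public BoundedMessageBuffer(int maxBytes)
    {
        mMaxBytes = maxBytes;
    }

    // Stores one received chunk. Returns the complete message text once the
    // terminator has been seen, or null when more data is needed. Throws
    // when the sender exceeds the size limit without finishing a message.
    public string Append(byte[] chunk, int count)
    {
        if (mStored.Length + count > mMaxBytes)
            throw new InvalidDataException(
                "Message exceeds " + mMaxBytes + " bytes.");

        mStored.Write(chunk, 0, count);
        string text = Encoding.UTF8.GetString(mStored.ToArray());
        int end = text.IndexOf(Terminator, StringComparison.Ordinal);
        if (end < 0)
            return null;

        // Anything after the terminator is dropped, which fits the
        // one-message-per-connection design used in this article.
        mStored.SetLength(0);
        return text.Substring(0, end + Terminator.Length);
    }
}
```

A receive callback would call Append with the bytes reported by EndReceive, call BeginReceive again while the result is null, and close the socket when the exception is thrown.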

The messages themselves use a tagged, XML-like format. The reason for this choice is to simplify message parsing by using the System.Xml API, namely XmlDocument, to load the message automatically into a DOM-like tree. Parsing then becomes a matter of navigating the tree and extracting the data from the leaves to fill the fields of the message structure. This can be seen in the following example, the RegisterMessage.Parse method:

public bool Parse(Byte[] data)
{
    String messageContent = Encoding.UTF8.GetString(data);

    XmlDocument xmlDoc = new XmlDocument();
    try
    {
        xmlDoc.LoadXml(messageContent);
    }
    catch ...
    {
        ...
    }

    //Validate the incoming message type
    MessageType type = MessageType.EmptyMessage;

    XmlElement messageElement = xmlDoc.DocumentElement;
    if (messageElement.Name == "message")
    {
        foreach (XmlNode node in messageElement.ChildNodes)
        {
            if (node.Name == "type")
            {
                type = (MessageType)Enum.Parse(typeof(MessageType), node.InnerText);
                break;
            }
        }
    }

    if(type!=MessageType.RegisterMessage)
    {
        ...
        return false;
    }

    // The real data parsing
    this.mGroup = "";
    this.mClient = null;
    this.mClient = new CollaborativeClientDetails();

    foreach (XmlNode node in messageElement.ChildNodes)
    {
        if (node.Name == "group")
        {
            this.mGroup = node.InnerText;
        }
        else if (node.Name == "clientdetails")
        {
            foreach (XmlNode detailsNode in node.ChildNodes)
            {
                if(detailsNode.Name=="name")
                {
                    this.mClient.ClientName = detailsNode.InnerText;
                }
                else if (detailsNode.Name == "ipaddress")
                {
                    this.mClient.ClientIPAddress = detailsNode.InnerText;
                }
                else if (detailsNode.Name == "listenport")
                {
                    this.mClient.ClientListenPort = int.Parse(detailsNode.InnerText);
                }
            }
        }
    }

    return true;
}
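To make the format concrete, the sketch below builds a register message with the element names that the Parse method looks for and reads one field back. The exact layout produced by the article's GetMessagePacket is not shown in the text, so the layout here is an assumption inferred from the parser; RegisterMessageSketch and its methods are hypothetical names.

```csharp
using System.Globalization;
using System.Xml;

public static class RegisterMessageSketch
{
    public static string Build(string group, string name, string ip, int port)
    {
        XmlDocument doc = new XmlDocument();
        XmlElement root = doc.CreateElement("message");
        doc.AppendChild(root);

        AppendLeaf(doc, root, "type", "RegisterMessage");
        AppendLeaf(doc, root, "group", group);

        XmlElement details = doc.CreateElement("clientdetails");
        root.AppendChild(details);
        AppendLeaf(doc, details, "name", name);
        AppendLeaf(doc, details, "ipaddress", ip);
        AppendLeaf(doc, details, "listenport",
            port.ToString(CultureInfo.InvariantCulture));

        return doc.OuterXml;
    }

    // Setting InnerText lets XmlDocument escape characters such as & and <.
    private static void AppendLeaf(XmlDocument doc, XmlElement parent,
        string name, string value)
    {
        XmlElement leaf = doc.CreateElement(name);
        leaf.InnerText = value;
        parent.AppendChild(leaf);
    }

    // Reads a single leaf with an XPath query instead of nested loops.
    public static int ReadListenPort(string xml)
    {
        XmlDocument doc = new XmlDocument();
        doc.LoadXml(xml);
        XmlNode node = doc.SelectSingleNode("/message/clientdetails/listenport");
        return int.Parse(node.InnerText, CultureInfo.InvariantCulture);
    }
}
```

Building the text through XmlDocument rather than string concatenation means a group or client name containing markup characters cannot break the message structure.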

A few message types were implemented here to communicate data between the peer applications: the TextMessage, the ImageMessage and the StrokeMessage.

The type names are quite self-explanatory, and the white boards that represent the peers are built to paint each of them accordingly. The TextMessage has a text value, a font, a color and a position; the ImageMessage has a position, a size and the content data; and the StrokeMessage has essentially a list of points, a stroke width and a color.

From the peer application's perspective, there are two main objectives. One is to paint the contents of the messages correctly. The other is that the code must provide both a socket listener and a socket client. The listener on each peer is similar to the repository server in that it serves as the fixed point for the others to connect to. Like the repository server, it requires an IPAddress and a port to listen on. The socket client serves to connect and send data to the other peers.
One other detail visible in the code is that, after receiving the RegisteredClientsListMessage, the peer builds a local list of peers in addition to announcing its presence to the other peers in the group, so that it is included in their peer lists and consequently in their message broadcasts.

One other architecture choice was to make connections as atomic as possible, meaning that an outgoing connection is active only for as long as there’s a message to send. So for each message to be sent or broadcast, a client connection is made, the data is sent and the socket is closed as soon as the data has gone out. This is visible in this piece of code:

public void SendMessageAsync(IMessage message)
{
    mClientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, 
        ProtocolType.Tcp);
    IPAddress remoteMachine = IPAddress.Parse(mServer);
    IPEndPoint remoteEndpoint = new IPEndPoint(remoteMachine, mServerPort);
    try
    {
        mClientSocket.BeginConnect
            ( remoteEndpoint, new AsyncCallback(OnHandleConnection), message );
    }
    catch (Exception ex)
    {
        ...
        return;
    }
}

protected void OnHandleConnection(IAsyncResult asyncResult)
{
    try
    {
        IMessage message=(IMessage)asyncResult.AsyncState;
        mClientSocket.EndConnect(asyncResult);
        Byte[] dataBuffer=message.GetMessagePacket();
        TxRxPacket msgPacket=new TxRxPacket(mClientSocket, dataBuffer.Length);
        msgPacket.mDataBuffer=dataBuffer;
        if (mClientSocket.Connected)
            mClientSocket.BeginSend( msgPacket.mDataBuffer, 0, 
            msgPacket.mDataBuffer.Length, SocketFlags.None, 
            new AsyncCallback(OnHandleSend), msgPacket );
    }
    catch (Exception ex)
    {
        ...
        return;
    }
}

protected void OnHandleSend(IAsyncResult asyncResult)
{
    try
    {
        mClientSocket.EndSend(asyncResult);
        mClientSocket.Close();
        mClientSocket = null;
    }
    catch (Exception ex)
    {
        ...
        return;
    }
}

Summarizing so far, only the most relevant chunks of code have been shown. One was the Parse method of a message implementation. The others were the socket client and listener implementations: the connection creation and data sending code, and the connection listening and data retrieval code. The comment in the server's OnHandleClientData also hints that a socket is bidirectional: a reply could be sent back on the same worker socket, although the snippet shown here opens a new connection to the peer's listen port to send the client list.

What’s left to see about the code is all the peer functionality put together. This is the CollaborativeNotesClass, which implements the ICollaborativeNotes interface. This object exposes the list of inbound messages received since it was instantiated, and the message parser engine, so that custom messages can easily be added to the communication protocol and integrated into the whole peer communication. It also has fields for the details of the repository server, the name of its peer network group and its listener port. The whole peer logic is aggregated in this object, so a peer only has to keep one instance of it and add an event handler to be notified of incoming messages in real time, instead of having a timer poll the incoming message list for new ones.

The interface is as follows:

public interface ICollaborativeNotes
{
    int ListenPort
    {
        get;
        set;
    }

    int ServerListenPort
    {
        get;
        set;
    }

    String Server
    {
        get;
        set;
    }

    String Group
    {
        get;
        set;
    }

    Collection<IMessage> InboundMessages
    {
        get;
    }

    IMessageParserEngine MessageParser
    {
        get;
    }

    event OnReceiveMessageDelegate OnReceiveMessage;

    void BroadcastMessage(IMessage message);
    void SendMessage(IMessage message, ICollaborativeClientDetails details);
    void SendMessage(IMessage message, String name);

    void BroadcastMessageAsync(IMessage message);
    void SendMessageAsync(IMessage message, ICollaborativeClientDetails details);
    void SendMessageAsync(IMessage message, String name);

    void Close();
}

The BroadcastMessage and SendMessage methods send messages synchronously; the methods with the Async suffix are their asynchronous counterparts. The broadcast methods are implemented by simply calling the corresponding send method once for each peer on the local list. The implementation itself has a few more members, though these are enough to achieve the desired aggregation of functionality.
A final detail about the interface is the Close method. It performs the last task a peer must carry out when closing in order to keep the peer network consistent: “unregistering” its presence from both the repository server and all the other peer applications. This small step makes sure no threads or connection attempts are wasted on a peer that has left the workgroup/peer network.
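The broadcast-by-repetition idea can be sketched independently of the socket code. In the sketch below the per-peer send is passed in as a delegate standing in for SendMessage, and peers are plain strings; BroadcastSketch is a hypothetical name. Skipping peers that cannot be reached is a design choice of this sketch, not something the article states about its own implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;

public static class BroadcastSketch
{
    // Calls send once per peer and returns how many peers were reached.
    public static int Broadcast(IEnumerable<string> peers, string message,
        Action<string, string> send)
    {
        int delivered = 0;
        foreach (string peer in peers)
        {
            try
            {
                send(peer, message);
                ++delivered;
            }
            catch (SocketException)
            {
                // One unreachable peer should not stop the broadcast.
            }
        }
        return delivered;
    }
}
```

A peer that left without calling Close would show up here as a failed send, which is why the unregister step described above keeps broadcasts cheap.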

As final remarks about the sample applications supplied for download: the workgroup is hardcoded in the sample peer application, though the information about the repository server is read from the app.config file. The same applies to the repository server application, where the app.config file holds the port for the server to listen on. Finally, there’s also a sample service meant to be used as a repository server, though it requires a special installation to get it working on the system.

Conclusion

This article presents an approach to using asynchronous sockets, along with a couple of concepts on how to set up a peer-to-peer network. As a learning example, it has inherent faults left in on purpose, so that a reader who wants to build on this code will have to learn enough about the techniques and the technology to at least fix them.

The article also presents some architecture concepts that make the code more modular and easier to develop: the existence of message types, the basic IMessage interface that requires the Parse and Clone methods, the parser engine mechanism that the message instances plug into, and finally the socket clients and listeners that tie it all together. Together these produce the real-time interaction of the peer application developed for the article, the “SampleWhiteBoard” application.

Points of Interest

Safe threading for user interface updates is very difficult to achieve, or even keep track of, in this kind of multi-threaded environment. It is quite hard to update or synchronize the user interface from within child threads such as the ones created for asynchronous communication. For that reason, delegates and events are used to make the desired result as achievable as possible.
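The usual remedy is to hand the work over to the thread that owns the user interface instead of touching controls from a socket callback. In Windows Forms this is what Control.Invoke and Control.BeginInvoke are for. The sketch below shows the same idea without any UI dependency, so it is only a model of the technique: UiQueueSketch is a hypothetical class in which worker threads queue delegates and the owning thread runs them.

```csharp
using System;
using System.Collections.Concurrent;

public class UiQueueSketch
{
    private readonly ConcurrentQueue<Action> mPending =
        new ConcurrentQueue<Action>();

    // Safe to call from any thread, for example from a receive callback.
    public void Post(Action work)
    {
        mPending.Enqueue(work);
    }

    // Called only by the thread that owns the user interface.
    // Returns the number of queued actions that were executed.
    public int RunPending()
    {
        int executed = 0;
        Action work;
        while (mPending.TryDequeue(out work))
        {
            work();
            ++executed;
        }
        return executed;
    }
}
```

Because every queued action runs on the owning thread, the code inside the action can update the white board without further locking.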

There are other networking objects that can be used, such as TcpListener and TcpClient in the .NET Framework 2.0, and with the arrival of .NET Framework 3.5 more objects with a similar ability to communicate, such as pipes, are expected to appear.
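For comparison, the same connect, send and receive round trip can be written with TcpListener and TcpClient, which wrap the socket calls and expose the connection as a stream. This is a minimal loopback sketch under the same assumptions as before (single machine, system-chosen port); TcpClientSketch is a hypothetical name.

```csharp
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;

public static class TcpClientSketch
{
    public static string RoundTrip(string text)
    {
        TcpListener listener = new TcpListener(IPAddress.Loopback, 0);
        listener.Start();
        try
        {
            int port = ((IPEndPoint)listener.LocalEndpoint).Port;
            using (TcpClient sender = new TcpClient())
            {
                sender.Connect(IPAddress.Loopback, port);
                using (TcpClient worker = listener.AcceptTcpClient())
                {
                    byte[] payload = Encoding.UTF8.GetBytes(text);
                    sender.GetStream().Write(payload, 0, payload.Length);
                    // Tell the receiver that no more data will follow.
                    sender.Client.Shutdown(SocketShutdown.Send);

                    using (StreamReader reader = new StreamReader(
                        worker.GetStream(), Encoding.UTF8))
                    {
                        return reader.ReadToEnd();
                    }
                }
            }
        }
        finally
        {
            listener.Stop();
        }
    }
}
```

The stream interface removes the manual buffer handling, at the cost of hiding the Begin/End steps this article set out to demonstrate.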

References

History

  • December 9th, 2007 - Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


About the Author

Filipe Pereira
Architect Safira
Portugal
Graduated from the Licenciatura em Engenharia Informatica e Computadores course at Instituto Superior Tecnico de Lisboa.
Worked as an IT Technician and Manager of a small team while giving support to very small business offices.
Worked as a Senior Developer at Markdata, a software house in Portugal.
Currently a Senior Software Engineer at Safira Consultoria S.A. in Portugal.

Article Copyright 2007 by Filipe Pereira