Introduction
This project was born from the desire to have a way for multiple client applications to communicate with each other. These clients would live in a network environment without a relay server such as the ones used by IRC.
The goal was to create an environment for a collaborating workgroup that shares a white board, where all the members can write or place their notes and contributions.
As a result, there is only one server, which acts as a peer repository. New peers contact it to join a given workgroup and learn who is already online. From then on, all the data is sent directly from client to client.
Background
To better understand this article, the reader should know at least some basic concepts about network/internet connections. These can come from observing how other networking programs work and testing their connectivity, or from other references.
Readers already familiar with the basics of sockets can skip right down to the “Using the Code” section.
First of all, every machine on a network is identified by an address. Since most networks today, the internet included, use TCP/IP, this article assumes a TCP/IP network. Each machine physically connected to the network therefore has a network address. This article uses the IPv4 address format, in which a machine is identified by four numbers between 0 and 255 separated by dots, as in “192.168.2.45”.
Now let's take a look at how these connections work. Basically, there are two important steps to send data over a network connection:
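The following sketch makes the dotted notation concrete. It uses the .NET IPAddress class to split the example address into its four byte values. The AddressDemo class and ToOctets method are illustrative names and are not part of the article's code.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Illustrative helper (not part of the article's project): splits an IPv4
// address written in dotted notation into its four byte values.
public static class AddressDemo
{
    public static byte[] ToOctets(string dotted)
    {
        IPAddress address = IPAddress.Parse(dotted);
        // Only IPv4 (AddressFamily.InterNetwork) addresses have exactly four bytes.
        if (address.AddressFamily != AddressFamily.InterNetwork)
            throw new ArgumentException("Not an IPv4 address: " + dotted);
        return address.GetAddressBytes();
    }
}
```

For instance, ToOctets("192.168.2.45") yields the bytes 192, 168, 2 and 45.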
- Connect to the target machine
- Send the data through the open channel
It's somewhat similar to writing to a file: you open the file first and then write the data.
Network connections can do a lot more than just “connect and send”. You can broadcast data, or have a connection target multiple machines at once. You can also use different ports and standard protocols, or create your own protocol on a port of your choice. Each transport protocol gives a machine 65536 port numbers (0 to 65535), and a port is always used together with the network address. The code in this article uses ports in the 468xx range (for example, 46800 and 46801).
Whatever configuration you choose, you also have to pick the type of connection to use. This is usually a choice between TCP and UDP. Keeping it simple, TCP guarantees that messages are delivered in order, and UDP does not. Using UDP is like dumping packets on the line and hoping the recipient gets them. A lost packet is not sent again, for example one that was discarded when its TTL (time to live) ran out.
To review: to make a simple connection between two machines, you specify the address and port in the connect step and then send the data. The connection uses either the TCP or the UDP protocol. Every IP packet also carries a TTL (time to live), which limits how many hops it can make before it is discarded.
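The pieces reviewed so far map onto two .NET types. An IPEndPoint combines an address with a port, and the choice between TCP and UDP is made when the Socket is created. The helper names EndpointDemo, MakeEndpoint and CreateSocket used below are hypothetical.

```csharp
using System.Net;
using System.Net.Sockets;

// Illustrative sketch: an endpoint is an address plus a port, and the
// transport protocol is fixed when the Socket is created.
public static class EndpointDemo
{
    public static IPEndPoint MakeEndpoint(string address, int port)
    {
        // IPEndPoint throws ArgumentOutOfRangeException for ports outside
        // IPEndPoint.MinPort (0) .. IPEndPoint.MaxPort (65535).
        return new IPEndPoint(IPAddress.Parse(address), port);
    }

    public static Socket CreateSocket(bool reliable)
    {
        // TCP: connection-oriented byte stream with guaranteed, ordered delivery.
        // UDP: connectionless datagrams with no delivery guarantee.
        return reliable
            ? new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp)
            : new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp);
    }
}
```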
Now those two steps listed earlier have corresponding steps on the target machine:
- Listen for connections
- Receive the data from an active connection
It's simple enough. Each machine has an address. One machine listens for connections, and the other connects to it on the port being listened to. Once the connection is established, either machine can send data to the other.
Finally, a Socket is the object that represents one end of the connection between the machines.
It is also recommended that the reader understands what delegates are in C#. In simple terms, a delegate is a type-safe reference to a method with a specific signature. This lets code invoke a method that is only chosen at run time. Try running this console application to see it in action:
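The following sketch puts the listen/connect and send/receive steps together. It runs both sides of a TCP connection inside one process over the loopback address. It is synchronous for clarity, whereas the article's code uses the asynchronous Begin/End methods, and it omits error handling. The LoopbackDemo name is hypothetical.

```csharp
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;

// Illustrative sketch (synchronous, loopback only, no error handling):
// one socket listens, another connects to it, and data flows between them.
public static class LoopbackDemo
{
    public static string SendAndReceive(string text)
    {
        using (var listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        using (var sender = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
        {
            // Target machine, step 1: listen. Port 0 lets the OS pick a free port.
            listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
            listener.Listen(1);

            // Sender, step 1: connect to the listening address and port.
            sender.Connect((IPEndPoint)listener.LocalEndPoint);
            using (Socket worker = listener.Accept())
            {
                // Sender, step 2: send the data, then signal that no more will follow.
                sender.Send(Encoding.UTF8.GetBytes(text));
                sender.Shutdown(SocketShutdown.Send);

                // Target machine, step 2: receive until Receive returns 0 (sender done).
                var received = new MemoryStream();
                var buffer = new byte[256];
                int count;
                while ((count = worker.Receive(buffer)) > 0)
                    received.Write(buffer, 0, count);
                return Encoding.UTF8.GetString(received.ToArray());
            }
        }
    }
}
```

Shutting down the sending side makes Receive return 0 on the other end. That is how the receiver here knows the data is complete.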
using System;

namespace ConsoleTestLab
{
    // Any method that takes no parameters and returns void fits this delegate.
    public delegate void myDelegate();

    class Program
    {
        static void SayHello()
        {
            Console.Write("Hello");
        }

        static void SayWorld()
        {
            Console.WriteLine("World!");
        }

        static void Main(string[] args)
        {
            // The delegates are bound to methods here and invoked later.
            myDelegate function1 = SayHello;
            myDelegate function2 = SayWorld;
            function1.Invoke();
            Console.Write(" ");
            function2.Invoke(); // the program prints "Hello World!"
        }
    }
}
See the references for more information on delegates.
Using the Code
The basic idea behind this example, as with many peer-to-peer applications, is simple. The network revolves around peers that communicate with each other. There has to be at least one starting peer or server for all the peers to connect to. The path chosen here was to have a server: a Socket listener that keeps a list of workgroups.
public class CollaborativeNotesRepositoryServer : ICollaborativeNotesServer
{
    ...
    // Workgroup name -> peers registered in that workgroup.
    public Dictionary<string, Collection<ICollaborativeClientDetails>> ClientDetailsGroups;
    ...
    public InitState Initialize() { ... }
    protected void OnHandleClientConnection(IAsyncResult asyncResult) { ... }
    protected void OnHandleClientData(IAsyncResult asyncResult) { ... }
    ...
}
Each workgroup represents a peer-to-peer network with its own set of peers. As developed for this example, the code assumes that a peer belongs to only one group at a time. Nothing in this architecture prevents a peer from joining several groups, though.
Since the communication is asynchronous, it happens in several steps, using callback methods that run on separate threads. First, the Socket is initialized. Then BeginAccept is called with an AsyncCallback, which handles the incoming connection on a separate thread without blocking. Inside that callback, EndAccept must be called to complete the connection. After that, data can be received by calling BeginReceive. BeginReceive takes another AsyncCallback, which again triggers the respective method on another thread, and that method calls EndReceive in turn. BeginReceive and EndReceive use a byte array as a buffer to store the received data, and they can be called as many times as needed until the whole message is received. After handling a connection, the accept callback calls BeginAccept again so that the listener keeps accepting new peers. The multiple steps can be seen in the following code:
public InitState Initialize()
{
    ...
    IPHostEntry hostEntry = Dns.GetHostEntry(Dns.GetHostName());
    ...
    // Pick an IPv4 address of this machine (the last one found is kept).
    IPAddress localAddress = null;
    for (int i = 0; i < hostEntry.AddressList.Length; ++i)
    {
        if (hostEntry.AddressList[i].AddressFamily == AddressFamily.InterNetwork)
            localAddress = hostEntry.AddressList[i];
    }
    ...
    try
    {
        if (mListenerSocket == null)
            mListenerSocket = new Socket(AddressFamily.InterNetwork,
                SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint localIP = new IPEndPoint(localAddress, mListenPort);
        mListenerSocket.Bind(localIP);
        // Allow up to 50 pending connections in the backlog.
        mListenerSocket.Listen(50);
        // Accept without blocking; OnHandleClientConnection runs on a thread-pool thread.
        mListenerSocket.BeginAccept(new AsyncCallback(OnHandleClientConnection), null);
    }
    catch ...
    ...
}
protected void OnHandleClientConnection(IAsyncResult asyncResult)
{
    try
    {
        // Complete the accept and get the socket for this connection.
        Socket workerSocket = mListenerSocket.EndAccept(asyncResult);
        try
        {
            // Start receiving; OnHandleClientData is called when data arrives.
            TxRxPacket dataStatus = new TxRxPacket(workerSocket);
            workerSocket.BeginReceive(dataStatus.mDataBuffer, 0,
                dataStatus.mDataBuffer.Length, SocketFlags.None,
                new AsyncCallback(OnHandleClientData), dataStatus);
        }
        catch ...
        ...
        // Go back to accepting the next connection.
        mListenerSocket.BeginAccept(new AsyncCallback(OnHandleClientConnection), null);
    }
    catch ...
    ...
}
protected void OnHandleClientData(IAsyncResult asyncResult)
{
    try
    {
        TxRxPacket dataStatus = (TxRxPacket)asyncResult.AsyncState;
        int countRx = dataStatus.mCurrentSocket.EndReceive(asyncResult);
        IMessage rxMessage = mMessageParser.ParseMessage(dataStatus.mDataBuffer);
        switch (rxMessage.Type)
        {
            case ((int)MessageType.RegisterMessage):
            {
                RegisterMessage msg = ((RegisterMessage)rxMessage);
                // Create the group on first use.
                if (!mClientDetailsGroups.ContainsKey(msg.Group))
                    mClientDetailsGroups[msg.Group] =
                        new Collection<ICollaborativeClientDetails>();
                // A re-registering client is removed so it is not sent its own details.
                if (mClientDetailsGroups[msg.Group].IndexOf(msg.Client) >= 0)
                    mClientDetailsGroups[msg.Group].Remove(msg.Client);
                // Reply with the peers already in the group...
                RegisteredClientsListMessage response =
                    new RegisteredClientsListMessage();
                response.Clients = mClientDetailsGroups[msg.Group];
                response.Group = msg.Group;
                // ...by connecting to the listener port announced by the client.
                Socket clientSocket = new Socket(AddressFamily.InterNetwork,
                    SocketType.Stream, ProtocolType.Tcp);
                IPAddress remoteMachine = IPAddress.Parse(msg.Client.ClientIPAddress);
                IPEndPoint remoteEndpoint = new IPEndPoint(
                    remoteMachine, msg.Client.ClientListenPort);
                clientSocket.Connect(remoteEndpoint);
                clientSocket.Send(response.GetMessagePacket());
                clientSocket.Close(1);
                // Only now is the new client added, so the reply did not include it.
                mClientDetailsGroups[msg.Group].Add(msg.Client);
                if (mOnRegisterClient != null)
                    mOnRegisterClient.Invoke(this,
                        new ServerRegisterEventArgs(msg.Client));
                break;
            }
            case ((int)MessageType.UnregisterMessage):
            {
                UnregisterMessage msg = ((UnregisterMessage)rxMessage);
                // Guard against unknown groups to avoid a KeyNotFoundException.
                if (mClientDetailsGroups.ContainsKey(msg.Group))
                    mClientDetailsGroups[msg.Group].Remove(msg.Client);
                if (mOnRegisterClient != null)
                    mOnRegisterClient.Invoke(this,
                        new ServerRegisterEventArgs(msg.Client));
                break;
            }
        }
    }
    catch ...
    ...
}
The peer network construction process begins when the repository server receives a RegisterMessage. As the last segment of code shows, the repository replies to the peer with a RegisteredClientsListMessage. This message lists all the other peers already in the same workgroup. After receiving the list, the client registers itself with each of those peers, so that all of them become aware of the new peer on the network.
public void OnHandleClientData(IAsyncResult asyncResult)
{
    try
    {
        TxRxPacket dataStatus = (TxRxPacket)asyncResult.AsyncState;
        dataStatus.mCurrentSocket.EndReceive(asyncResult);
        // Keep the received chunk together with the data stored so far.
        dataStatus.StoreCurrentData();
        IMessage rxMessage = mMessageParser.ParseMessage(dataStatus.mStoredBuffer);
        if (rxMessage == null)
        {
            // Not a complete message yet: keep receiving on the same socket.
            dataStatus.mCurrentSocket.BeginReceive(dataStatus.mDataBuffer, 0,
                dataStatus.mDataBuffer.Length, SocketFlags.None,
                new AsyncCallback(OnHandleClientData), dataStatus);
            return;
        }
        switch (rxMessage.Type)
        {
            case ((int)MessageType.ResgisteredClientsListMessage):
            {
                Socket workerSocket = (Socket)dataStatus.mCurrentSocket;
                RegisteredClientsListMessage rxClientList =
                    (RegisteredClientsListMessage)rxMessage;
                if (rxClientList.Clients != null)
                {
                    for (int i = 0; i < rxClientList.Clients.Count; ++i)
                    {
                        // Remember the peer in the local list...
                        mGroupClientsDetails.Add(rxClientList.Clients[i]);
                        // ...and announce this peer to it.
                        ICollaborativeNotesClient client = new CollaborativeNotesClient(
                            mListenPort, rxClientList.Clients[i].ClientIPAddress,
                            rxClientList.Clients[i].ClientListenPort, mGroup);
                        client.Initialize();
                        client = null;
                    }
                }
                break;
            }
            ...
        }
    }
    catch ...
    ...
}
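The pattern above calls BeginReceive again until a complete message has been stored, and it can be reduced to the following sketch. The ReceiveState class is a hypothetical, simplified stand-in for TxRxPacket. Unlike the article's code, the sketch treats the message as complete when the sender closes its side of the connection (EndReceive returns 0), not when the data parses. Error handling is omitted.

```csharp
using System;
using System.IO;
using System.Net.Sockets;

// Hypothetical state object, similar in spirit to TxRxPacket: it carries the
// socket, the receive buffer and everything stored so far.
public class ReceiveState
{
    public Socket Socket;
    public byte[] Buffer = new byte[1024];
    public MemoryStream Stored = new MemoryStream();
    public Action<byte[]> OnComplete;
}

public static class AsyncReceiveDemo
{
    public static void StartReceive(Socket socket, Action<byte[]> onComplete)
    {
        var state = new ReceiveState { Socket = socket, OnComplete = onComplete };
        socket.BeginReceive(state.Buffer, 0, state.Buffer.Length,
            SocketFlags.None, OnReceive, state);
    }

    // Runs on a thread-pool thread each time a chunk arrives.
    private static void OnReceive(IAsyncResult asyncResult)
    {
        var state = (ReceiveState)asyncResult.AsyncState;
        int count = state.Socket.EndReceive(asyncResult);
        if (count > 0)
        {
            // Store only the bytes actually received by this call.
            state.Stored.Write(state.Buffer, 0, count);
            // Post another receive for the rest of the message.
            state.Socket.BeginReceive(state.Buffer, 0, state.Buffer.Length,
                SocketFlags.None, OnReceive, state);
        }
        else
        {
            // 0 bytes: the sender shut down its side, so the message is complete.
            state.OnComplete(state.Stored.ToArray());
        }
    }
}
```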
Thus far, the network starts with the repository server waiting for messages. Then the first client shows up, and the group and client are registered on the server for the first time, so there are no peers to send back. When the second peer shows up, the first peer is already registered on the server. The server therefore sends back the first peer's details, and the second peer registers with the first. The third peer receives the two peers already registered on the server and performs the same registration step with each of them. The process continues the same way for any number of peers that join the network.
Fig. 1: Activity diagram for the peer registration step.
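The registration sequence can be modelled without any sockets. This sketch identifies peers by plain "address:port" strings, whereas the article uses ICollaborativeClientDetails objects. The class name PeerRepository is hypothetical. Register returns the reply the server would send and then records the newcomer.

```csharp
using System.Collections.Generic;

// Illustrative in-memory model of the repository server's bookkeeping.
public class PeerRepository
{
    private readonly Dictionary<string, List<string>> groups =
        new Dictionary<string, List<string>>();

    // Returns the peers already in the group (the reply to a RegisterMessage),
    // then records the new peer.
    public List<string> Register(string group, string peer)
    {
        if (!groups.TryGetValue(group, out List<string> peers))
        {
            peers = new List<string>();
            groups[group] = peers;
        }
        peers.Remove(peer);                    // a re-registering peer is not listed twice
        var reply = new List<string>(peers);   // snapshot taken before the peer is added
        peers.Add(peer);
        return reply;
    }

    // What Close() triggers on the server side: the peer leaves its group.
    public void Unregister(string group, string peer)
    {
        if (groups.TryGetValue(group, out List<string> peers))
            peers.Remove(peer);
    }
}
```

Registering peers A, B and C in turn returns an empty list, then [A], then [A, B]. This matches the sequence described above.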
One weak point of this approach is that the client list message grows with the group. With hundreds or even thousands of peers, it goes from less than a kilobyte to a few kilobytes. This is one reason, but not the only one, for the recursive call to the BeginReceive method. The main reason to call BeginReceive recursively is to allow peers to send messages of any size to each other, as happens when they exchange images (such as BMP, GIF or JPEG). The code also has a very dangerous flaw. It is quite simple to trigger infinite loops in the peer applications by injecting messages that are never correctly terminated but are still accepted as the start of a valid message. So always be careful when implementing communication protocols and parsing mechanisms.
The messages themselves use a tagged, XML-like format. This choice simplifies message parsing through the System.Xml API, namely the XmlDocument class, which loads the message into a DOM-like tree. Parsing then becomes a matter of navigating the tree and extracting the data from its leaves to fill the fields of the message structure. The following RegisterMessage.Parse method shows this:
public bool Parse(Byte[] data)
{
    String messageContent = Encoding.UTF8.GetString(data);
    XmlDocument xmlDoc = new XmlDocument();
    try
    {
        // Build the DOM tree; malformed XML throws an XmlException.
        xmlDoc.LoadXml(messageContent);
    }
    catch ...
    {
        ...
    }
    // Read the <type> element to check which message this is.
    MessageType type = MessageType.EmptyMessage;
    XmlElement messageElement = xmlDoc.DocumentElement;
    if (messageElement.Name == "message")
    {
        foreach (XmlNode node in messageElement.ChildNodes)
        {
            if (node.Name == "type")
            {
                type = (MessageType)Enum.Parse(typeof(MessageType), node.InnerText);
                break;
            }
        }
    }
    if (type != MessageType.RegisterMessage)
    {
        ...
        return false;
    }
    // Fill the fields from the <group> and <clientdetails> leaves.
    this.mGroup = "";
    this.mClient = new CollaborativeClientDetails();
    foreach (XmlNode node in messageElement.ChildNodes)
    {
        if (node.Name == "group")
        {
            this.mGroup = node.InnerText;
        }
        else if (node.Name == "clientdetails")
        {
            foreach (XmlNode detailsNode in node.ChildNodes)
            {
                if (detailsNode.Name == "name")
                    this.mClient.ClientName = detailsNode.InnerText;
                else if (detailsNode.Name == "ipaddress")
                    this.mClient.ClientIPAddress = detailsNode.InnerText;
                else if (detailsNode.Name == "listenport")
                    this.mClient.ClientListenPort = int.Parse(detailsNode.InnerText);
            }
        }
    }
    return true;
}
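Parse only reads the packet, so the sending side has to produce the same structure. Below is a sketch of how such a packet could be built with XmlDocument, using exactly the element names Parse looks for (message, type, group, clientdetails, name, ipaddress, listenport). The RegisterXml class is hypothetical, and the article's own GetMessagePacket may format the packet differently.

```csharp
using System.Globalization;
using System.Text;
using System.Xml;

// Hypothetical counterpart to RegisterMessage.Parse: builds a packet with the
// element names that Parse navigates.
public static class RegisterXml
{
    public static byte[] Build(string group, string name, string ipAddress, int listenPort)
    {
        var doc = new XmlDocument();
        XmlElement message = doc.CreateElement("message");
        doc.AppendChild(message);
        AddText(doc, message, "type", "RegisterMessage");
        AddText(doc, message, "group", group);

        XmlElement details = doc.CreateElement("clientdetails");
        message.AppendChild(details);
        AddText(doc, details, "name", name);
        AddText(doc, details, "ipaddress", ipAddress);
        AddText(doc, details, "listenport", listenPort.ToString(CultureInfo.InvariantCulture));

        // OuterXml escapes characters such as '<' and '&' inside text values.
        return Encoding.UTF8.GetBytes(doc.OuterXml);
    }

    private static void AddText(XmlDocument doc, XmlElement parent, string tag, string value)
    {
        XmlElement element = doc.CreateElement(tag);
        element.InnerText = value;
        parent.AppendChild(element);
    }
}
```

Building the document through XmlDocument instead of concatenating strings also escapes characters such as < and & in names. Unescaped, these would make LoadXml fail on the receiving side.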
A few message types were implemented to exchange data between the peer applications developed here: the TextMessage, the ImageMessage and the StrokeMessage.
The type names are quite self-explanatory, and the white boards that represent the peers are built precisely to paint them. The TextMessage has a text value, a font, a color and a position. The ImageMessage has a position, a size and the image data. Finally, the StrokeMessage essentially has a list of points, a stroke width and a color.
From the peer application's perspective, there are two main requirements. The first is to paint the contents of the messages correctly. The second is that each peer must have both a socket listener and a socket client. The listener on each peer is similar to the repository server in that it is the fixed point for the others to connect to. Like the repository server, it needs an IPAddress and a port to listen on. The socket client, in turn, is used to connect and send data to the other peers.
Another detail visible in the code is what the peer does after receiving the RegisteredClientsListMessage. Besides announcing its presence to the other peers in the group, it builds a local list of peers. Announcing itself means it is included in their peer lists and consequently in their message broadcasts.
Another architectural choice was to make connections as atomic as possible, meaning that an outgoing connection is only active while there is a message to send. For each message sent or broadcast, a client connection is opened, the data is sent, and the socket is closed as soon as the data has been sent. This can be seen in this piece of code:
public void SendMessageAsync(IMessage message)
{
    // A new connection is created for every message.
    mClientSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream,
        ProtocolType.Tcp);
    IPAddress remoteMachine = IPAddress.Parse(mServer);
    IPEndPoint remoteEndpoint = new IPEndPoint(remoteMachine, mServerPort);
    try
    {
        // The message travels to OnHandleConnection as the state object.
        mClientSocket.BeginConnect(
            remoteEndpoint, new AsyncCallback(OnHandleConnection), message);
    }
    catch (Exception ex)
    {
        ...
        return;
    }
}

protected void OnHandleConnection(IAsyncResult asyncResult)
{
    try
    {
        IMessage message = (IMessage)asyncResult.AsyncState;
        mClientSocket.EndConnect(asyncResult);
        Byte[] dataBuffer = message.GetMessagePacket();
        TxRxPacket msgPacket = new TxRxPacket(mClientSocket, dataBuffer.Length);
        msgPacket.mDataBuffer = dataBuffer;
        if (mClientSocket.Connected)
            mClientSocket.BeginSend(msgPacket.mDataBuffer, 0,
                msgPacket.mDataBuffer.Length, SocketFlags.None,
                new AsyncCallback(OnHandleSend), msgPacket);
    }
    catch (Exception ex)
    {
        ...
        return;
    }
}

protected void OnHandleSend(IAsyncResult asyncResult)
{
    try
    {
        // Once the data has been sent, the connection is closed right away.
        mClientSocket.EndSend(asyncResult);
        mClientSocket.Close();
        mClientSocket = null;
    }
    catch (Exception ex)
    {
        ...
        return;
    }
}
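One thing to keep in mind about this code is that the socket is stored in the mClientSocket field. A second SendMessageAsync call made before the first completes would replace the socket that the callbacks are still using. The following synchronous sketch shows the same one-connection-per-message idea with TcpClient, keeping the connection in a local variable instead. The OneShotSender name is hypothetical.

```csharp
using System.Net.Sockets;

// Sketch of "one connection per message" using TcpClient. The connection
// lives in a local variable, so concurrent sends cannot overwrite each other.
public static class OneShotSender
{
    public static void Send(string host, int port, byte[] packet)
    {
        using (var client = new TcpClient())
        {
            client.Connect(host, port);
            NetworkStream stream = client.GetStream();
            stream.Write(packet, 0, packet.Length);
            // Tell the receiver no more data follows; disposing closes the connection.
            client.Client.Shutdown(SocketShutdown.Send);
        }
    }
}
```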
Summarizing so far, only the most relevant chunks of code have been shown:
- the Parse method of a message implementation;
- the socket client and listener implementations, covering connection creation and data sending as well as connection listening and data retrieval from the socket;
- the registration round trip, in which a peer registers on the repository server and the server replies with the client list by connecting back to the listener port the peer announced.
What’s really left to see is all the peer functionality put together. This is the CollaborativeNotesClass, which implements the ICollaborativeNotes interface. This object exposes the list of inbound messages received since it was instantiated. It also exposes the message parser engine, so that custom messages can easily be added to the communication protocol and integrated into the whole peer communication. It has fields describing the repository server, the peer network group name and its listener port. The whole peer logic is aggregated in this object. A peer application therefore only has to keep an instance of it and add an event handler to be notified in real time when messages arrive on the listener. This avoids having a timer poll the incoming message list for new ones.
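The article does not show the parser engine itself. The IMessage interface requires Parse and Clone methods, which suggests one plausible design, sketched here under that assumption. The engine keeps one prototype per message type, clones it, and returns the first clone whose Parse accepts the data. The ISketchMessage, SketchParserEngine and PrefixTextMessage names and the "TEXT:" format are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical, trimmed-down message contract (the article's IMessage has more members).
public interface ISketchMessage
{
    bool Parse(byte[] data);   // false when the data is not this message type
    ISketchMessage Clone();    // fresh instance of the same message type
}

// Prototype-based parser engine: adding a custom message type only
// requires registering one instance of it.
public class SketchParserEngine
{
    private readonly List<ISketchMessage> prototypes = new List<ISketchMessage>();

    public void Register(ISketchMessage prototype) => prototypes.Add(prototype);

    public ISketchMessage ParseMessage(byte[] data)
    {
        foreach (ISketchMessage prototype in prototypes)
        {
            ISketchMessage candidate = prototype.Clone();
            if (candidate.Parse(data))
                return candidate;
        }
        return null; // unknown or incomplete message
    }
}

// Example custom message: plain text prefixed with "TEXT:" (an assumed format).
public class PrefixTextMessage : ISketchMessage
{
    public string Text;

    public bool Parse(byte[] data)
    {
        string content = Encoding.UTF8.GetString(data);
        if (!content.StartsWith("TEXT:", StringComparison.Ordinal))
            return false;
        Text = content.Substring(5);
        return true;
    }

    public ISketchMessage Clone() => new PrefixTextMessage();
}
```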
The interface is as follows:
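public interface ICollaborativeNotes
{
    // Local listener port and repository server details.
    int ListenPort { get; set; }
    int ServerListenPort { get; set; }
    String Server { get; set; }
    // Name of the workgroup (peer network) to join.
    String Group { get; set; }
    // Messages received since this object was created.
    Collection<IMessage> InboundMessages { get; }
    // Parser engine; custom message types can be added to it.
    IMessageParserEngine MessageParser { get; }
    // Raised when a message arrives on the listener.
    event OnReceiveMessageDelegate OnReceiveMessage;
    // Synchronous sending.
    void BroadcastMessage(IMessage message);
    void SendMessage(IMessage message, ICollaborativeClientDetails details);
    void SendMessage(IMessage message, String name);
    // Asynchronous sending.
    void BroadcastMessageAsync(IMessage message);
    void SendMessageAsync(IMessage message, ICollaborativeClientDetails details);
    void SendMessageAsync(IMessage message, String name);
    // Unregisters from the repository server and the other peers.
    void Close();
}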
The BroadcastMessage and SendMessage methods shown here send messages synchronously. The broadcast methods are implemented by simply repeating the SendMessage method once for each peer on the local list. The actual implementation has a few more members, but these are enough to achieve the desired aggregation of functionality.
A final detail about the interface is the Close method. It performs the last task a peer must carry out when closing in order to keep the peer network consistent: “unregistering” its presence from both the repository server and all the other peer applications. This small step ensures that no threads or connection attempts are wasted on a peer that has left the workgroup/peer network.
As final remarks about the sample applications available for download, the workgroup is hardcoded in the sample peer application. The repository server's details, however, are read from the app.config file. Likewise, the repository server application reads the port it listens on from its own app.config file. Finally, there is also a sample service meant to be used as a repository server, though it requires a special installation to work on the system.
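The broadcast described above makes one SendMessage call per peer on the local list, which can be sketched as a loop. The send operation is passed in as a delegate so the example runs without sockets. The BroadcastSketch name and the choice to skip failing peers instead of aborting are assumptions, not the article's implementation.

```csharp
using System;
using System.Collections.Generic;

// Sketch of broadcast as "one send per known peer". Peers that fail are
// collected instead of aborting the whole broadcast.
public static class BroadcastSketch
{
    public static List<string> Broadcast(IEnumerable<string> peers, byte[] packet,
                                         Action<string, byte[]> sendMessage)
    {
        var failed = new List<string>();
        foreach (string peer in peers)
        {
            try
            {
                sendMessage(peer, packet);
            }
            catch (Exception)
            {
                // A peer that left without unregistering must not stop the others.
                failed.Add(peer);
            }
        }
        return failed;
    }
}
```

Because the peers are contacted one after another, the time a broadcast takes grows with the group size. The Async variants avoid blocking the caller during that time.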
Conclusion
This article presents an approach to using asynchronous sockets, along with a couple of concepts for setting up a peer-to-peer network. As a learning example, it has inherent faults left in on purpose. Readers who build on this code will need to learn enough about the techniques and the technology to at least fix them.
The article also presents some architectural concepts that make the code more modular and easier to extend. Examples include the use of message types, requiring the basic IMessage interface to provide the Parse and Clone methods, and plugging the message instances into a parser engine. All of this is then put to work with socket clients and listeners. Together, these provide the real-time interaction of the peer applications developed for the article, the “SampleWhiteBoard” application.
Points of Interest
Thread-safe user interface updates are very difficult to achieve, or even keep track of, in this kind of multi-threaded environment. In Windows Forms, for instance, a control should only be accessed from the thread that created it. This makes it hard to update the user interface from worker threads such as those used for asynchronous communication. Such calls have to be marshaled back to the UI thread, for example with Control.Invoke or Control.BeginInvoke. Delegates and events are used here to make this as manageable as possible.
There are other networking classes that can be used, such as TcpListener and TcpClient in the .NET Framework 2.0. The .NET Framework 3.5 adds further ways to communicate, such as the pipe classes in the System.IO.Pipes namespace.
References
History
- December 9th, 2007 - Initial version