Networking, Threading, Event Integration Tutorial with USDTP






4.50/5 (9 votes)
This article is a lengthy ground-up tutorial on Application Design, Networking, Threading, and Event Integration all wrapped into one.
Introduction
First and foremost, the intention of this article is as a Tutorial. My intention is for this to become a loose standard or guideline for how networking interfaces should behave. (I also personally believe that USDTP is a useful protocol in its own right, however.)
What I am trying to say is that you please not just copy the code and replace every instance of "USDTP" with "MSCCP" (My Super Cool Copied Protocol) or something to that effect. This is designed to teach you how to build an application like this, not to just Copy/Paste large blocks of code. However, I would be greatly honored if you were to use this code "as is"; changing the protocol would really defeat the point of me creating the protocol in the first place.
I am, of course, entirely open to suggestions and the like.
Now, on to business...
Background
In this tutorial, we will be building a rather-complicated networking protocol and implementation based on TCP/IP in C#. This assumes at least a functioning knowledge of C# and .NET, but really nothing more than syntax and common sense. If you find yourself pouring over long blocks of stuff you already know, this is intentional: I'm trying to cover everything, so feel free to skip ahead.
The goal of this article is to create a protocol which will be named USDTP (Universal Segmented Data Transfer Protocol). In essence, this protocol will allow the transmission of delineated blocks of data with a header.
Here is a good way to think about it:
[Header][[Segment][...][...]]
The actual message will appear as:
USDTP[Header][TotalSegments][[SegmentLength][SegmentData]][[...][...]][...]
And the connection headers will be: (ignore the bracketed items)
.USDTP <?> [Request]
.USDTP <-> [Accepted]
.USDTP <X> [Refused]
- "USDTP" (the Message Prefix) and ".USDTP ???" (the Connection Headers) will be encoded in UTF8.
- [Header] will be one byte long.
- [TotalSegments] will be a signed long (8 bytes).
- [SegmentLength] will be a signed long (8 bytes).
- [SegmentData] will be exactly [SegmentLength] bytes in length.
Getting Started
Okay, let's get started by creating the USDTPMessage
class. This class will form the basis of the entire protocol.
using System;
namespace USDTP
{
public class USDTPMessage
{
public byte[][] Payload;
public byte Header;
public USDTPMessage(byte header, byte[][] payload)
{
Payload = payload;
Header = header;
}
}
}
As you can see, this class is very simple minded, but its power lies in the fact that it is so simple.
Header
is the header byte.Payload
is the segments. (See what terrible things arrays can do to grammar.)
Each inner array (i.e. Payload[0], Payload[1], etc.) is an individual segment containing many bytes (i.e. Payload[0][0], Payload[0][1], Payload[1][0], etc...). These segments can, of course, contain different number of bytes.
The Packer
Now a simple question remains: How to get this class, USDTPMessage
, into an array of bytes plus a header (i.e. Header
+ Payload
) suitable for network transfer.
If you recall, earlier we defined the protocol, now we must make the ever important leap from telling intuitive and intelligent humans how to reach this point to telling mindless, heartless, and soulless computers how to do the same.
Philosophy aside, here is how we are to do this:
using System;
using System.Text;
using System.Collections.Generic;
using USDTP;
namespace USDTP.Packing
{
public static class USDTPPacker
{
public static byte[] FullPack(USDTPMessage message)
{
//Temporary List storing the packed data
List<byte> PackedData = new List<byte>();
//Add the standard header
PackedData.AddRange(Encoding.UTF8.GetBytes("USDTP"));
//Add the message header
PackedData.Add(message.Header);
//Add the total number of payload segments
PackedData.AddRange(BitConverter.GetBytes(message.Payload.LongLength));
//Add the payload segments
foreach (byte[] ba in message.Payload)
{
//Add the segment's length
PackedData.AddRange(BitConverter.GetBytes(ba.LongLength));
//Add the segment's data
PackedData.AddRange(ba);
}
//Return the now-packed message.
return PackedData.ToArray();
}
}
}
Just in case that wasn't clear, I'll walk you through it.
We begin with the "required-for-everything" namespace, System
, plus the following namespaces:
System.Text
[Encoding
]System.Collections.Generic
[List<T>
]USDTP
[USDTPMessage
]
Next we define the current namespace USDTP.Packing
, which is exactly the same as putting:
namespace USDTP
{
namespace Packing
{
//...
}
}
Following that, we define the static
class USDTPPacker
, which has only one method: FullPack
which is also static
, returns an array of byte
s, and takes one USDTPMessage
as its argument. The intention of this method is to take the USDTPMessage message
and output an array of byte
s which may be later sent across the network.
We begin by declaring a generic List
of byte
s which will contain the data as it is being packed. Following this, the customary "USDTP" prefix is added to the list in UTF8
format. Next, we add the Header
of the message
and the total number of segments (as a long
), using System.BitConverter
to convert the long
to the equivalent array of byte
s. Then, for each segment, we add its length using the same method as above; and, finally, add the actual data. After this, it is a simple matter of converting PackedData
(a list
) to the required return type (an array of byte
s) using ToArray()
.
- For more information on
List
s, visit MSDN's article onList<T>
.
The UnPacker
Now that we have a way to pack (encode) the data, we need a way to unpack (decode) the data after it has been received. The problem is, not all data will be received at one time, or may be sent in different TCP packets and arrive at different times. TCP handles the order of the packets for us, but the issue remains that there are noticeable gaps in delivery time. To solve this, we are going to build a non-static
class that will be capable of unpacking the message in chunks, regardless of how many bytes are available at the given time. It should also be able to process multiple messages in the same set of data (i.e. if more than one message is received at a time), or even partial messages.
Because bytes are received from the network in blocks (more on that later), it is necessary to be able to process these blocks on the fly, without having to wait for more data. To do this, we will need a stage counter to remember where we are in the process and be able to cache bytes until we are ready to use them. To do this, the variable lBytesTillNextAction
will contain the number of bytes left till we have enough data stored in a buffer (named TmpBuffer
) to act on it.
The process for receiving a message is the natural result of the order in which the bytes are received:
[First] Prefix Header Segement1 Segemnet2 ... [Last]
Meaning "USDTP" should be the first thing we receive, then the header, and finally the segments.
Here is the full code to do what we have envisioned, plus a little more.
using System;
using System.Text;
using System.Collections.Generic;
using System.Threading;
using USDTP;
namespace USDTP.Packing
{
public class USDTPUnPacker
{
// The messages which have been unpacked by this UnPacker but not yet processed.
public Queue<USDTPMessage> Messages = new Queue<USDTPMessage>();
// Auto reset event which fires when a message has been fully unpacked.
public readonly AutoResetEvent reWait = new AutoResetEvent(false);
//Temporary buffer of received bytes waiting to be processed.
private List<byte> TmpBuffer = new List<byte>();
//The temporary payload of the current message.
private List<byte[]> TmpPayload = new List<byte[]>();
private byte TmpHeader = 0; //Header of the current message.
private bool bHasMessage = false; //Is a message currently being processed.
private long lSegsExp = 0; //How many segments we are expecting.
//How many more bytes we must receive before an action is taken on them.
private long lBytesTillNextAction = 0;
private int iStage = 0; //The current state of unpacking.
//Semaphore controlling access to unpacking functions.
private Semaphore semLock = new Semaphore(1, 1);
public USDTPUnPacker() { }
// Resets the UnPacker in case of an error or on completion of a message.
private void Reset()
{
bHasMessage = false;
lBytesTillNextAction = 0;
iStage = 0;
TmpBuffer.Clear();
TmpPayload.Clear();
TmpHeader = 0;
lSegsExp = 0;
GC.Collect();
}
//Processes the temporary buffer when it is full enough to be processed.
private bool ProcessTemp()
{
//we have the supposed USDTP header
if (iStage == 1)
{
//decode it
String proto = Encoding.UTF8.GetString(TmpBuffer.ToArray());
TmpBuffer.Clear();
//if we got garbage
if (proto != "USDTP")
//error (and reset)
throw new Exception("Invalid USDTP message header received!");
lBytesTillNextAction = 1;
iStage = 2;
}
//we have the Message header
else if (iStage == 2)
{
//store it
TmpHeader = TmpBuffer[0];
TmpBuffer.Clear();
iStage = 3;
lBytesTillNextAction = 8;
}
//we have the number of segments
else if (iStage == 3)
{
//get it
lSegsExp = BitConverter.ToInt64(TmpBuffer.ToArray(), 0);
TmpBuffer.Clear();
//if no segments are expected
if (lSegsExp == 0)
//we're done
return true;
iStage = 4;
lBytesTillNextAction = 8;
}
//we are expecting a segment's length
else if (iStage == 4)
{
//we now need one less segment
lSegsExp--;
//and wait until we have received the full body before we continue
lBytesTillNextAction = BitConverter.ToInt64(TmpBuffer.ToArray(), 0);
TmpBuffer.Clear();
iStage = 5;
}
//we have the body
else if (iStage == 5)
{
//store it
TmpPayload.Add(TmpBuffer.ToArray());
TmpBuffer.Clear();
//if we don't need any more segments
if (lSegsExp == 0)
//we're done
return true;
lBytesTillNextAction = 8;
iStage = 4;
}
return false;
}
private int SortBytes(byte[] Data)
{
bool hasmore = true;
int msgs = 0;
int offset = 0;
while (hasmore)
{
//if we don't currently have a message
if (!bHasMessage)
{
//we do now
bHasMessage = true;
//and we need the "USDTP" prefix
lBytesTillNextAction = 5;
iStage = 1;
}
//we need either the number of bytes till the next action
//or all the data, whichever is least.
long maxget = Math.Min(lBytesTillNextAction, Data.Length);
hasmore = (offset + maxget < Data.Length);
//copy the data we need into dat
byte[] dat = new byte[maxget];
Buffer.BlockCopy(Data, offset, dat, 0, (int)maxget);
TmpBuffer.AddRange(dat);
offset += (int)maxget;
//subtract the bytes we got
lBytesTillNextAction -= maxget;
try
{
//sort the bytes, if ready, and if it completed the message
if (lBytesTillNextAction == 0 && ProcessTemp())
{
//get the payload
byte[][] payload = new byte[TmpPayload.Count][];
for (int i = 0; i < TmpPayload.Count; i++)
payload[i] = TmpPayload[i];
//(and header) and enqueue the message
Messages.Enqueue(new USDTPMessage(TmpHeader, payload));
//reset because we finished
Reset();
msgs++;
}
}
//an error occurred
catch (Exception ex)
{
//so reset
Reset();
//and re-throw it, discarding all the bytes.
throw ex;
}
}
return msgs;
}
private void RecieveBytes(byte[] Data, out int iFinished, ref Exception Ex)
{
iFinished = 0;
//acquire the lock
semLock.WaitOne();
try
{
//process the bytes
iFinished = SortBytes(Data);
//if we finished any messages
if (iFinished > 0)
//fire the Auto Reset Event
reWait.Set();
}
catch (Exception ex)
{
//set the exception
Ex = ex;
}
finally
{
//release the semaphore
semLock.Release();
}
}
// Tells the unpacker that bytes have been received.
public void RecieveBytes(byte[] Data, bool Quiet, out int Finished)
{
Exception ex = null;
RecieveBytes(Data, out Finished, ref ex);
if (ex != null && !Quiet) throw ex;
}
// Tells the unpacker that bytes have been received.
public int RecieveBytes(byte[] Data, bool Quiet)
{
int Fin = 0;
RecieveBytes(Data, Quiet, out Fin);
return Fin;
}
// Tells the unpacker that bytes have been received.
public int RecieveBytes(byte[] Data)
{
return RecieveBytes(Data, false);
}
}
}
We begin with the "required-for-everything" namespace, System
, plus the following namespaces:
System.Text
[Encoding
]System.Collections.Generic
[List<T>
,Queue<T>
]System.Threading
[AutoResetEvent
,Semaphore
]USDTP
[USDTPMessage
]
From the previous examples, you should be already clear on class, variable, and method declarations, so we will get right into the meat.
Properties (Worth Explaining)
- [
public
]Messages
(Queue<T>
ofbyte
s): Holds all messages that have been UnPacked. This is stored on a "first-in, first-out" basis, so it preserves the order in which messages are received. After a message has been UnPacked it isEnqueue()
d into this queue. When the user is ready to consume (retrieve/use) the message,Dequeue()
is called.
For more information onQueue
s, visit MSDN's article onQueue<T>
. - [
public readonly
]reWait
(AutoResetEvent
): ThisEventWaitHandle
is set when a message has been received (byRecieveBytes()
) and is used later inUSDTP.USDTPSocket.WaitMessage()
. It is readonly because we want it to bepublic
, but never to be changed by the public.
For more information onAutoResetEvent
s andEventWaitHandle
s, visit MSDN's article onAutoResetEvent
. - [
private
]SemLock
(Semaphore
): ThisSemaphore
controls access to the processing methods. Because data is received asynchronously, it is possible (without thesemaphore
) for two instances of the processing functions to be running at one time, which would be disastrous. (I learned this the hard way.)
For more information onSemaphore
s, visit MSDN's article onSemaphore
.
Methods
- [
private
]void
Reset
(): Resets properties so that another message can be processed.
- [
private
]bool
ProcessTemp
(): This method takes all the bytes stored into
TmpBuffer
bySortBytes()
and stores them in their proper location to be later built into the message. It works in a staged manner so that it can process individual blocks on the fly.
- The 5 bytes of the "USDTP" expected prefix have been received. Check to make sure that the bytes were indeed received properly and, if they were not, throw an exception to reset. Otherwise, get ready to receive the 1 byte message header and set the stage to 2.
- The 1 byte message header has been received, so store it in
TmpHeader
and prepare to receive the 8 bytes that constitute the number of expected segments. Set the stage to 3. - The 8 byte signed
long
containing the number of segments to expect has been received. Convert these bytes into thelong
and store the new number inlSegsExp
. If more segments are expected, prepare to receive the 8 byte segment lengthlong
and set the stage to 4. Otherwise, returntrue
because we have completed the message. - The 8 byte signed
long
containing the size of the current segment has been received. Subtract one from the number of segments we still need, even though this segment is still not complete. Set the number of expected bytes until the next action (lBytesTillNextAction
) to the value of the receivedlong
and set the stage to 5. - The remainder of the segment has been received, so store it in
TmpPayload
. If no more segments remain, returntrue
because we have finished the message. Otherwise, prepare to receive the 8 byte segment lengthlong
and set the stage to 4 because we will still have more segments to process.
- [
private
]int
SortBytes
(byte[]
): This method takes an array ofbyte
s and processes it one chunk at a time, adding the necessary bytes intoTmpBuffer
and controlling whenProcessTmp()
is called. It is capable of handling multiple messages at one time and returns the number of messages it finished unpacking with the provided chunk of data.
This is one of, if not the, most complicated method in the entire project so I will break it down line-by-line below. - [
private
]void
RecieveBytes
(byte[]
, outint
, refException
) {Overload 1}: This method wrapsSortBytes()
in shell that performs a number of auxiliary functions. This includes acquiring and, afterwards, releasing theSemaphore
(SemLock
), trapping exceptions, setting theAutoResetEvent
(reWait
) and "out
-ing" the number of completed messages.
Be sure to always acquire aSemaphore
before atry
block and release it in thattry
block'sfinally
clause so that it is always freed, no matter what happens before it is released, be it a success or failure. I cannot stress enough how important this is. - [
public
] {varies}ReceveBytes
({varies}) {Overloads 2-4}: These arepublic
overloads ofRecieveBytes
that are called byUSDTP.USDTPSocket
s to inform theUnPacker
bytes have been received by the network. These simply pass control to the aboveprivate
overload. Many overloads are used to save coding time.
I hope that wasn't too confusing!
Breakdown of SortBytes
Lines are counted starting from the opening brace of the method, ignoring all other braces, comments, and blank lines.
Line 1: bool hasmore
declared and set to true
. This variable is true
if there is more data remaining in Data
to be processed.
Line 2: int msgs
declared and set to 0
. This variable holds the number of messages this call has completed and is returned at the end.
Line 3: int offset
declared and set to 0
. This variable is the offset into Data
at which coping should begin.
Line 4: Loop while we still have data in Data
to process...
Line 5: If we don't currently have a message being processed...
Line 6: Set bHasMessage
to true
because we now have a message.
Line 7: We now need the "USDTP" prefix for the message, so that means we need five bytes...
Line 8: and the stage should be 1
.
Line 9: long maxget
is set to either the amount of data we need to begin action or all the data we can retrieve at this time, whichever is least.
Line 10: hasmore
is set to true
if data still remains to be processed (offset
+ maxget
is less than the total number of bytes in Data
).
Line 11: byte
array dat
is declared and its size is set to maxget
. This will hold the data we want to copy into TmpBuffer
.
Line 12: Buffer.BlockCopy
is used to copy the desired contents of Data
(offset
through offset
+ maxget
) into dat
.
Line 13: The desired data is added to TmpBuffer
from dat
.
Line 14: offset
is increased by maxget
because the data that maxget
represents has already been read.
Line 15: lBytesTillNextAction
id decreased by maxget
because the maxget
amount of data has been added to TmpBuffer
.
Line 16: try
block is entered to catch errors occurring while processing TmpBuffer
.
Line 17: If we are ready to process TmpBuffer
(lBytesTillNextAction
is 0), call ProcessTmp()
and if that returns true
...
(Note: Due to the way &&
is resolved in C#, ProcessTmp()
will only run if lBytesTillNextAction
is 0
).
Line 18: payload
is declared as a number-of-arrays-of-byte
s-in-TmpPayload
-sized array of arrays of bytes
. (That was a mouthful!)
Lines 19-20: The contents of TmpPayload
are copied into payload
using a for
loop.
Line 21: The USDTPMessage
is created from its parts (TmpHeader
and payload
) and Enqueue()
d into Message
.
Line 22: We call Reset()
because we have completed a message and this instance now needs to be reset to its initial state.
Line 23: We increment the number of messages we have completed (msgs
).
Line 24: An error has been caught...
Line 25: So call Reset
because this instance needs to be reset.
Line 26: And re-throw
the exception, in the hope that it may later be caught.
Line 27: Return the number of messages we unpacked (msgs
).
And that's all there is to it, 27 functioning lines.
The Event Handler
Now that we have the Packer and UnPacker, it would seem it is time to build the actual networking implementation, however because we desire an event-driven architecture we must first design the handler object for these events. This object should have two public delegate
s, one to handle all Socket
events and the other for all Listener
events. In case you aren't sure what exactly those are, it shall be my great pleasure to explain.
Sockets
A Socket
is basically an object on a computer that can connect to Listener
s on the same or different machines. (Technically, this is only true for TCP Socket
s, but that is what we will be using.) For this example, we assume they will be able to both send and receive byte
s.
Message_Recieved
: Occurs when a message has been received from the host of the socket.Connection_Lost
: Occurs when the connection to the host has been lost.Creation_Complete
: Occurs then an asynchronous creation attempt is completed. (More on this later.)
Listeners
A Listener
is a type of Socket
, however it "listens" for incoming connections from other Sockets
and, if it decides to accept them, it creates new Socket
objects to handle the new connections. (Notice the plural forms; Listener
s are able to do this repeatedly.)
Pending_Request_Accepted
: Occurs when a connection has been accepted, processed, and is ready to be used by the user.
With that covered, we can now build the handler:
using System;
namespace USDTP
{
// Core EventHandler object for use with USDTP objects.
// This allows all event-related code to be centered in one object,
// rather than many different specialized delegates.
public class USDTPEventHandler
{
// Describes the type of socket event.
public enum USDTPSocketEventType
{
// A message has been received by the Socket.
Message_Recieved,
// Connection to the remote host was lost.
Connection_Lost,
// The creation of a socket was completed.
// This is only ever used as the result of an USDTPEndSocket.AsyncCreate();
// call and is not even called from within a handler.
Creation_Complete
}
// Describes the type of listener event.
public enum USDTPListenerEventType
{
// A new connection has been accepted and is ready for processing.
Pending_Request_Accepted
}
public delegate void dUSDTPSocketEvent
(USDTPSocket socket, USDTPSocketEventType eventtype, Object EventInfo);
public delegate void dUSDTPListenerEvent
(USDTPListener listener, USDTPListenerEventType eventtype, Object EventInfo);
private event dUSDTPSocketEvent USDTPSocketEvent;
private event dUSDTPListenerEvent USDTPListenerEvent;
private bool bPreformSocketCall = false;
private bool bPreformListenerCall = false;
public USDTPEventHandler() { }
// Registers a Socket-Event-Handling method.
public void RegisterSocketEventHandler(dUSDTPSocketEvent handler)
{
USDTPSocketEvent += handler;
bPreformSocketCall = true;
}
// Registers a Listener-Event-Handling method.
public void RegisterListenerEventHandler(dUSDTPListenerEvent handler)
{
USDTPListenerEvent += handler;
bPreformListenerCall = true;
}
public void UnRegisterSocketEventHandler(dUSDTPSocketEvent handler)
{
USDTPSocketEvent -= handler;
}
public void UnRegisterListenerEventHandler(dUSDTPListenerEvent handler)
{
USDTPListenerEvent -= handler;
}
// Internal method which calls a socket event.
internal void CallSocketEvent
(USDTPSocket socket, USDTPSocketEventType eventtype, Object EventInfo)
{
if (bPreformSocketCall && USDTPSocketEvent.GetInvocationList().Length > 0)
USDTPSocketEvent(socket, eventtype, EventInfo);
}
// Internal method which calls a listener event.
internal void CallListenerEvent
(USDTPListener listener, USDTPListenerEventType eventtype, Object EventInfo)
{
if (bPreformListenerCall &&
USDTPListenerEvent.GetInvocationList().Length > 0)
USDTPListenerEvent(listener, eventtype, EventInfo);
}
}
}
This handler essentially allows the registration of delegate
s without worrying if a delegate
is actually registered when a call to the event
is made. It also prevents the use of individually specialized delegate
s being used more than once, which would create a true abomination of code. By consolidating all events into one global handler, it allows maximum code-reuse and supreme conditional runtime flexibility without the complication of raw event registration. Due to the simplistic nature of this class, I'm going to point you toward this CSharpHelp article on event
s and delegate
s.
However, there are several things worth noting. When an event
is called when no delegate
s have been assigned to it, an exception is thrown. This is resolved by setting [bPreform
*Call
] to true
if a delegate
has been registered for that event
. The event
will only ever be called if this bool
is true
. However, we come to the point that, if the last delegate
is un-registered, that an exception will once again be thrown. However the event
is definitely now defined, so we can check if it has any delegate
s in its InvocationList
(the delegate
s it will call when fired) and, if it does, we fire the event
.
The Socket
We have reached the point in our development process where networking is now necessary: all that remains is the Socket and the Listener. The socket implementation is just under 400 lines long, so I'm going to break it down into separate parts.
Namespaces and Class Definition
using System;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace USDTP
{
using Packing;
public class USDTPSocket
{
You should be used to this by now.
System.Text
[Encoding
]System.Net
[IPAddress
,EndPoint
,IPEndPoint
]System.Net.Sockets
[TcpClient
,NetworkSream
]System.Threading
[Thread
,ThreadPriority
,ThreadStart
]USDTP
[USDTPMessage
]- (
USDTP.
)Packing
[USDTPPacker
,USDTPUnPacker
]
USDTPSocket
will be our socket class.
Properties (Standard)
private TcpClient tcConnection; //Underling TCP connection
private NetworkStream nsNetStream; //NetworkStream for tcConnection
private USDTPUnPacker upUnPacker; //UnPacker Object
private Thread tWorker; //Thread on which the manager runs
public readonly bool CanSend; //Can this instance Send?
public readonly bool CanRecieve; //Can this instance Receive?
private USDTPEventHandler ehHandler; //Global Event Handler
private bool bUseHandler = false; //True if the Event Handler (ehHandler)
//should be used.
Nothing too special here, except for tcConnection
and nsNetStream
.
tcConnection
is a TcpClient
that, when this socket is created, will be connected to the remote host. This provides the underlying TCP functions so that we don't have to worry about them. Yet another reason to prove that Microsoft loves us. TcpClients
provide us with a class that literally will save us hours of work, NetworkStream
. These classes definitely deserve a through explanation, which MSDN has already prepared just for us. If you do not already know how TcpClient
s and NetworkStream
s function, I assume you have read this article. tWorker
is a Thread
that will usually run a method that gets received data from the network. If you are unclear about either the static
or instance members of Thread
, it is critical you visit MSDN's article on Thread
.
Properties (Get-Only)
// Gets whether this socket is connected to a Remote Host.
// Exposes (TcpClient)tcConnection.Connected
public bool Connected
{
get
{
return tcConnection.Connected;
}
}
// The IPEndPoint of the Remote Host this socket is connected to.
// Exposes (TcpClient)tcConnection.RemoteEndPoint
public IPEndPoint RemoteEndPoint
{
get
{
return (IPEndPoint)tcConnection.Client.RemoteEndPoint;
}
}
// The total number of messages that have been received but not yet processed.
public int Messages
{
get
{
if (!CanRecieve) return -1;
return upUnPacker.Messages.Count;
}
}
This is, once again, fairly straightforward. The first two properties simply expose important, but otherwise private
, properties of the connection. Messages
allows the retrieval of the number of messages Enqueue()
d in the USDTPUnPacker upUnPacker
. It returns -1
if receiving is not supported.
Methods
- [
public
]USDTPMessage
GetMessage()
:// Immediately retrieves a message from the queue of received messages. // Returns the earliest message received that has not been processed, // null if no message has been received. // Throws a System.InvalidOperationException if Receiving is not supported // by this instance. public USDTPMessage GetMessage() { if (!CanRecieve) throw new InvalidOperationException ("receiving is not supported by this instance!"); try { return upUnPacker.Messages.Dequeue(); } catch (InvalidOperationException) { return null; } }
Immediately retrieves a message from the
Queue
inupUnPacker
. This class throws anInvalidOperationException
if receiving is not supported. Note also thatQueue
throws anInvalidOperationExeption
if no item exists to beDequeue()
d. This exception is caught and the method returnsnull
if this occurs. - [
public
]USDTPMessage
WaitMessage()
:// Blocks the current thread till a message has been received and // returns that message // or returns the oldest message received that has not yet been processed. // Returns the oldest message received that has not yet been processed. // Throws a System.InvalidOperationException if Receiving is not supported // by this instance. public USDTPMessage WaitMessage() { if (!CanRecieve) throw new InvalidOperationException ("receiving is not supported by this instance!"); upUnPacker.reWait.WaitOne(); return GetMessage(); }
All this method does is wait for the
AutoResetEvent reWait
in theUSDTPUnPacker upUnPack
to become set when a message is received. The thread blocks until this occurs. After the event is signalled, it callsGetMessage()
to get the message that has been received. - [
public
]void
SendMessage(USDTPMessage)
:// Immediately sends a message to the Remote Host. // Throws a System.InvalidOperationException if Sending is // not supported by this instance. public void SendMessage(USDTPMessage message) { if (!CanSend) throw new InvalidOperationException ("Sending is not supported by this instance!"); byte[] buffer = USDTPPacker.FullPack(message); nsNetStream.Write(buffer, 0, buffer.Length); }
This is fairly straightforward.
Line 1: Throw anInvalidOperationException
if sending is not supported.
Line 2: Fill an array of bytes with the packed message generated withUSDTP.Packing.USDTPPacker.FullPack()
.
Line 3: Send the packed data across the network using theNetworkStream nsNetStream
. - [
public
]void
Disconnect()
:// Disconnects from the Remote Host. // If this instance is already disconnected, no action is taken. public void Disconnect() { if (!Connected) return; try { nsNetStream.Close(5); tcConnection.Close(); tcConnection.Client.Close(5); } catch {} }
All this does here is close all the open network interfaces. Be sure they are closed in this order.
- [
private
]void
Manage()
:// Internal method run on tWorker that manages the flow of incoming packets, // routing them to (USDTPUnPacker)upUnPacker. private void Manage() { while (Connected) { try { //While tcConnection has unprocessed bytes... while (tcConnection.Available > 0) { //Buffer into which to read data. byte[] dataRead = new byte[tcConnection.Available]; //Read bytes into [dataRead] and store the total count //is iTotalBytes int iTotalBytes = nsNetStream.Read(dataRead, 0, tcConnection.Available); //If no bytes were read (just in case). if (iTotalBytes == 0) { //Sleep and start over again. Thread.Sleep(1); continue; } //Process the bytes... int finished = upUnPacker.RecieveBytes(dataRead, true); //if a message was completed and the handler should be called... if (finished > 0 && bUseHandler) //Call the appropriate event on the handler for each message. for(int i = 0; i < finished; i++) ehHandler.CallSocketEvent(this, USDTPEventHandler.USDTPSocketEventType.Message_Recieved, null); //Sleep so we don't consume too much CPU Time. Thread.Sleep(0); } } catch (ObjectDisposedException) { break; } catch { } finally { //Sleep so we don't consume too much CPU Time. Thread.Sleep(10); } } //If we reach this point the connection has somehow been lost. //If the handler should be used... if (bUseHandler) //Call the appropriate handler event for a lost connection. ehHandler.CallSocketEvent(this, USDTPEventHandler.USDTPSocketEventType.Connection_Lost, null); }
This method is run constantly on
Thread tWorker
and reads data in from the network and passes it on to theUSDTPUnPacker upUnPacker
. I believe the comments in the above code are sufficient to explain this method. - [
internal
] {constructor}USDTPSocket(TcpClient,bool,bool)
:// Internal USDTPSocket Constructor internal USDTPSocket(TcpClient conn, bool send, bool recieve) { tcConnection = conn; nsNetStream = tcConnection.GetStream(); CanSend = send; CanRecieve = recieve; if (CanRecieve) { upUnPacker = new USDTPUnPacker(); tWorker = new Thread(Manage); tWorker.Priority = ThreadPriority.Normal; tWorker.Start(); } }
This is the only constructor for this method. It is simple, but allows this class to be used both on the server and the client. The server internally creates one of these for each connection and provides it for later use. The client uses the static
Create
*()
methods to create an instance and connect to the server. It also starts theManage()
method running ontWorker
if it actually needs to be run (i.e. receiving is supported). - [
public
]void
RegisterHandler(USDTPEventHandler)
:// Registers an USDTPEventHandler for use as the primary event handler. // Throws a System.InvalidOperationException if another handler has // already been registered. // Avoid this by calling (USDTPSocket)instance.UnRegisterHandler() beforehand. public void RegisterHandler(USDTPEventHandler handler) { if (bUseHandler) throw new InvalidOperationException ("An USDTPEventHandler has already been registered!"); ehHandler = handler; bUseHandler = true; }
- [
public
]void
UnRegisterHandler()
:// Unregisters the current USDTPEventHandler. // This has no effect if a handler has not yet been registered. public void UnRegisterHandler() { bUseHandler = false; }
Enums
- [
public
]CreationResult
(byte
):// The result of the creation of a USDTPSocket. public enum CreationResult : byte { // The creation resulted in a viable socket capable of both // sending and receiving. SendRecieve, // The host was USDTP-compatable, but actively refused the request. Refused, // The host was invalid // (i.e. not accepting connections or not USDTP-compatable). Invalid }
- [
public
]CreationType
(byte
):// Represents the desired function of a newly created USDTPSocket. public enum CreationType : byte { // The socket should be able to both send and receive. SendRecieve }
Internal Classes
- [
public
]AsyncCreationInfo
:// The result of an Asynchronous creation attempt passed along as an Object // to the callback. public class AsyncCreationInfo { public USDTPSocket usSocket; public CreationResult Result; }
This class is passed along as
Object EventInfo
to the callbackdelegate
of an async creation attempt.
Static Methods - "Constructors"
- [
public
static
]USDTPSocket
Create(IPEndPoint, CreationType,
outCreationResult)
:// Creates a USDTPSocket and connects it to the specified Remote Host. // Host: The remote USDTP-compatable host to which connection should be attempted. // Use: The desired purpose of the new socket. // Result (Out): The result of the creation process. // Returns the new (connected) USDTPSocket, or null if connection failed. public static USDTPSocket Create(IPEndPoint Host, CreationType Use, out CreationResult Result) { //Create the underlying connection... TcpClient tcCli = new TcpClient(); //and connect to the host. tcCli.Connect(Host); NetworkStream nsNetStream = tcCli.GetStream(); //Write the default USDTP header (UTF8 Format) nsNetStream.Write(Encoding.UTF8.GetBytes(".USDTP "), 0, 7); //Write the proper request type for the desired use. if (Use == CreationType.SendRecieve) //Send/Receive nsNetStream.Write(Encoding.UTF8.GetBytes("<?>"), 0, 3); //Read the response from the host... int iCntr = 0; byte[] dataHeader = new byte[10]; while (iCntr < 10) { int iVal = nsNetStream.ReadByte(); if (iVal == -1) { Thread.Sleep(5); continue; } dataHeader[iCntr] = (byte)iVal; iCntr++; } //decode it... String sResponse = Encoding.UTF8.GetString(dataHeader); //and check if it is valid. if (sResponse == ".USDTP <->") { //ITS VALID! Result = CreationResult.SendRecieve; //So return the new (connected) USDTPSocket - //which can both send and receive. return new USDTPSocket(tcCli, true, true); } else if (sResponse == ".USDTP <X>") { //IT WAS REFUSED! Result = CreationResult.Refused; try { //So disconnect! nsNetStream.Close(); tcCli.Close(); tcCli.Client.Close(); } catch { } //And say we got garbage. return null; } else { //WE GOT A WIERD RESPONSE! Result = CreationResult.Invalid; try { //So disconnect! nsNetStream.Close(); tcCli.Close(); tcCli.Client.Close(); } catch { } //And say we got garbage. return null; } }
The only portion we need to cover is the initial reading of the 10-byte response.
//Read the response from the host... int iCntr = 0; byte[] dataHeader = new byte[10]; while (iCntr < 10) { int iVal = nsNetStream.ReadByte(); if (iVal == -1) { Thread.Sleep(5); continue; } dataHeader[iCntr] = (byte)iVal; iCntr++; }
All this does is to read 10 bytes one-at-a-time from the network using
NetworkStream nsNetStream.ReadByte()
. This method attempts to read one byte from the network and, if a byte is available, returns it as anint
, otherwise, it returns-1
. It is simple enough to check if the byte read is-1
or not, so we simply do that for each attempt. If it is-1
, weSleep()
the thread so we don't consume a large amount of CPU time waiting for more data. - [
private
static
]void
WorkAsyncCreate(Object)
:// Internal method which Asynchronously creates the USDTPSocket. // Arg: Object representing an Array of Objects representing the // arguments to this thread-start method. private static void WorkCreateAsync(Object Arg) { Object[] Args =(Object[])Arg; IPEndPoint epHost = (IPEndPoint)Args[0]; CreationType ctUse = (CreationType)Args[1]; USDTPEventHandler.dUSDTPSocketEvent seCallback = (USDTPEventHandler.dUSDTPSocketEvent)Args[2]; CreationResult Result = CreationResult.Invalid; USDTPSocket usSock = Create(epHost, ctUse, out Result); AsyncCreationInfo acInf = new AsyncCreationInfo(); acInf.Result = Result; acInf.usSocket = usSock; seCallback(null, USDTPEventHandler.USDTPSocketEventType.Creation_Complete, acInf); }
This method is run on a separate thread and simply enables the asynchronous creation of an
USDTPSocket
. When creation is complete, regardless of success or failure, the provideddelegate
(aUSDTPEventHandler.dUSDTPSocketEvent
) is called. - [
public
static
]USDTPSocket
CreateAsync(IPEndPoint, CreationType, USDTPEventHandler.dUSDTPSocketEvent)
:// Asynchronously creates a USDTPSocket. See: USDTPSocket.Create(); // Callback: The callback delegate to be called after creation is complete. public static void CreateAsync(IPEndPoint Host, CreationType Use, USDTPEventHandler.dUSDTPSocketEvent Callback) { Thread tCreator = new Thread(WorkCreateAsync); tCreator.Priority = ThreadPriority.AboveNormal; tCreator.Start(new Object[] { Host, Use, Callback }); }
A call to this method begins the asynchronous creation of a
USDTPSocket
(see above).
And, for good measure, we close the trailing parenthesis.
}
}
The Listener
As we reach the end of this tutorial, only one task remains, the creation of the USDTPListener
class. This class will server as the Listener implementation. It will be able to accept connections on multiple IP addresses and ports. It should also verify that a client is USDTP-valid before accepting it. Additionally, stop, pause, and start functionality should be available. Once again, because this file is large, it will be separated into parts.
Namespaces and Class Definition
using System;
using System.Text;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
namespace USDTP
{
public class USDTPListener
{
System.Text
[Encoding
]System.Collections.Generic
[Queue<T>
,List<T>
]System.Net
[IPAddress
,IPEndPoint
]System.Net.Sockets
[TcpClient
,TcpListener
,NetworkStream
]System.Threading
[Thread
,ThreadPriority
,ThreadStart
]- (
USDTP
) [USDTPSocket
]
Properties (Standard)
//All the registered listeners.
private List<TcpListener> ltlListeners = new List<TcpListener>();
private bool bRun = false; //Should the Manager thread accept connections?
private bool bAlive = true; //Is this instance alive (i.e. thread is running)?
private bool bHalt = true; //Should new listeners be automatically stared?
private Thread tWorker; //Primary worker thread
private USDTPEventHandler ehHandler; // Event Handler
private bool bUseHandler = false; // Should Event Handler be used?
// The new connections that have been accepted but not yet retrieved by the user.
public Queue<USDTPSocket> PendingRequests = new Queue<USDTPSocket>();
The only properties worth noting here are ltlListeners
and PendingRequests
.ltlListners
is a List<T>
of TcpListener
s. This property contains all the active underlying listeners in use by the object.PendingRequests
is a Queue<T>
of USDTPSocket
s. This property contains all the accepted and validated USDTPSockets
that are ready to be used by the user (by means of Dequeue()
).
Properties (Get-Only)
// Gets whether new connection attempts are being accepted.
public bool isRunning
{
get
{
return bRun;
}
}
// Get whether this instance is functioning.
public bool isAlive
{
get
{
return bAlive;
}
}
Management Methods
These methods help to accept and process incoming connections.
- [
private
]void
HandleClient(TcpClient)
:// Internal Method which processes a newly accepted TcpClient and // enqueues if it is valid. // tcCli: The newly accepted TcpClient. private void HandleClient(TcpClient tcCli) { NetworkStream nsNetStream = tcCli.GetStream(); //Read Connection Header... int iCntr = 0; byte[] dataHeader = new byte[10]; while (iCntr < 10) { int iVal = nsNetStream.ReadByte(); if (iVal == -1) { Thread.Sleep(5); continue; } dataHeader[iCntr] = (byte)iVal; iCntr++; } //Decode it... String sResponse = Encoding.UTF8.GetString(dataHeader); if (sResponse == ".USDTP <?>") { //IT'S VALID //So accept it. nsNetStream.Write(Encoding.UTF8.GetBytes(".USDTP <->"), 0, 10); //And enqueue it in the list of Sockets To Be Handled. PendingRequests.Enqueue(new USDTPSocket(tcCli, true, true)); //And, if the handler is to be used,... if (bUseHandler) //Call the proper Listener Event ehHandler.CallListenerEvent(this, USDTPEventHandler. USDTPListenerEventType.Pending_Request_Accepted, null); } else { //IT'S INVALID //So refuse it. nsNetStream.Write(Encoding.UTF8.GetBytes(".USDTP <X>"), 0, 10); //And Disconnect. nsNetStream.Close(); tcCli.Close(); tcCli.Client.Close(); } }
This method is called when a new
TcpClient
has been accepted byManage()
and is ready to be processed. It takes the client and reads the expected 10-byte header from the network. If this header is valid, it sends the acceptance message (defined at the beginning of this tutorial),Enqueue()
s the newUSDTPClient
intoPendingRequesets
, and fires the proper event onehHandler
. Otherwise it sends the refusal message and disconnects from the client. The workings of this method should, by now, be quite clear. - [
private
]void
Manage()
:// Internal method which accepts and processes new connection attempts // and passes control to HandleClient(); private void Manage() { //While Alive... while (bAlive) { //And accepting connections... while (bRun) { //Check each listener... foreach (TcpListener tlList in ltlListeners) { try { //while it has pending requests... while (tlList.Pending()) { //Accept the request... TcpClient tcCli = tlList.AcceptTcpClient(); //And handle it. HandleClient(tcCli); } } catch { } //Sleep so we don't consume too much CPU time. Thread.Sleep(0); } //Sleep so we don't consume too much CPU time. Thread.Sleep(1); } //Sleep so we don't consume too much CPU time. Thread.Sleep(25); } }
This method is run on
tWorker
. It constantly checks each and every registered listener (inltlListners
) to see if it has new clients. If it does, if passes the newTcpClient
toHandleClient()
. It has two inner loops to compensate for the stop/pause/start functions.
Constructors
- [
public
] {constructor}USDTPListner()
:// Default Constructor public USDTPListener() { tWorker = new Thread(Manage); tWorker.Priority = ThreadPriority.AboveNormal; tWorker.Start(); }
All this does is to start
Manage()
ontWorker
. - {deconstructor}
~USDTPListner()
:~USDTPListener() { Stop(); }
State Methods
- [
public
]void
Stop()
:// Permanently stops this instance from accepting requests and // terminates all listeners. public void Stop() { if (!bAlive) return; try { bRun = false; bAlive = false; Thread.Sleep(0); if (!tWorker.Join(100)) { tWorker.Abort(); tWorker.Join(); } foreach (TcpListener tlList in ltlListeners) { try { tlList.Stop(); } catch { } } ltlListeners.Clear(); } catch { } }
This method is called when the object is to stop all activity.
- [
public
]void
Pause(bool)
:// Pauses this instance from processing requests. // HaltAttempts: True if all listeners should be paused as well, // otherwise requests will be queued to be later accepted // and processed when Start() is called. public void Pause(bool HaltAttempts) { bRun = false; bHalt = HaltAttempts; if (bHalt) { foreach (TcpListener tlList in ltlListeners) tlList.Stop(); } }
This method is called to temporarily pause all processing activity. If
HaltAttempts
istrue
, all listeners are "paused" as well, so incoming requests will be denied. - [
public
]void
Start()
:// Starts or resumes this instance. public void Start() { bRun = true; if (bHalt) { foreach (TcpListener l in ltlListeners) l.Start(); bHalt = false; } }
This method is called to start or resume activity. Note how it handles if
bHalt
istrue
.
Listener Methods
- [
public
]void
RegisterListener(IPAddress, int)
:// Registers a new listener to listen for incoming connection attempts. // localaddr: The local address on which this listener should listen. // port: The port on which this listener should listen. // A listener may never be unregistered. public void RegisterListener(IPAddress localaddr, int port) { TcpListener tlList = new TcpListener(localaddr, port); if (!bHalt) tlList.Start(); ltlListeners.Add(tlList); }
This adds a listener to listen for connection attempts and starts it if necessary.
Event Handler Methods
You should be familiar with these from USDTPSocket
.
- [
public
]void
RegisterHandler(USDTPEventHandler)
:// Registers an USDTPEventHandler for use as the primary event handler. // Throws a System.InvalidOperationException if another handler has // already been registered. // Avoid this by calling (USDTPListener)instance.UnRegisterHandler() beforehand. public void RegisterHandler(USDTPEventHandler handler) { if (bUseHandler) throw new InvalidOperationException ("An USDTPEventHandler has already been registered!"); ehHandler = handler; bUseHandler = true; }
- [
public
]void
UnRegisterHandler()
:// Unregisters the current USDTPEventHandler. // This has no effect if a handler has not yet been registered. public void UnRegisterHandler() { bUseHandler = false; }
Using USDTP
We have reached the end of the tutorial section of this article. If you have read entirely through this article, use of the code we have created should be pretty clear. However, if you have not or it is not, here is a short and dirty demo.
using System;
using System.Net;
using USDTP;
namespace USDTP_Tester
{
class Program
{
//Method that handles listener events
private static void HandleListener
(USDTPListener listener, USDTPEventHandler.USDTPListenerEventType eventtype,
Object EventInfo)
{
Console.WriteLine("Listener Event: " + eventtype.ToString());
}
//Method that handles socket events
private static void HandleSocket
(USDTPSocket socket, USDTPEventHandler.USDTPSocketEventType eventtype,
Object EventInfo)
{
Console.WriteLine("Socket Event: " + eventtype.ToString());
}
static void Main(string[] args)
{
//create handler
USDTPEventHandler Handler = new USDTPEventHandler();
Handler.RegisterListenerEventHandler(HandleListener);
Handler.RegisterSocketEventHandler(HandleSocket);
//create listener
USDTPListener Listener = new USDTPListener();
Listener.RegisterHandler(Handler);
Listener.RegisterListener(IPAddress.Any, 367);
Listener.Start();
//server socket
USDTPSocket ssSock = null;
//create client socket
USDTPSocket.CreationResult res;
USDTPSocket csSock = USDTPSocket.Create(new IPEndPoint
(IPAddress.Loopback, 367),
USDTPSocket.CreationType.SendRecieve, out res);
//try to get server socket
while (ssSock == null)
{
Console.WriteLine("Press Any Key To Retrieve Server-Side Client!");
Console.ReadKey(true);
try
{
ssSock = Listener.PendingRequests.Dequeue();
}
catch
{
Console.WriteLine(" >> Error Occured!");
}
}
//register handler with server-side socket
ssSock.RegisterHandler(Handler);
//create message
USDTPMessage msgout = new USDTPMessage(0x00, new byte[][]
{ new byte[] { 0x01, 0x02, 0x03, 0x04 }, new byte[]
{ 0x05, 0x06, 0x07 }, new byte[] { 0x08, 0x09 } });
//send message to server
csSock.SendMessage(msgout);
//wait for message
USDTPMessage msgin = ssSock.WaitMessage();
//display results
Console.WriteLine("Message received!");
Console.WriteLine(" >> Header: " + msgin.Header);
foreach (byte[] bp in msgin.Payload)
{
Console.WriteLine(" >> Payload Segment:");
foreach (byte b in bp)
Console.WriteLine(" >> " + b);
}
Console.WriteLine("Press Any Key To Shutdown!");
Console.ReadKey();
//close connections
csSock.Disconnect();
ssSock.Disconnect();
//kill listener
Listener.Stop();
Console.WriteLine("Press Any Key To Exit!");
Console.ReadKey(true);
}
}
}
This should produce output similar to the following...
Listener Event: Pending_Request_Accepted
Press Any Key To Retrieve Server-Side Client!
<Key Press>
Socket Event: Message_Recieved
Message received!
>> Header: 0
>> Payload Segment:
>> 1
>> 2
>> 3
>> 4
>> Payload Segment:
>> 5
>> 6
>> 7
>> Payload Segment:
>> 8
>> 9
Press Any Key To Shutdown!
<Key Press>
Socket Event: Connection_Lost
Press Any Key To Exit!
<Key Press>
<Console Closes>
I hope that answers all questions.
Conclusion
In this tutorial, we created a protocol called USDTP and an implementation of it in C#. This included a Socket
and Listener
class, as well as a Packer, Unpacker, and Event Handler.
Feel free to drop me a line with any questions.
History
- 13th June, 2008: First version