|
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Net.Sockets;
using System.IO;
using System.Xml.Serialization;
namespace Utils
{
public class SocketReciever<D, R> : IDisposable
{
public delegate void NewData(D data);
public delegate void ConnectionChanged(Boolean connected);
private log4net.ILog log = log4net.LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType.Name);
// default timeout for first message
private const int DEFAULT_TIMEOUT = 5000;
private String hostname;
private int port;
private Socket socket;
private int reconnectSleep;
private XmlSerializer dataSerializer = new XmlSerializer(typeof(D));
private XmlSerializer responseSerializer = new XmlSerializer(typeof(R));
private StreamReader m_reader;
private StreamWriter m_writer;
private Thread readerThread;
private Boolean running = false;
// delegate for new data recieved
private NewData onNewData;
public event NewData OnNewData
{
add
{
onNewData = (NewData)System.Delegate.Combine(onNewData, value);
}
remove
{
onNewData = (NewData)System.Delegate.Remove(onNewData, value);
}
}
// delegate for changes in communication
private ConnectionChanged onConnectionChanged;
public event ConnectionChanged OnConnectionChanged
{
add
{
onConnectionChanged = (ConnectionChanged)System.Delegate.Combine(onConnectionChanged, value);
}
remove
{
onConnectionChanged = (ConnectionChanged)System.Delegate.Remove(onConnectionChanged, value);
}
}
public Boolean Connected
{
get
{
if (socket != null)
return socket.Connected;
else
return false;
}
}
/// <summary>
/// Socket client
/// </summary>
/// <param name="hostname">server to connect to</param>
/// <param name="port">port on server</param>
public SocketReciever(String hostname, int port)
{
this.hostname = hostname;
this.port = port;
}
/// <summary>
/// Start recieving data
/// </summary>
/// <param name="reconnectSleep">wait after connection error for reconection in ms - 0 not reconnect</param>
public void Start(int reconnectSleep)
{
this.reconnectSleep = reconnectSleep;
readerThread = new Thread(run);
// start listening in new thread
readerThread.Start();
}
private void run()
{
Boolean connected = false;
running = true;
while (running)
{
try
{
if ((socket == null) || !socket.Connected)
{
log.Debug("Socket not connected - connect....");
// open socket
socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
// set timeout
socket.ReceiveTimeout = DEFAULT_TIMEOUT;
socket.Connect(hostname, port);
m_reader = new StreamReader(new NetworkStream(socket), Encoding.Unicode);
m_writer = new StreamWriter(new NetworkStream(socket), Encoding.Unicode);
// we have connection status change
if (onConnectionChanged != null)
{
onConnectionChanged.Invoke(socket.Connected);
}
connected = true;
// read timeout from server and set it
socket.ReceiveTimeout = readTimeOut();
}
// wait for read new data
readData();
}
catch (System.Threading.ThreadAbortException)
{
log.Debug("ThreadAbort exception");
// end the loop
break;
}
catch (Exception ex)
{
// errors on comunication are only logged
log.Debug("Exception: " + ex.Message);
socket.Close();
// we have connection status change
if ((connected != socket.Connected) && (onConnectionChanged != null))
{
onConnectionChanged.Invoke(socket.Connected);
}
connected = false;
// after disconnect we sleep for some time - before we try reconnect
if (running)
if (reconnectSleep > 0)
{
log.Debug("Reconnect sleep: " + reconnectSleep.ToString() + " ms");
Thread.Sleep(reconnectSleep);
}
else
{
log.Debug("Reconnect disabled");
running = false;
}
}
}
}
private int readTimeOut()
{
// read the data - sync with timeout message
int timeout = -1;
do
{
String message = m_reader.ReadLine();
timeout = Commons.ParseTimeoutMessage(message);
}
while (timeout < 0);
log.Debug("Timeout recieved: " + timeout.ToString() + " ms");
return timeout;
}
private void readData()
{
// read the data - sync with size message
int size = -1;
do
{
String message = m_reader.ReadLine();
size = Commons.ParseSizeMessage(message);
}
while (size < 0);
// size 0 means lifetick
if (size == 0)
{
log.Debug("LifeTick Recieved");
return;
}
else
{
// data
char[] buf = new char[size];
StringBuilder sb = new StringBuilder();
while (sb.Length < size)
{
int len = m_reader.Read(buf, 0, size - sb.Length);
sb.Append(new string(buf, 0, len));
}
log.Debug("Deserialization");
StringReader sReader = new StringReader(sb.ToString());
D sd = (D)dataSerializer.Deserialize(sReader);
if ((sd != null) && (onNewData != null))
{
// send the new data
onNewData.Invoke(sd);
}
}
}
// -------------------------------------------------------------------------------//
/// <summary>
/// Send data to server
/// </summary>
/// <param name="responseData">Data object</param>
public void SendResponse(R responseData)
{
// the response is send in a new thread
Thread responseThread = new Thread(sendResponseInThread);
responseThread.Start(responseData);
}
private void sendResponseInThread(Object data)
{
if (m_writer != null)
lock (this.m_writer)
try
{
StringBuilder sBuilder = new StringBuilder();
StringWriter sWriter = new StringWriter(sBuilder);
responseSerializer.Serialize(sWriter, (R)data);
// size tag on message begin
m_writer.WriteLine(Commons.CreateSizeString(sBuilder.Length));
m_writer.WriteLine(sBuilder.ToString());
m_writer.Flush();
}
catch (Exception ex)
{
log.Debug(ex.Message);
}
}
// -------------------------------------------------------------------------------//
public void Stop()
{
this.running = false;
if (socket != null)
{
Boolean oldConnection = socket.Connected;
socket.Close();
// connection change on stop
if (oldConnection && (onConnectionChanged != null))
{
onConnectionChanged.Invoke(socket.Connected);
}
}
}
#region IDisposable Members
public void Dispose()
{
if (readerThread != null)
{
Stop();
readerThread.Abort();
readerThread.Join();
}
}
#endregion
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.