using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using Pfz.Threading;
namespace Pfz.Remoting.Udp
{
/// <summary>
/// A UDP connection that supports guaranteed delivery. Delivery is not ordered, so it does not replace TCP/IP.
/// </summary>
public sealed class GuaranteedUdpConnection:
ThreadSafeDisposable,
IGuaranteedUdpConnection
{
// Guaranteed packets that were sent but not yet confirmed by the remote side, keyed by packet id.
// Entries are added by GuaranteedSend and removed by _ProcessConfirmation.
private readonly Dictionary<long, _BytePair> _pendings = new Dictionary<long, _BytePair>();
// Set by GuaranteedSend when a packet becomes pending and reset by _ProcessConfirmation when
// _pendings becomes empty; _Resender waits on it before every pass.
private readonly ManagedManualResetEvent _resendEvent1 = new ManagedManualResetEvent();
// Waited on for up to 200 ms by _Resender between passes; when that wait returns true the resender exits.
private readonly ManagedManualResetEvent _resendEvent2 = new ManagedManualResetEvent();
// Set by _StoreReceived whenever a data packet arrives, and by _Confirmer itself when more than
// 50 missing ids are outstanding; _Confirmer waits on it for up to 5 seconds.
private readonly ManagedAutoResetEvent _confirmEvent1 = new ManagedAutoResetEvent();
// Set by _StoreReceived when _confirmImmediatelyCount packets were received without a confirmation;
// _Confirmer waits on it for up to 57 ms before sending.
private readonly ManagedAutoResetEvent _confirmEvent2 = new ManagedAutoResetEvent();
// Entered once per GuaranteedSend and exited by _ProcessConfirmation for every confirmed packet.
// It is created with _pendingGuaranteedDeliveryCount as its argument.
private readonly ManagedSemaphore _supportsGuaranteedSemaphore;
// Ids below _lastReceivedId that have not arrived yet; they are reported in every confirmation packet.
private readonly List<long> _missings = new List<long>();
// Number of received packets after which _StoreReceived signals _confirmEvent2.
private readonly int _confirmImmediatelyCount;
// Argument used to create the semaphore; also the maximum length of _receivesQueue.
private readonly int _pendingGuaranteedDeliveryCount;
// The peer this connection sends to (and, for stand-alone connections, filters received packets by).
private readonly IPEndPoint _remoteEndpoint;
// Socket used for sending. It is created by this instance in the public constructor and taken
// from the listener in the internal one; Dispose only releases it when _listener is null.
private UdpClient _udp;
// Last id handed out by GuaranteedSend. It is incremented before use, so the first guaranteed packet has id 2.
private long _idGenerator = 1;
/// <summary>
/// Creates the connection object that represents one remote endpoint of a listener.
/// The socket and the delivery settings are copied from the listener.
/// </summary>
/// <param name="listener">The listener that owns the socket used by this connection.</param>
/// <param name="endpoint">The remote endpoint this connection sends to.</param>
internal GuaranteedUdpConnection(GuaranteedUdpListener listener, IPEndPoint endpoint)
{
try
{
// _listener is assigned before anything else: Dispose(bool) only releases _udp when _listener is null,
// so setting it first keeps the failure path below from disposing the listener's socket.
_listener = listener;
_udp = listener._udp;
_remoteEndpoint = endpoint;
_pendingGuaranteedDeliveryCount = listener._pendingGuaranteedDeliveryCount;
_supportsGuaranteedSemaphore = new ManagedSemaphore(listener._pendingGuaranteedDeliveryCount);
_confirmImmediatelyCount = listener._confirmImmediatelyCount;
// Only the resender and confirmer loops are started here; no _Receiver loop is started.
// NOTE(review): presumably the listener reads the shared socket and calls _StoreReceived - confirm in GuaranteedUdpListener.
UnlimitedThreadPool.Run(_Resender);
UnlimitedThreadPool.Run(_Confirmer);
}
catch
{
// Release whatever was already created, then let the caller see the original exception.
Dispose();
throw;
}
}
/// <summary>
/// Creates a new connection to the given address and port.
/// </summary>
/// <param name="hostname">The remote address. It is handed to IPAddress.Parse, so it must be the textual form of an IP address.</param>
/// <param name="port">The remote UDP port.</param>
/// <param name="pendingGuaranteedDeliveryCount">Forwarded unchanged to the constructor overload that takes an IPEndPoint.</param>
public GuaranteedUdpConnection(string hostname, int port, int pendingGuaranteedDeliveryCount=57):
this(new IPEndPoint(IPAddress.Parse(hostname), port), pendingGuaranteedDeliveryCount)
{
}
// One-byte payload used for the first guaranteed packet sent by the constructor below.
private static readonly byte[] _emptyBytes = new byte[1];

/// <summary>
/// Creates a new connection that owns its socket and talks to <paramref name="remoteEndpoint"/>.
/// The resender, confirmer and receiver loops are started and a first one-byte guaranteed
/// packet is sent to the remote endpoint.
/// </summary>
/// <param name="remoteEndpoint">The endpoint of the remote peer.</param>
/// <param name="pendingGuaranteedDeliveryCount">
/// Value used to create the send semaphore and to limit the receive queue. Must be at least 2.
/// Half of it is used as the "confirm immediately" threshold.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="remoteEndpoint"/> is null.</exception>
/// <exception cref="ArgumentException"><paramref name="pendingGuaranteedDeliveryCount"/> is less than 2.</exception>
public GuaranteedUdpConnection(IPEndPoint remoteEndpoint, int pendingGuaranteedDeliveryCount=57)
{
    // Validation happens inside the try block on purpose: the catch calls Dispose(), which
    // releases the events created by the field initializers.
    try
    {
        if (remoteEndpoint == null)
            throw new ArgumentNullException("remoteEndpoint");
        if (pendingGuaranteedDeliveryCount < 2)
            throw new ArgumentException("pendingGuaranteedDeliveryCount must be at least 2.", "pendingGuaranteedDeliveryCount");

        _pendingGuaranteedDeliveryCount = pendingGuaranteedDeliveryCount;

        // Bind to any free local port using the address family of the remote endpoint.
        // UdpClient(int) always creates an IPv4 socket, which cannot reach an IPv6 endpoint.
        _udp = new UdpClient(0, remoteEndpoint.AddressFamily);
        _remoteEndpoint = remoteEndpoint;
        _supportsGuaranteedSemaphore = new ManagedSemaphore(pendingGuaranteedDeliveryCount);
        _confirmImmediatelyCount = pendingGuaranteedDeliveryCount/2;

        // The background loops are started only after every field they read is assigned.
        UnlimitedThreadPool.Run(_Resender);
        UnlimitedThreadPool.Run(_Confirmer);
        UnlimitedThreadPool.Run(_Receiver);

        // First guaranteed packet (it gets id 2, the first id produced by _idGenerator).
        GuaranteedSend(_emptyBytes, 0, 1);
    }
    catch
    {
        Dispose();
        throw;
    }
}
/// <summary>
/// Releases the synchronization objects of this connection and then either releases the
/// socket (when there is no listener) or removes this connection from its listener.
/// </summary>
/// <param name="disposing">When false, only the base implementation runs.</param>
protected override void Dispose(bool disposing)
{
    if (disposing)
    {
        // The semaphore is assigned inside the constructors, so it can still be null
        // when a constructor failed early and called Dispose().
        if (_supportsGuaranteedSemaphore != null)
        {
            _supportsGuaranteedSemaphore.Dispose();
        }

        if (_resendEvent1 != null)
        {
            _resendEvent1.Dispose();
        }

        if (_resendEvent2 != null)
        {
            _resendEvent2.Dispose();
        }

        if (_confirmEvent1 != null)
        {
            _confirmEvent1.Dispose();
        }

        if (_confirmEvent2 != null)
        {
            _confirmEvent2.Dispose();
        }

        if (_receivesEvent != null)
        {
            _receivesEvent.Dispose();
        }

        var owner = _listener;
        if (owner != null)
        {
            // The socket belongs to the listener; only unregister this endpoint from it.
            lock (owner.DisposeLock)
            {
                if (!owner.WasDisposed)
                {
                    owner._connections.Remove(_remoteEndpoint);
                }
            }
        }
        else
        {
            // No listener: the socket was created by this instance.
            Disposer.Dispose(ref _udp);
        }
    }

    base.Dispose(disposing);
}
/// <summary>
/// Gets the endpoint of the remote peer this connection sends its packets to.
/// </summary>
public IPEndPoint RemoteEndpoint
{
    get { return _remoteEndpoint; }
}
// A pending packet that was sent less than this long ago is skipped by the resender.
private static readonly TimeSpan _200Milliseconds = TimeSpan.FromMilliseconds(200);

/// <summary>
/// Background loop that sends the still unconfirmed guaranteed packets again.
/// Whatever way the loop ends (timeout, disposal or exception), the connection is disposed.
/// </summary>
private void _Resender()
{
    try
    {
        for (;;)
        {
            // Sleep while nothing is pending, then pause up to 200 ms between passes.
            _resendEvent1.WaitOne();
            if (_resendEvent2.WaitOne(200))
                return;

            lock (DisposeLock)
            {
                if (WasDisposed)
                    return;

                foreach (var pending in _pendings.Values)
                {
                    var lastSent = pending._dateTime;
                    TimeSpan age = DateTime.Now - lastSent;

                    // Sent less than 200 ms ago: give the confirmation a chance to arrive.
                    // A negative age is not treated as "recent", so the packet is sent again.
                    bool sentRecently = age >= TimeSpan.Zero && age < _200Milliseconds;
                    if (sentRecently)
                        continue;

                    // More than 60000 / 200 resends without confirmation is considered a
                    // timeout; returning runs the finally block, which disposes the connection.
                    if (pending._count > (60000 / 200))
                        return;

                    var packet = pending._bytes;
                    _udp.Send(packet, packet.Length, _remoteEndpoint);
                    pending._dateTime = DateTime.Now;
                    pending._count++;
                }
            }
        }
    }
    catch
    {
        // Any failure ends the loop; the connection is disposed below.
    }
    finally
    {
        Dispose();
    }
}
// Number of packets stored by _StoreReceived since the last confirmation was sent or requested.
private int _countReceivesWithoutConfirmation;

/// <summary>
/// Background loop that sends confirmation packets to the remote side.
/// A confirmation packet has this layout: bytes 0-1 hold the length in bytes of the list of
/// missing ids, bytes 2-9 hold the negated last received id, and the remaining bytes hold up
/// to 50 missing ids of 8 bytes each.
/// Whatever way the loop ends, the connection is disposed.
/// </summary>
private void _Confirmer()
{
    try
    {
        while(true)
        {
            // we will keep sending packets from 5 to 5 seconds to avoid disconnect.
            _confirmEvent1.WaitOne(5000);

            // Short extra wait so that several received packets can share one confirmation;
            // _StoreReceived sets this event to cut the wait short.
            _confirmEvent2.WaitOne(57);
            if (_confirmEvent2.WasDisposed)
                return;

            _countReceivesWithoutConfirmation = 0;

            byte[] buffer;
            int bufferLength;
            lock(DisposeLock)
            {
                if (WasDisposed)
                    return;

                // Confirmations are recognised by their negative id.
                long lastReceivedId = -_lastReceivedId;
                int count = _missings.Count;
                if (count > 50)
                {
                    // Too many missing ids for one packet: report the first 50 only, send -1
                    // instead of the real last received id and request another confirmation.
                    _confirmEvent1.Set();
                    count = 50;
                    lastReceivedId = -1;
                }

                int payloadLength = count*8;
                bufferLength = payloadLength + 10;
                buffer = new byte[bufferLength];

                // The cast must apply to the payload length as a whole. The previous
                // "(ushort)bufferLength-10" cast first and subtracted afterwards, which selected
                // the 4-byte int overload of GetBytes.
                var sizeBytes = BitConverter.GetBytes((ushort)payloadLength);
                buffer[0] = sizeBytes[0];
                buffer[1] = sizeBytes[1];

                var idBytes = BitConverter.GetBytes(lastReceivedId);
                Buffer.BlockCopy(idBytes, 0, buffer, 2, 8);

                int pos = 10;
                for(int i=0; i<count; i++)
                {
                    long missingId = _missings[i];
                    Buffer.BlockCopy(BitConverter.GetBytes(missingId), 0, buffer, pos, 8);
                    pos += 8;
                }
            }

            // The send happens outside the lock, using the buffer built above.
            _udp.Send(buffer, bufferLength, _remoteEndpoint);
        }
    }
    catch
    {
        // Any failure ends the loop; the connection is disposed below.
    }
    finally
    {
        Dispose();
    }
}
// Highest packet id registered so far. It starts at 1, one below the first id a sender generates.
private long _lastReceivedId = 1;

/// <summary>
/// Registers the id of a received guaranteed packet.
/// </summary>
/// <param name="id">The id read from the packet header.</param>
/// <param name="received">The complete packet (not used by this method).</param>
/// <returns>
/// true when the id is seen for the first time; false when the packet is a duplicate.
/// </returns>
private bool _ProcessReceived(long id, byte[] received)
{
    if (id <= _lastReceivedId)
    {
        // An id equal to the last one is a duplicate. An older id is new only when it was
        // still listed as missing; removing it from the list tells us that.
        return id != _lastReceivedId && _missings.Remove(id);
    }

    long expectedId = _lastReceivedId + 1;
    if (id != expectedId)
    {
        // One or more ids were skipped: remember every id in between as missing.
        // NOTE(review): the size of this gap is bounded only by the id taken from the packet - verify whether a limit is needed.
        for (long skipped = expectedId; skipped < id; skipped++)
            _missings.Add(skipped);
    }

    _lastReceivedId = id;
    return true;
}
// Scratch set reused by _ProcessConfirmation to collect the ids that can leave _pendings.
private readonly HashSet<long> _toRemove = new HashSet<long>();
// Last id reported by the remote side as received; a reported value of 1 is ignored.
private long _lastConfirmedId = 1;

/// <summary>
/// Processes a confirmation packet: every pending packet whose id is not greater than
/// <paramref name="lastConfirmedId"/> and is not listed as missing in the packet is removed
/// from the pending list, and the send semaphore is exited once per removed packet.
/// </summary>
/// <param name="lastConfirmedId">The (already positive) last received id reported by the remote side.</param>
/// <param name="received">The complete confirmation packet; the missing ids start at byte 10, 8 bytes each.</param>
private void _ProcessConfirmation(long lastConfirmedId, byte[] received)
{
    lock(DisposeLock)
    {
        if (lastConfirmedId != 1)
            _lastConfirmedId = lastConfirmedId;

        // Start from every pending id covered by the confirmation...
        _toRemove.Clear();
        foreach(long id in _pendings.Keys)
            if (id <= lastConfirmedId)
                _toRemove.Add(id);

        // ...and keep the ones the remote side still reports as missing.
        // Only complete 8-byte entries are read: a packet whose payload length is not a
        // multiple of 8 would otherwise make BitConverter.ToInt64 throw on the trailing
        // fragment, and that exception ends the receiving loop of the caller.
        int length = received.Length;
        for(int pos=10; pos <= length - 8; pos += 8)
        {
            long idStillMissing = BitConverter.ToInt64(received, pos);
            _toRemove.Remove(idStillMissing);
        }

        int toRemoveCount = _toRemove.Count;
        if (toRemoveCount > 0)
        {
            foreach(var id in _toRemove)
                _pendings.Remove(id);

            // One semaphore slot per confirmed packet.
            _supportsGuaranteedSemaphore.Exit(toRemoveCount);

            // Nothing left to resend: let _Resender block on _resendEvent1 again.
            if (_pendings.Count == 0)
                _resendEvent1.Reset();
        }
    }
}
// The listener this connection belongs to; null when the connection created its own socket.
private readonly GuaranteedUdpListener _listener;
// Signalled by _StoreReceived after a packet was added to _receivesQueue.
internal readonly ManagedAutoResetEvent _receivesEvent = new ManagedAutoResetEvent();
// Packets stored by _StoreReceived and not yet returned by UdpReceive.
internal readonly Queue<byte[]> _receivesQueue = new Queue<byte[]>();

/// <summary>
/// Waits for the next received packet and returns it. The returned array still starts with
/// the header added by the framework; see the UdpHeaderSize property for its length.
/// </summary>
/// <returns>The next packet, or null when the connection was disposed.</returns>
public byte[] UdpReceive()
{
    for (;;)
    {
        lock (DisposeLock)
        {
            if (WasDisposed)
                return null;

            if (_receivesQueue.Count != 0)
                return _receivesQueue.Dequeue();
        }

        // Nothing queued: wait outside the lock, then check the queue again.
        _receivesEvent.WaitOne();
    }
}
/// <summary>
/// Background loop that reads datagrams from the socket and hands those coming from the
/// remote endpoint to _StoreReceived. Whatever way the loop ends, the connection is disposed.
/// </summary>
private void _Receiver()
{
    try
    {
        IPEndPoint sender = null;
        while (!WasDisposed)
        {
            byte[] datagram = _udp.Receive(ref sender);
            if (datagram == null)
                return;

            // Datagrams from any other endpoint are ignored.
            if (sender.Equals(_remoteEndpoint))
                _StoreReceived(datagram);
        }
    }
    catch
    {
        // Any failure ends the loop; the connection is disposed below.
    }
    finally
    {
        Dispose();
    }
}
/// <summary>
/// Handles one datagram received from the remote endpoint.
/// Header layout: bytes 0-1 hold the payload length (ushort), bytes 2-9 hold the packet id (long),
/// and the payload starts at byte 10. A negative id marks a confirmation packet, id 0 marks a
/// packet sent by UdpSend, and any other id marks a packet sent by GuaranteedSend.
/// </summary>
/// <param name="received">The complete datagram, header included.</param>
internal void _StoreReceived(byte[] received)
{
lock(DisposeLock)
{
if (WasDisposed)
return;
// Datagrams that are too short or whose declared size does not match are silently dropped.
if (received.Length < 2)
return;
int size = BitConverter.ToUInt16(received, 0);
if (size + 10 != received.Length)
return;
long id = BitConverter.ToInt64(received, 2);
if (id < 0)
{
// Confirmation packet: the id field carries the negated last received id of the remote side.
_ProcessConfirmation(-id, received);
return;
}
// On a listener-owned connection the packet with id 2 is registered but never queued.
// Id 2 is the first id generated by a sender, and the public constructor sends a one-byte packet first.
bool canEnqueue = id != 2 || _listener == null;
if (canEnqueue)
{
// Queue full: drop the packet before its id is registered, so it is not reported as received.
if (_receivesQueue.Count >= _pendingGuaranteedDeliveryCount)
return;
}
// Packets with id 0 are always treated as new; other ids are checked for duplicates.
bool firstTime = id == 0 || _ProcessReceived(id, received);
// Wake the confirmer even for duplicates.
_confirmEvent1.Set();
if (!firstTime)
return;
if (canEnqueue)
{
_receivesQueue.Enqueue(received);
_receivesEvent.Set();
}
// After _confirmImmediatelyCount packets without a confirmation, cut the confirmer's 57 ms wait short.
if (_countReceivesWithoutConfirmation < _confirmImmediatelyCount)
_countReceivesWithoutConfirmation++;
else
{
_countReceivesWithoutConfirmation = 0;
_confirmEvent2.Set();
}
}
}
/// <summary>
/// Sends a packet that is kept and sent again until the remote side confirms it.
/// Delivery is still unordered. Nothing is sent when <paramref name="count"/> is 0.
/// </summary>
/// <param name="buffer">The array that contains the bytes to send.</param>
/// <param name="offset">The index in <paramref name="buffer"/> of the first byte to send.</param>
/// <param name="count">The number of bytes to send, at most 65535.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is beyond the end of the buffer, or <paramref name="count"/> is negative or greater than 65535.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="offset"/> is negative, or offset plus count is beyond the end of the buffer.
/// </exception>
public void GuaranteedSend(byte[] buffer, int offset, int count)
{
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentException("offset can't be negative.", "offset");
    if (offset > buffer.Length)
        throw new ArgumentOutOfRangeException("offset");

    // A negative count used to slip through and fail later with an unrelated exception
    // (IndexOutOfRangeException, OverflowException or a BlockCopy error, depending on the value).
    if (count < 0)
        throw new ArgumentOutOfRangeException("count", "count can't be negative.");
    if (count > 65535)
        throw new ArgumentOutOfRangeException("count", "Packets must have at maximum 65535 bytes.");

    // Written as a subtraction so the check cannot overflow for very large buffers.
    if (count > buffer.Length - offset)
        throw new ArgumentException("offset+count is outside the buffer limits.");
    if (count == 0)
        return;

    // NOTE(review): count + 10 header bytes can exceed what a single UDP datagram can carry - verify how the socket reacts near the 65535 limit.
    int newLength = count + 10;
    var lengthBytes = BitConverter.GetBytes((ushort)count);
    byte[] newBuffer = new byte[newLength];
    newBuffer[0] = lengthBytes[0];
    newBuffer[1] = lengthBytes[1];
    Buffer.BlockCopy(buffer, offset, newBuffer, 10, count);

    // Bytes 2-9 of the header carry the packet id.
    long id = Interlocked.Increment(ref _idGenerator);
    Buffer.BlockCopy(BitConverter.GetBytes(id), 0, newBuffer, 2, 8);

    // One semaphore entry per unconfirmed packet; _ProcessConfirmation exits it again.
    _supportsGuaranteedSemaphore.Enter();
    lock(DisposeLock)
    {
        CheckUndisposed();

        // Registered as pending before the first send, so the resender takes over if it is lost.
        _pendings.Add(id, new _BytePair(newBuffer));
        _resendEvent1.Set();
        _udp.Send(newBuffer, newLength, _remoteEndpoint);
    }
}
/// <summary>
/// Sends a packet without delivery guarantee (its id field stays 0). When guaranteed sends are
/// in use, plain packets must also be sent through this method and not directly through the
/// UDP object, because the receiving side expects the 10-byte header added here.
/// Nothing is sent when <paramref name="count"/> is 0.
/// </summary>
/// <param name="buffer">The array that contains the bytes to send.</param>
/// <param name="offset">The index in <paramref name="buffer"/> of the first byte to send.</param>
/// <param name="count">The number of bytes to send, at most 65535.</param>
/// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">
/// <paramref name="offset"/> is beyond the end of the buffer, or <paramref name="count"/> is negative or greater than 65535.
/// </exception>
/// <exception cref="ArgumentException">
/// <paramref name="offset"/> is negative, or offset plus count is beyond the end of the buffer.
/// </exception>
public void UdpSend(byte[] buffer, int offset, int count)
{
    if (buffer == null)
        throw new ArgumentNullException("buffer");
    if (offset < 0)
        throw new ArgumentException("offset can't be negative.", "offset");
    if (offset > buffer.Length)
        throw new ArgumentOutOfRangeException("offset");

    // A negative count used to slip through and fail later with an unrelated exception.
    if (count < 0)
        throw new ArgumentOutOfRangeException("count", "count can't be negative.");

    // The single-string constructor treats its argument as the parameter name, so the
    // parameter name and the message are passed separately, as GuaranteedSend does.
    if (count > 65535)
        throw new ArgumentOutOfRangeException("count", "Packets must have at maximum 65535 bytes.");

    // Written as a subtraction so the check cannot overflow for very large buffers.
    if (count > buffer.Length - offset)
        throw new ArgumentException("offset+count is outside the buffer limits.");
    if (count == 0)
        return;

    int newLength = count + 10;
    var lengthBytes = BitConverter.GetBytes((ushort)count);
    byte[] newBuffer = new byte[newLength];
    newBuffer[0] = lengthBytes[0];
    newBuffer[1] = lengthBytes[1];
    // the id remains 0 because it is not set.
    Buffer.BlockCopy(buffer, offset, newBuffer, 10, count);

    lock(DisposeLock)
    {
        CheckUndisposed();
        _udp.Send(newBuffer, newLength, _remoteEndpoint);
    }
}
/// <summary>
/// Gets the number of header bytes (10) that precede the payload in every packet.
/// </summary>
public int UdpHeaderSize
{
    get { return 10; }
}
/// <summary>
/// Tells whether a packet was sent as guaranteed, which is the case when the id stored
/// in bytes 2-9 of its header is not 0.
/// </summary>
/// <param name="packet">The complete packet, header included.</param>
/// <returns>true for a guaranteed packet; false when the id is 0.</returns>
public bool IsGuaranteedPacket(byte[] packet)
{
    long id = BitConverter.ToInt64(packet, 2);
    return id != 0;
}
}
}