|
using System;
using System.IO.MemoryMappedFiles;
using System.Threading;
using Pfz.Caching;
using Pfz.Extensions;
using Pfz.Serialization;
using Pfz.Threading;
namespace Pfz.Remoting
{
/// <summary>
/// Creates a Channeller (multi-channel support) using MemoryMappedFiles.
/// </summary>
/// <remarks>
/// Two channellers sharing the same id cooperate through named kernel objects:
/// an 8-byte memory mapped file used to hand over the id of each new channel,
/// a mutex that serialises channel creation, two auto-reset "request" events
/// (one per direction) and one auto-reset "result" event.
/// The side created with <c>runAsServer = true</c> creates those objects, the
/// other side opens them. <see cref="Start"/> must be called before
/// <see cref="CreateChannel"/> can be used.
/// </remarks>
public sealed class MmfChanneller:
    ThreadSafeExceptionAwareDisposable,
    IChanneller
{
    private MemoryMappedFile _memoryMappedFile;
    private MemoryMappedViewAccessor _accessor;
    private Mutex _mutex;

    // Signalled by this side to ask the other side for a new channel.
    private EventWaitHandle _weRequestNewChannel;

    // Signalled by the other side when it asks this side for a new channel.
    private EventWaitHandle _otherRequestNewChannel;

    // Signalled by the answering side once the new channel id was written to the accessor.
    private EventWaitHandle _resultEvent;

    // Created by Start(); also used as the "was started" flag (null means not started).
    private EventWaitHandle _mreDisposed;

    private long _id;
    internal WeakHashSet<MmfChannel> _channels = new WeakHashSet<MmfChannel>();

    /// <summary>
    /// Creates a new MemoryMappedFile channeller with the given id, and running as a client or server. In this case, it is not bound
    /// to a listener, and you must provide a unique id (probably using TicksOrIncrement.GetNext()).
    /// </summary>
    /// <param name="id">The id used to compose the names of the shared memory mapped file, mutex and events.</param>
    /// <param name="runAsServer">
    /// true to create the shared objects, false to open the ones already created by the server side.
    /// </param>
    public MmfChanneller(long id, bool runAsServer)
    {
        _Initialize(id, runAsServer);
    }

    /// <summary>
    /// Creates a new channeller that will try to connect to the given listener.
    /// </summary>
    /// <param name="listenerName">The name of the listener; it is appended to the well-known listener object names.</param>
    /// <exception cref="TimeoutException">
    /// The listener mutex could not be acquired in 15 seconds or the listener did not answer in 60 seconds.
    /// </exception>
    public MmfChanneller(string listenerName)
    {
        long id;
        MemoryMappedFile file = null;
        MemoryMappedViewAccessor accessor = null;
        Mutex mutex = null;
        EventWaitHandle requestEvent = null;
        EventWaitHandle resultEvent = null;
        try
        {
            file = MemoryMappedFile.OpenExisting("Pfz.Remoting.MmfListener.MemoryMappedFile:" + listenerName);
            accessor = file.CreateViewAccessor();
            mutex = Mutex.OpenExisting("Pfz.Remoting.MmfListener.Mutex:" + listenerName);
            requestEvent = EventWaitHandle.OpenExisting("Pfz.Remoting.MmfListener.RequestEvent:" + listenerName);
            resultEvent = EventWaitHandle.OpenExisting("Pfz.Remoting.MmfListener.ResultEvent:" + listenerName);

            if (!mutex.WaitOne(15000))
                throw new TimeoutException("Timed-out while waiting for a response.");

            try
            {
                resultEvent.Reset(); // we Reset here to guarantee that the server is not corrupted.
                requestEvent.Set();

                if (!resultEvent.WaitOne(60000))
                {
                    // in this situation the server gets corrupted.
                    throw new TimeoutException("Timed-out while waiting for a response.");
                }

                // The listener answers by writing the id of the new channeller at offset 0.
                id = accessor.ReadInt64(0);
            }
            finally
            {
                mutex.ReleaseMutex();
            }
        }
        finally
        {
            // The listener objects are only needed for the handshake; any of them may
            // still be null here if opening failed half-way.
            resultEvent.CheckedDispose();
            requestEvent.CheckedDispose();
            mutex.CheckedDispose();
            accessor.CheckedDispose();
            file.CheckedDispose();
        }

        _Initialize(id, false);
    }

    private int _channelBufferLength = 64*1024;

    /// <summary>
    /// Gets or sets the channel buffer length.
    /// This is the length passed to the streams created for channels requested by the other side.
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">The value is less than 256.</exception>
    public int ChannelBufferLength
    {
        get
        {
            return _channelBufferLength;
        }
        set
        {
            if (value < 256)
                throw new ArgumentOutOfRangeException("value", "value can't be less than 256.");

            _channelBufferLength = value;
        }
    }

    /// <summary>
    /// Creates (server) or opens (client) the named objects shared by the two sides.
    /// If anything fails, this instance is disposed and the exception is rethrown.
    /// </summary>
    private void _Initialize(long id, bool runAsServer)
    {
        _id = id;

        try
        {
            if (runAsServer)
                _memoryMappedFile = MemoryMappedFile.CreateNew("Pfz.Remoting.MmfChanneller:Mmf" + id, sizeof(long));
            else
                _memoryMappedFile = MemoryMappedFile.OpenExisting("Pfz.Remoting.MmfChanneller:Mmf" + id);

            _accessor = _memoryMappedFile.CreateViewAccessor();

            bool createdNew;
            _mutex = new Mutex(false, "Pfz.Remoting.MmfChanneller:Mutex" + id, out createdNew);
            if (runAsServer && !createdNew)
                throw new RemotingException("A mutex with the same name already exists.");

            if (runAsServer)
            {
                _weRequestNewChannel = new EventWaitHandle(false, EventResetMode.AutoReset, "Pfz.Remoting.MmfChanneller:1:" + id);
                _otherRequestNewChannel = new EventWaitHandle(false, EventResetMode.AutoReset, "Pfz.Remoting.MmfChanneller:2:" + id);
                _resultEvent = new EventWaitHandle(false, EventResetMode.AutoReset, "Pfz.Remoting.MmfChanneller:Result:" + id);
            }
            else
            {
                // The request events are swapped on the client: the server's ":2:" ("other requests")
                // is the event the client sets, and the server's ":1:" is the one the client waits on.
                _weRequestNewChannel = EventWaitHandle.OpenExisting("Pfz.Remoting.MmfChanneller:2:" + id);
                _otherRequestNewChannel = EventWaitHandle.OpenExisting("Pfz.Remoting.MmfChanneller:1:" + id);
                _resultEvent = EventWaitHandle.OpenExisting("Pfz.Remoting.MmfChanneller:Result:" + id);
            }
        }
        catch
        {
            Dispose();
            throw;
        }
    }

    /// <summary>
    /// Frees the resources (memory mapped files, mutexes and so on) used by this channeller.
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            var channels = _channels;
            if (channels != null)
            {
                _channels = null;

                foreach(var channel in channels)
                    channel.Dispose();

                channels.Dispose();
            }

            // Wakes up the reader thread (if started) so it can leave its wait loop.
            var mreDisposed = _mreDisposed;
            if(mreDisposed != null)
                mreDisposed.Set();

            Disposer.Dispose(ref _weRequestNewChannel);
            Disposer.Dispose(ref _otherRequestNewChannel);

            // The result event is an OS handle like the others and must be released too.
            Disposer.Dispose(ref _resultEvent);

            Disposer.Dispose(ref _mutex);
            Disposer.Dispose(ref _accessor);
            Disposer.Dispose(ref _memoryMappedFile);
        }

        base.Dispose(disposing);
    }

    /// <summary>
    /// Creates a new channel for communication.
    /// </summary>
    /// <param name="userData">
    /// Optional data sent to the other side when the channel is created. When not null it is
    /// serialized with a BinarySerializer right after a marker byte.
    /// </param>
    /// <returns>The channel created over a new memory mapped file stream.</returns>
    /// <exception cref="RemotingException">The channeller was not started.</exception>
    /// <exception cref="TimeoutException">
    /// The mutex could not be acquired in 15 seconds or the other side did not answer in 10 seconds.
    /// </exception>
    public IChannel CreateChannel(object userData = null)
    {
        lock(DisposeLock)
        {
            CheckUndisposed();

            if (_mreDisposed == null)
                throw new RemotingException("This channeller was not started.");

            if (!_mutex.WaitOne(15000))
                throw new TimeoutException("Timed-out while trying to create a channel.");

            try
            {
                _weRequestNewChannel.Set();

                if (!_resultEvent.WaitOne(10000))
                    throw new TimeoutException("Timed-out while trying to create a channel.");

                // The other side wrote the id of the new channel at offset 0 before setting the result event.
                long id = _accessor.ReadInt64(0);
                var stream = MemoryMappedFileStream.CreateAsClient("Pfz.Remoting.MmfChannellerClient.Channel:" + id, 10000);
                try
                {
                    // Marker byte: 0 = no user data, 1 = serialized user data follows.
                    if (userData == null)
                        stream.WriteByte(0);
                    else
                    {
                        stream.WriteByte(1);
                        var serializer = new BinarySerializer();
                        serializer.Serialize(stream, userData);
                    }

                    // Flush on both paths: the other side reads the marker byte right after
                    // creating its end of the stream, so it must not stay in a local buffer.
                    stream.Flush();

                    return new MmfChannel(this, stream);
                }
                catch
                {
                    // Do not leak the stream if the handshake or the channel creation fails.
                    stream.Dispose();
                    throw;
                }
            }
            finally
            {
                _mutex.ReleaseMutex();
            }
        }
    }

    /// <summary>
    /// Starts this channeller.
    /// Creates the "disposed" event and runs the reader that answers channel requests from the other side.
    /// </summary>
    /// <exception cref="RemotingException">The channeller was already started.</exception>
    public void Start()
    {
        lock(DisposeLock)
        {
            CheckUndisposed();

            if (_mreDisposed != null)
                throw new RemotingException("This channeller was already started.");

            _mreDisposed = new EventWaitHandle(false, EventResetMode.ManualReset, "MmfChanneller:Disposed" + _id);
            UnlimitedThreadPool.Run(_Reader);
        }
    }

    /// <summary>
    /// Loop that waits for channel requests from the other side until this channeller is disposed.
    /// For each request it publishes a new channel id, creates the server end of the stream,
    /// reads the optional user data and raises <see cref="ChannelCreated"/> on another thread.
    /// </summary>
    private void _Reader()
    {
        var mreDisposed = _mreDisposed;
        var otherRequestedNewChannel = _otherRequestNewChannel;
        if (mreDisposed == null || otherRequestedNewChannel == null)
            return;

        try
        {
            var waitHandles = new WaitHandle[2];
            waitHandles[0] = mreDisposed;
            waitHandles[1] = otherRequestedNewChannel;

            while (true)
            {
                // Index 0 is the disposed event; any other result is a channel request.
                if (WaitHandle.WaitAny(waitHandles) == 0 || WasDisposed)
                    return;

                // Publish the id of the new channel, then let the requester continue.
                long id = TicksOrIncrement.GetNext();
                _accessor.Write(0, id);
                _resultEvent.Set();

                var stream = MemoryMappedFileStream.CreateAsServer("Pfz.Remoting.MmfChannellerClient.Channel:" + id, _channelBufferLength, 10000);
                try
                {
                    var args = new ChannelCreatedEventArgs();
                    args.Channel = new MmfChannel(this, stream);

                    // Marker byte 1 means that serialized user data follows.
                    if (stream.ReadByte() == 1)
                    {
                        BinarySerializer serializer = new BinarySerializer();
                        args.Data = serializer.Deserialize(stream);
                    }

                    UnlimitedThreadPool.Run(_RunChannel, args);
                }
                catch
                {
                    stream.Dispose();
                    throw;
                }
            }
        }
        catch(Exception exception)
        {
            Dispose(exception);
        }
        finally
        {
            // The field is cleared before Dispose() so that Dispose(bool) does not
            // try to Set() the handle that was just disposed.
            mreDisposed.Dispose();
            _mreDisposed = null;
            Dispose();
        }
    }

    /// <summary>
    /// Invokes the <see cref="ChannelCreated"/> handlers for a channel requested by the other side
    /// and disposes the channel when they return.
    /// </summary>
    private void _RunChannel(ChannelCreatedEventArgs args)
    {
        using(args.Channel)
        {
            var handler = ChannelCreated;
            if (handler != null)
                handler(this, args);
        }
    }

    /// <summary>
    /// Event invoked when a new channel is created from the remote side.
    /// The channel is disposed as soon as the handlers return.
    /// </summary>
    public event EventHandler<ChannelCreatedEventArgs> ChannelCreated;

    string IChanneller.LocalEndpoint
    {
        get
        {
            return "MmfChanneller";
        }
    }

    string IChanneller.RemoteEndpoint
    {
        get
        {
            return "MmfChanneller";
        }
    }
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
I started to program computers when I was 11 years old, as a hobbyist, programming in AMOS Basic and Blitz Basic for Amiga.
At 12 I had my first try with assembler, but it was too difficult at the time. Then, in the same year, I learned C and, after learning C, I was finally able to learn assembler (for Motorola 680x0).
Not sure, but probably between 12 and 13, I started to learn C++. I always programmed "in an object oriented way", but using function pointers instead of virtual methods.
At 15 I started to learn Pascal at school and to use Delphi. At 16 I started my first internship (using Delphi). At 18 I started to work professionally using C++ and since then I've developed my programming skills as a professional developer in C++ and C#, generally creating libraries that help other developers do their work more easily, faster and with fewer errors.
Want more info or simply want to contact me?
Take a look at:
http://paulozemek.azurewebsites.net/
Or e-mail me at: paulozemek@outlook.com
Codeproject MVP 2012, 2015 & 2016
Microsoft MVP 2013-2014 (in October 2014 I started working at Microsoft, so I can't be a Microsoft MVP anymore).