|
using System;
using System.Collections.Generic;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using Pfz.Caching;
using Pfz.Threading;
namespace Pfz.Remoting
{
/// <summary>
/// Channeller that uses NamedPipes to create new channels.
/// </summary>
public sealed class NamedPipeChanneller:
ThreadSafeExceptionAwareDisposable,
IChanneller
{
private object fStreamsLock = new object();
private List<ExceptionAwareStream> fStreams = new List<ExceptionAwareStream>();
/// <summary>
/// Creates a new NamedPipeChanneller using the given stream as the
/// "listener" stream.
/// </summary>
public NamedPipeChanneller(Stream baseStream, EventHandler<ChannelCreatedEventArgs> remoteChannelCreated)
{
if (baseStream == null)
throw new ArgumentNullException("baseStream");
BaseStream = baseStream;
GCUtils.Collected += p_Collected;
RemoteChannelCreated = remoteChannelCreated;
UnlimitedThreadPool.Run(p_Reader);
}
private void p_Collected()
{
if (BaseStream == null)
{
GCUtils.Collected -= p_Collected;
return;
}
try
{
AbortSafe.Lock
(
fStreamsLock,
delegate
{
var oldStreams = fStreams;
var newStreams = new List<ExceptionAwareStream>(oldStreams.Count);
foreach(var stream in oldStreams)
{
if (!stream.WasDisposed)
newStreams.Add(stream);
}
fStreams = newStreams;
}
);
}
catch
{
}
}
/// <summary>
/// Disposes the base stream.
/// </summary>
protected override void Dispose(bool disposing)
{
if (disposing)
{
GCUtils.Collected -= p_Collected;
var baseStream = BaseStream;
if (baseStream != null)
{
BaseStream = null;
var disposable = baseStream as IExceptionAwareDisposable;
if (disposable != null)
disposable.Dispose(DisposeException);
else
baseStream.Dispose();
}
if (fStreamsLock != null)
{
AbortSafe.Lock
(
fStreamsLock,
delegate
{
if (fStreams != null)
foreach(var stream in fStreams)
stream.Dispose();
}
);
}
}
base.Dispose(disposing);
if (disposing)
{
var disposed = Disposed;
if (disposed != null)
disposed(this, EventArgs.Empty);
}
}
/// <summary>
/// Gets the base (listener) stream.
/// </summary>
public Stream BaseStream { get; private set; }
/// <summary>
/// Creates a new channel.
/// </summary>
public ExceptionAwareStream CreateChannel()
{
return CreateChannel(null);
}
/// <summary>
/// Creates a new channel sending initial data to it.
/// </summary>
public ExceptionAwareStream CreateChannel(object createData)
{
string name = "Pfz.Remoting.NamedPipeChanneller.NamedPipeChannel_" + Guid.NewGuid();
var result = DuplexStream.CreateNamedPipeServer(name, false);
AbortSafe.Lock
(
BaseStream,
delegate
{
var binaryFormatter = new BinaryFormatter();
binaryFormatter.Serialize(BaseStream, new KeyValuePair<string, object>(name, createData));
}
);
result.ReadStream.WaitForConnection();
result.WriteStream.WaitForConnection();
AbortSafe.UnabortableLock
(
fStreamsLock,
() => fStreams.Add(result)
);
return result;
}
private void p_Reader()
{
var stream = BaseStream;
var binaryFormatter = new BinaryFormatter();
try
{
while(true)
{
var deserializedData = binaryFormatter.Deserialize(BaseStream);
var pair = (KeyValuePair<string, object>)deserializedData;
var args = new ChannelCreatedEventArgs();
var channel = DuplexStream.CreateNamedPipeClient(".", pair.Key, true);
args.Channel = channel;
args.Data = pair.Value;
AbortSafe.UnabortableLock
(
fStreamsLock,
() => fStreams.Add(channel)
);
UnlimitedThreadPool.Run
(
() =>
{
Exception exception = null;
try
{
RemoteChannelCreated(this, args);
}
catch(Exception caughtException)
{
exception = caughtException;
}
finally
{
if (args.CanDisposeChannel)
args.Channel.Dispose(exception);
}
}
);
}
}
catch
{
if (!WasDisposed)
throw;
}
}
/// <summary>
/// Event invoked just after this channeller is disposed.
/// </summary>
public event EventHandler Disposed;
/// <summary>
/// Event invoked when a channel is created by the remote side.
/// </summary>
public event EventHandler<ChannelCreatedEventArgs> RemoteChannelCreated;
#region IChanneller Members
ExceptionAwareStream IChanneller.CreateChannel()
{
return CreateChannel();
}
ExceptionAwareStream IChanneller.CreateChannel(object createData)
{
return CreateChannel(createData);
}
#endregion
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
I started to program computers when I was 11 years old, as a hobbyist, programming in AMOS Basic and Blitz Basic for Amiga.
At 12 I had my first try with assembler, but it was too difficult at the time. Then, in the same year, I learned C and, after learning C, I was finally able to learn assembler (for Motorola 680x0).
Not sure, but probably between 12 and 13, I started to learn C++. I always programmed "in an object oriented way", but using function pointers instead of virtual methods.
At 15 I started to learn Pascal at school and to use Delphi. At 16 I started my first internship (using Delphi). At 18 I started to work professionally using C++ and since then I've developed my programming skills as a professional developer in C++ and C#, generally creating libraries that help other developers do their work easier, faster and with less errors.
Want more info or simply want to contact me?
Take a look at:
http://paulozemek.azurewebsites.net/
Or e-mail me at: paulozemek@outlook.com
Codeproject MVP 2012, 2015 & 2016
Microsoft MVP 2013-2014 (in October 2014 I started working at Microsoft, so I can't be a Microsoft MVP anymore).