|
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
namespace Pfz.DistributableObservablePoco.Remote.MinimalImplementation
{
/// <summary>
/// Message port that exchanges messages over a <see cref="Stream"/>.
/// Outgoing messages are serialized into an in-memory buffer by
/// <see cref="PostMessage"/> and written to the stream by <see cref="Flush"/>.
/// Incoming messages are deserialized on a dedicated thread and handed to the
/// handler given to the constructor.
/// </summary>
public sealed class DopMessagePortOverStream:
    IDopMessagePort
{
    // Buffers that grew beyond this size are not kept strongly referenced
    // after a flush; they stay reachable only through _weakMemoryStream.
    private const int MaxRetainedBufferLength = 64 * 1024;

    private readonly object _lock = new object();

    // SECURITY NOTE(review): BinaryFormatter deserializes whatever type
    // information the remote side sends. It must only be used when the other
    // end of the stream is trusted. It is flagged here, not replaced, because
    // changing the wire format would break existing peers.
    private BinaryFormatter _serializer = new BinaryFormatter();
    private Stream _stream;
    private MemoryStream _memoryStream = new MemoryStream();
    private readonly WeakReference _weakMemoryStream;
    private Action<object> _messageReceivedHandler;
    private readonly AsyncCallback _completedDelegate;
    private EventHandler _disposed;

    /// <summary>
    /// Creates the port and starts the thread that reads messages from
    /// <paramref name="stream"/>.
    /// </summary>
    /// <param name="stream">Stream used for both reading and writing.</param>
    /// <param name="messageReceivedHandler">Invoked, on the receiver thread, for every message read.</param>
    /// <exception cref="ArgumentNullException">Any argument is null.</exception>
    public DopMessagePortOverStream(Stream stream, Action<object> messageReceivedHandler)
    {
        if (stream == null)
            throw new ArgumentNullException("stream");

        if (messageReceivedHandler == null)
            throw new ArgumentNullException("messageReceivedHandler");

        _weakMemoryStream = new WeakReference(_memoryStream);
        _completedDelegate = _WriteCompleted;
        _stream = stream;
        _messageReceivedHandler = messageReceivedHandler;

        var thread = new Thread(_Receiver);
        thread.Start();
    }

    /// <summary>
    /// Disposes the port without recording an exception.
    /// </summary>
    public void Dispose()
    {
        Dispose(null);
    }

    /// <summary>
    /// Disposes the port: releases the fields, disposes the stream and raises
    /// <see cref="Disposed"/>. Only the first call has any effect.
    /// </summary>
    /// <param name="exception">
    /// The reason for the disposal (may be null). It is stored in
    /// <see cref="DisposeException"/> by the call that actually disposes.
    /// </param>
    public void Dispose(Exception exception)
    {
        if (IsDisposed)
            return;

        Stream stream;
        EventHandler disposed;
        lock(_lock)
        {
            // Re-check under the lock: another thread may have won the race.
            stream = _stream;
            if (stream == null)
                return;

            _stream = null;
            DisposeException = exception;
            _serializer = null;
            _messageReceivedHandler = null;
            _memoryStream = null;

            disposed = _disposed;
            _disposed = null;
        }

        // The stream disposal and the notification run outside the lock.
        stream.Dispose();

        if (disposed != null)
            disposed(this, EventArgs.Empty);
    }

    /// <summary>
    /// Gets whether the port was disposed (the stream reference was released).
    /// </summary>
    public bool IsDisposed
    {
        get
        {
            return _stream == null;
        }
    }

    /// <summary>
    /// Gets the exception given to the <see cref="Dispose(Exception)"/> call
    /// that disposed this port, or null.
    /// </summary>
    public Exception DisposeException { get; private set; }

    // Body of the receiver thread: deserializes messages one after another
    // and dispatches them until the port is disposed or an error happens.
    private void _Receiver()
    {
        try
        {
            while(true)
            {
                // Take local copies: Dispose may null these fields at any
                // moment from another thread.
                var serializer = _serializer;
                var stream = _stream;
                var handler = _messageReceivedHandler;
                if (serializer == null || stream == null || handler == null)
                    return;

                // Possibly optimizations:
                // Make the reader async, avoiding a thread for it.
                // Put a message size in its header, so we can read the entire message
                // with a single read call (today the deserializer blocks the thread
                // to do the read).
                // Put a limit on the message size, so a client can't attack us
                // by giving an "infinite" message.
                var message = serializer.Deserialize(stream);
                handler(message);
            }
        }
        catch(Exception exception)
        {
            // Any failure (including the stream being disposed while we are
            // blocked reading) ends the receiver and disposes the port.
            Dispose(exception);
        }
    }

    private ObjectDisposedException _DisposedException()
    {
        return new ObjectDisposedException("This message port is disposed.", DisposeException);
    }

    /// <summary>
    /// Serializes <paramref name="message"/> into the pending buffer. Nothing
    /// is written to the stream until <see cref="Flush"/> is called.
    /// </summary>
    /// <param name="message">The message to serialize.</param>
    /// <exception cref="ObjectDisposedException">The port is disposed.</exception>
    public void PostMessage(object message)
    {
        lock(_lock)
        {
            if (IsDisposed)
                throw _DisposedException();

            if (_memoryStream == null)
            {
                // A previous flush dropped the strong reference to a large
                // buffer; reuse it if the garbage collector did not reclaim it.
                _memoryStream = (MemoryStream)_weakMemoryStream.Target;
                if (_memoryStream == null)
                {
                    _memoryStream = new MemoryStream();
                    _weakMemoryStream.Target = _memoryStream;
                }
            }

            // If the serialization fails halfway, discard the partial bytes so
            // that the next flush does not send a corrupted message.
            long rollbackLength = _memoryStream.Length;
            try
            {
                _serializer.Serialize(_memoryStream, message);
            }
            catch
            {
                _memoryStream.SetLength(rollbackLength);
                _memoryStream.Position = rollbackLength;
                throw;
            }
        }
    }

    /// <summary>
    /// Starts writing all pending messages to the stream.
    /// </summary>
    /// <returns>
    /// A task that completes after the write finished and the stream was
    /// flushed, or faults with the error that made the write fail.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The port is disposed.</exception>
    public Task Flush()
    {
        lock(_lock)
        {
            var stream = _stream;
            if (stream == null)
                throw _DisposedException();

            var completionSource = new TaskCompletionSource<bool>();

            // _memoryStream is null when the previous flush released a large
            // buffer and nothing was posted since; treat that as "no data".
            // The pending bytes are copied because the memory stream is reused
            // by PostMessage while the asynchronous write may still be reading
            // the array handed to BeginWrite.
            byte[] payload = _memoryStream == null ? new byte[0] : _memoryStream.ToArray();

            stream.BeginWrite(payload, 0, payload.Length, _completedDelegate, completionSource);

            if (_memoryStream != null)
            {
                _memoryStream.Position = 0;
                _memoryStream.SetLength(0);

                // Do not keep large buffers alive; the weak reference still
                // allows reusing the buffer if it was not collected.
                if (payload.Length > MaxRetainedBufferLength)
                    _memoryStream = null;
            }

            return completionSource.Task;
        }
    }

    // Completion callback of the BeginWrite started by Flush. The async state
    // is the TaskCompletionSource of the task returned by that Flush call.
    private void _WriteCompleted(IAsyncResult result)
    {
        var source = result.AsyncState as TaskCompletionSource<bool>;

        try
        {
            // Dispose may have released the stream while the write was pending.
            var stream = _stream;
            if (stream == null)
                throw _DisposedException();

            stream.EndWrite(result);
            stream.Flush();
        }
        catch(Exception exception)
        {
            Dispose(exception);

            // Fault the task so that whoever waits for the flush is released.
            // we should not rethrow as doing that would kill the application.
            if (source != null)
                source.TrySetException(exception);

            return;
        }

        if (source != null)
            source.TrySetResult(true);
    }

    /// <summary>
    /// Raised once when the port is disposed. A handler added after the
    /// disposal is invoked immediately instead of being stored.
    /// </summary>
    public event EventHandler Disposed
    {
        add
        {
            if (value == null)
                return;

            lock(_lock)
            {
                if (IsDisposed)
                    value(this, EventArgs.Empty);
                else
                    _disposed += value;
            }
        }
        remove
        {
            lock(_lock)
                _disposed -= value;
        }
    }
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
I started to program computers when I was 11 years old, as a hobbyist, programming in AMOS Basic and Blitz Basic for Amiga.
At 12 I had my first try with assembler, but it was too difficult at the time. Then, in the same year, I learned C and, after learning C, I was finally able to learn assembler (for Motorola 680x0).
Not sure, but probably between 12 and 13, I started to learn C++. I always programmed "in an object oriented way", but using function pointers instead of virtual methods.
At 15 I started to learn Pascal at school and to use Delphi. At 16 I started my first internship (using Delphi). At 18 I started to work professionally using C++ and since then I've developed my programming skills as a professional developer in C++ and C#, generally creating libraries that help other developers do their work more easily, faster and with fewer errors.
Want more info or simply want to contact me?
Take a look at:
http://paulozemek.azurewebsites.net/
Or e-mail me at: paulozemek@outlook.com
Codeproject MVP 2012, 2015 & 2016
Microsoft MVP 2013-2014 (in October 2014 I started working at Microsoft, so I can't be a Microsoft MVP anymore).