|
using System;
using System.IO.MemoryMappedFiles;
using System.Threading;
namespace Pfz.Remoting
{
/// <summary>
/// Creates a communication reader using MemoryMappedFiles.
/// In general, using MemoryMappedFiles is faster than using NamedPipes or other means of inter-process communication.
/// </summary>
public sealed class MemoryMappedFileReader:
IDisposable
{
private MemoryMappedFile _file;
private MemoryMappedViewAccessor _accessor;
private EventWaitHandle _sentEvent;
private EventWaitHandle _readEvent;
private ManualResetEvent _disposedEvent;
private int _mappedFileLength;
private unsafe byte *_pointer;
private unsafe MemoryMappedFileReaderWriterData *_data;
private bool _readSet;
private WaitHandle[] _handles;
/// <summary>
/// Creates the reader, which should connect to the given NamedPipe and, optionally, with a time-out,
/// which will be only as the connection time-out.
/// </summary>
public MemoryMappedFileReader(string name, int connectionTimeout = -1)
{
if (connectionTimeout < -1)
throw new ArgumentOutOfRangeException("connectionTimeout");
_sentEvent = new EventWaitHandle(false, EventResetMode.AutoReset, name + ":Sent");
if (!_sentEvent.WaitOne(connectionTimeout))
throw new TimeoutException("Connection timed-out.");
_file = MemoryMappedFile.OpenExisting(name);
_accessor = _file.CreateViewAccessor();
_mappedFileLength = _accessor.ReadInt32(0);
unsafe
{
byte *pointer = null;
_accessor.SafeMemoryMappedViewHandle.AcquirePointer(ref pointer);
_pointer = pointer;
_data = (MemoryMappedFileReaderWriterData *)(pointer + _mappedFileLength);
}
_readEvent = new EventWaitHandle(false, EventResetMode.AutoReset, name + ":Read");
_readEvent.Set();
_disposedEvent = new ManualResetEvent(false);
_handles = new WaitHandle[2];
_handles[0] = _disposedEvent;
_handles[1] = _sentEvent;
}
/// <summary>
/// Frees all the resources used by this reader and, possible, releases the MemoryMappedFile.
/// </summary>
public void Dispose()
{
var disposedEvent = _disposedEvent;
if (disposedEvent != null)
disposedEvent.Set();
var readEvent = _readEvent;
unsafe
{
if (_data != null && readEvent != null)
{
_data->_ended = true;
readEvent.Set();
}
}
Disposer.Dispose(ref _accessor);
Disposer.Dispose(ref _file);
Disposer.Dispose(ref _sentEvent);
Disposer.Dispose(ref _readEvent);
}
private int _timeout = -1;
/// <summary>
/// Gets or sets the time-out of this reader.
/// This is not the same as the connection time out and must be set explicity.
/// </summary>
public int Timeout
{
get
{
return _timeout;
}
set
{
if (value < -1)
throw new ArgumentOutOfRangeException();
_timeout = value;
}
}
/// <summary>
/// Gets the Length of the memory-mapped-file.
/// </summary>
public int MemoryMappedFileLength
{
get
{
return _mappedFileLength;
}
}
/// <summary>
/// Reads a single byte from the stream.
/// May return -1 if the connection is lost, or throw a TimeoutException if no more data is being received.
/// </summary>
public int ReadByte()
{
unsafe
{
int readerPosition = _data->_readerPosition;
bool readerState = _data->_readerState;
while(readerPosition == _data->_writerPosition && readerState == _data->_writerState)
{
if (_data->_ended)
{
if (readerPosition == _data->_writerPosition && readerState == _data->_writerState) // retest needed.
return -1;
break;
}
_readSet = false;
_readEvent.Set();
switch (WaitHandle.WaitAny(_handles, _timeout))
{
case WaitHandle.WaitTimeout: throw new TimeoutException("Read timed-out.");
case 0:
{
var disposedEvent = _disposedEvent;
if (disposedEvent != null)
{
_disposedEvent = null;
disposedEvent.Dispose();
}
return -1;
}
}
}
int result = _pointer[readerPosition];
readerPosition ++;
if (readerPosition >= _mappedFileLength)
{
readerPosition = 0;
_data->_readerState = !readerState;
}
_data->_readerPosition = readerPosition;
if (!_readSet)
{
_readSet = true;
_readEvent.Set();
}
return result;
}
}
/// <summary>
/// Tries to fill the buffer, but will return the actual number of bytes read.
/// </summary>
public int Read(byte[] buffer)
{
if (buffer == null)
throw new ArgumentNullException("buffer");
return _Read(buffer, 0, buffer.Length);
}
/// <summary>
/// Tries to fill the buffer, but will return the actual number of bytes read.
/// </summary>
public int Read(byte[] buffer, int offset)
{
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0 || offset >= buffer.Length)
throw new ArgumentOutOfRangeException("offset");
return _Read(buffer, offset, buffer.Length - offset);
}
/// <summary>
/// Tries to fill the buffer, but will return the actual number of bytes read.
/// </summary>
public int Read(byte[] buffer, int offset, int count)
{
if (buffer == null)
throw new ArgumentNullException("buffer");
if (offset < 0 || offset >= buffer.Length)
throw new ArgumentOutOfRangeException("offset");
if (count < 0 || offset + count > buffer.Length)
throw new ArgumentOutOfRangeException("count");
return _Read(buffer, offset, count);
}
private int _Read(byte[] buffer, int offset, int count)
{
unsafe
{
int readerPosition = _data->_readerPosition;
bool readerState = _data->_readerState;
int writerPosition;
bool writerState;
while(true)
{
writerPosition = _data->_writerPosition;
writerState = _data->_writerState;
if (readerPosition != _data->_writerPosition || readerState != _data->_writerState)
break;
if (_data->_ended)
{
if (readerPosition == _data->_writerPosition && readerState == _data->_writerState) // retest needed.
return 0;
break;
}
_readSet = false;
_readEvent.Set();
switch(WaitHandle.WaitAny(_handles, _timeout))
{
case WaitHandle.WaitTimeout: throw new TimeoutException("Read timed-out.");
case 0:
{
var disposedEvent = _disposedEvent;
if (disposedEvent != null)
{
_disposedEvent = null;
disposedEvent.Dispose();
}
return -1;
}
}
}
int remaining;
if (readerState == writerState)
remaining = writerPosition - readerPosition;
else
remaining = _mappedFileLength - readerPosition;
int toRead = Math.Min(remaining, count);
fixed(byte *destinationPointer = buffer)
UnsafeBuffer.BlockCopy(_pointer + readerPosition, destinationPointer + offset, toRead);
readerPosition += toRead;
if (readerPosition >= _mappedFileLength)
{
readerPosition = 0;
_data->_readerState = !readerState;
}
_data->_readerPosition = readerPosition;
if (!_readSet)
{
_readSet = true;
_readEvent.Set();
}
return toRead;
}
}
}
}
|
By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.
If a file you wish to view isn't highlighted, and is a text file (not binary), please
let us know and we'll add colourisation support for it.
I started to program computers when I was 11 years old, as a hobbyist, programming in AMOS Basic and Blitz Basic for Amiga.
At 12 I had my first try with assembler, but it was too difficult at the time. Then, in the same year, I learned C and, after learning C, I was finally able to learn assembler (for Motorola 680x0).
Not sure, but probably between 12 and 13, I started to learn C++. I always programmed "in an object oriented way", but using function pointers instead of virtual methods.
At 15 I started to learn Pascal at school and to use Delphi. At 16 I started my first internship (using Delphi). At 18 I started to work professionally using C++ and since then I've developed my programming skills as a professional developer in C++ and C#, generally creating libraries that help other developers do their work easier, faster and with less errors.
Want more info or simply want to contact me?
Take a look at:
http://paulozemek.azurewebsites.net/
Or e-mail me at: paulozemek@outlook.com
Codeproject MVP 2012, 2015 & 2016
Microsoft MVP 2013-2014 (in October 2014 I started working at Microsoft, so I can't be a Microsoft MVP anymore).