A Simple Stream Implementation for Debugging
This stream implementation is fast, thread-safe, easy to use, and very useful for debugging large distributed or concurrent projects.
Introduction
The DebugStream
class is a simple implementation of .NET's System.IO.Stream
API. It allows projects that use Streams to run locally, without
pumping data through a pipe or over TCP/IP.
Background
I was working on a project that used serial communication between two PCs. I wanted to be able to test and debug the project without having serial cables hooked up to my development machine all of the time. Using physical cables was particularly cumbersome because I use a laptop for development.
Using the code
The publicly accessible class is called DebugStream
. However, DebugStream
is not itself a Stream. In order to allow async reading
and writing with System.IO.Stream
's pre-written BeginXxx and EndXxx
methods, the read stream and write stream have to be separate classes. The read
stream can be accessed through the DebugStream.ReadStream
property. Likewise, the write stream is accessible through the WriteStream
property.
To use the Stream
, simply instantiate the DebugStream
and access its properties.
// Create the shared in-memory pipe, then take a reference to each of its two endpoints.
DebugStream ds = new DebugStream();
System.IO.Stream writeStream = ds.WriteStream; // write-only end
System.IO.Stream readStream = ds.ReadStream;   // read-only end
After doing so, you could copy a file through the DebugStream
concurrently as follows:
// Placeholder paths. The source and destination must be different files:
// FileMode.Create truncates the destination, so pointing both at the same
// file would destroy (or fail to open) the data being copied.
string srcPath = "sourceFilePath";
string destPath = "destinationFilePath";
System.Threading.Tasks.Parallel.Invoke(
    // Consumer: drains the read end into the destination file. CopyTo returns
    // once Read reports 0 bytes, i.e. after the stream has been closed and emptied.
    () =>
    {
        using (var dest = new FileStream(destPath, FileMode.Create))
            readStream.CopyTo(dest);
    },
    // Producer: pushes the source file into the write end.
    () =>
    {
        try
        {
            using (var src = new FileStream(srcPath, FileMode.Open))
                src.CopyTo(writeStream);
        }
        finally
        {
            // Always close the write end, even if the copy fails; otherwise the
            // consumer above would stay blocked in Read waiting for more data.
            writeStream.Close();
        }
    }
);
I cannot get the code to upload as a .zip file, so here is the full source code of my Debug Stream:
#if DEBUG_STREAM
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.IO;
using System.Threading;
namespace DebugStream
{
/// <summary>
/// An in-memory pipe built on a fixed-size circular byte buffer. Bytes written to
/// <see cref="WriteStream"/> become available, in order, from <see cref="ReadStream"/>.
/// Reads block while the buffer is empty and writes block while it is full; all buffer
/// access is serialized through a single monitor.
/// </summary>
public class DebugStream
{
    private const int DefaultBufferSize = 4096;

    // Non-zero once either endpoint has been closed. Only touched while holding _bufferLock.
    private int _disposing;
    private readonly object _bufferLock = new object();
    private readonly byte[] _buffer;
    // Index at which the next byte will be stored.
    private int _head = 0;
    // Index from which the next byte will be taken.
    private int _tail = 0;
    // Disambiguates _head == _tail: true means the buffer is empty, false means it is full.
    private bool _empty = true;
    // Both endpoints are created once in the constructor so that every caller, on any
    // thread, always observes the same two Stream instances.
    private readonly Stream _readStream;
    private readonly Stream _writeStream;

    /// <summary>
    /// Creates a DebugStream with the default buffer size (4096 bytes).
    /// </summary>
    public DebugStream() : this(DefaultBufferSize) {}

    /// <summary>
    /// Creates a DebugStream with the given buffer capacity.
    /// </summary>
    /// <param name="bufferSize">Capacity of the internal buffer in bytes; must be greater than zero.</param>
    /// <exception cref="ArgumentException"><paramref name="bufferSize"/> is zero or negative.</exception>
    public DebugStream(int bufferSize) {
        if (bufferSize <= 0)
            throw new ArgumentException("bufferSize must be positive and nonzero.",
                "bufferSize");
        _buffer = new byte[bufferSize];
        _readStream = new InternalReadStream(this);
        _writeStream = new InternalWriteStream(this);
    }

    /// <summary>
    /// The read-only end of the pipe.
    /// </summary>
    public Stream ReadStream {
        get { return _readStream; }
    }

    /// <summary>
    /// The write-only end of the pipe.
    /// </summary>
    public Stream WriteStream {
        get { return _writeStream; }
    }

    // Throws ObjectDisposedException once the pipe has been closed.
    // Callers must hold _bufferLock.
    private void _checkDisposed() {
        if (_disposing != 0)
            throw new ObjectDisposedException(GetType().FullName);
    }

    // Validates the (buffer, offset, count) triple shared by Read and Write.
    // The range test is written as a subtraction so that offset + count cannot overflow.
    private static void _checkArguments(byte[] buffer, int offset, int count) {
        if (buffer == null)
            throw new ArgumentNullException("buffer", "buffer is null");
        if (count < 0 || offset < 0)
            throw new ArgumentException("offset or count is negative.");
        if (count > buffer.Length - offset)
            throw new IndexOutOfRangeException("The sum of offset and count is larger than the buffer length.");
    }

    /// <summary>
    /// Marks the pipe as closed and wakes every thread blocked in Read or Write.
    /// Called when either endpoint is disposed.
    /// </summary>
    /// <param name="disposing">Unused; the pipe is marked closed regardless of its value.</param>
    protected void Dispose(bool disposing) {
        lock (_bufferLock) {
            Thread.VolatileWrite(ref _disposing, 1);
            // Wake all waiters so they can observe the closed state
            Monitor.PulseAll(_bufferLock);
        }
    }

    /// <summary>
    /// Read-only endpoint: takes bytes out of the parent's circular buffer.
    /// </summary>
    private class InternalReadStream : Stream
    {
        private readonly DebugStream _parent;

        internal InternalReadStream(DebugStream parent) {
            _parent = parent;
        }

        public override bool CanRead {
            get { return true; }
        }

        public override bool CanSeek {
            get { return false; }
        }

        public override bool CanWrite {
            get { return false; }
        }

        /// <summary>
        /// No-op. Flushing a read-only stream is valid and has nothing to do.
        /// </summary>
        public override void Flush() {
        }

        public override long Length {
            get {
                throw new NotSupportedException("Cannot get the length of this Stream.");
            }
        }

        public override long Position {
            get {
                throw new NotSupportedException("Cannot get the position of this Stream.");
            }
            set {
                throw new NotSupportedException("Cannot set the position of this Stream.");
            }
        }

        /// <summary>
        /// Blocking read. Waits until at least some data is buffered or the pipe is
        /// closed, then copies up to <paramref name="count"/> bytes into
        /// <paramref name="buffer"/> starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> at which to start storing bytes.</param>
        /// <param name="count">Maximum number of bytes to read.</param>
        /// <returns>Number of bytes read; 0 once the pipe is closed and fully drained.</returns>
        public override int Read(byte[] buffer, int offset, int count) {
            _checkArguments(buffer, offset, count);
            var userBuffer = buffer;
            lock (_parent._bufferLock) {
                // Equality here could mean _buffer is empty or full
                while (_parent._head == _parent._tail && _parent._empty && (_parent._disposing == 0))
                    Monitor.Wait(_parent._bufferLock);
                // Closed and nothing left to drain: signal end of stream
                if (_parent._disposing != 0 && _parent._head == _parent._tail && _parent._empty)
                    return 0;
                // There is data in the buffer.
                // Return what has been written
                int firstRead;
                int secondRead;
                // Read as much as we can
                int buffDiff = _parent._head - _parent._tail;
                if (buffDiff > 0) {
                    // No wrap around
                    firstRead = Math.Min(buffDiff, count);
                    secondRead = 0;
                } else {
                    // Buffer wrapped around (or is completely full)
                    firstRead = Math.Min(_parent._buffer.Length - _parent._tail, count);
                    secondRead = Math.Min(count - firstRead, _parent._head);
                }
                // Do the first copy
                if (firstRead > 0) {
                    Buffer.BlockCopy(_parent._buffer, _parent._tail, userBuffer, offset, firstRead);
                    // Do the second copy if necessary; it lands directly after the
                    // first segment, so the destination index must include offset
                    if (secondRead > 0) {
                        Buffer.BlockCopy(_parent._buffer, 0, userBuffer, offset + firstRead, secondRead);
                    }
                    // Adjust the tail
                    _parent._tail = (_parent._tail + firstRead + secondRead);
                    if (_parent._tail >= _parent._buffer.Length)
                        _parent._tail -= _parent._buffer.Length;
                    if (_parent._tail == _parent._head)
                        _parent._empty = true;
                    // Notify writers that we read some data
                    Monitor.PulseAll(_parent._bufferLock);
                }
                return firstRead + secondRead;
            }
        }

        public override long Seek(long offset, SeekOrigin origin) {
            throw new NotSupportedException("Cannot seek on this Stream.");
        }

        public override void SetLength(long value) {
            throw new NotSupportedException("Cannot set the length of this Stream.");
        }

        public override void Write(byte[] buffer, int offset, int count) {
            throw new NotSupportedException("Cannot write to this Stream.");
        }

        // Closing this endpoint closes the whole pipe.
        protected override void Dispose(bool disposing) {
            _parent.Dispose(disposing);
            base.Dispose(disposing);
        }
    }

    /// <summary>
    /// Write-only endpoint: stores bytes into the parent's circular buffer.
    /// </summary>
    private class InternalWriteStream : Stream
    {
        private readonly DebugStream _parent;

        internal InternalWriteStream(DebugStream parent) {
            _parent = parent;
        }

        public override bool CanRead {
            get { return false; }
        }

        public override bool CanSeek {
            get { return false; }
        }

        public override bool CanWrite {
            get { return true; }
        }

        public override long Length {
            get {
                throw new NotSupportedException("Cannot get the length of this Stream.");
            }
        }

        public override long Position {
            get {
                throw new NotSupportedException("Cannot get the position of this Stream.");
            }
            set {
                throw new NotSupportedException("Cannot set the position of this Stream.");
            }
        }

        /// <summary>
        /// No-op. Written bytes are visible to the reader as soon as Write stores them.
        /// </summary>
        public override void Flush() {
        }

        public override int Read(byte[] buffer, int offset, int count) {
            throw new NotSupportedException("Cannot read from this Stream.");
        }

        public override long Seek(long offset, SeekOrigin origin) {
            throw new NotSupportedException("Cannot seek on this Stream.");
        }

        public override void SetLength(long value) {
            throw new NotSupportedException("Cannot set the length of this Stream.");
        }

        /// <summary>
        /// Blocking write. Stores <paramref name="count"/> bytes taken from
        /// <paramref name="buffer"/> starting at <paramref name="offset"/>, waiting
        /// for the reader to free space whenever the internal buffer is full.
        /// </summary>
        /// <param name="buffer">Source array.</param>
        /// <param name="offset">Index in <paramref name="buffer"/> of the first byte to write.</param>
        /// <param name="count">Number of bytes to write.</param>
        /// <exception cref="ObjectDisposedException">The pipe was closed before all bytes were stored.</exception>
        public override void Write(byte[] buffer, int offset, int count) {
            // We will check for disposal after acquiring the lock
            _checkArguments(buffer, offset, count);
            var userBuffer = buffer;
            int remaining = count;
            lock (_parent._bufferLock) {
                while (remaining > 0) {
                    // Buffer full: wait until the reader frees space or the pipe is closed
                    while (_parent._tail == _parent._head && !_parent._empty && (_parent._disposing == 0)) {
                        Monitor.Wait(_parent._bufferLock);
                    }
                    _parent._checkDisposed();
                    int firstWrite;
                    int secondWrite;
                    // Write as much as we can
                    if (_parent._empty) {
                        // Nothing buffered: restart at index 0 so the write is contiguous
                        _parent._tail = _parent._head = 0;
                        firstWrite = Math.Min(remaining, _parent._buffer.Length);
                        secondWrite = 0;
                    } else if (_parent._tail > _parent._head) {
                        // Buffer is wrapped around
                        firstWrite = Math.Min(remaining, _parent._tail - _parent._head);
                        secondWrite = 0;
                    } else {
                        // Buffer is not wrapped around
                        firstWrite = Math.Min(remaining, _parent._buffer.Length - _parent._head);
                        secondWrite = Math.Max(0, Math.Min(remaining - firstWrite, _parent._tail));
                    }
                    // Position in the caller's array of the next unwritten byte;
                    // it must honor the caller-supplied offset
                    int sourceIndex = offset + (count - remaining);
                    // Do the first copy
                    if (firstWrite > 0) {
                        Buffer.BlockCopy(userBuffer, sourceIndex, _parent._buffer, _parent._head, firstWrite);
                        // Do the second copy if necessary
                        if (secondWrite > 0) {
                            Buffer.BlockCopy(userBuffer, sourceIndex + firstWrite, _parent._buffer, 0, secondWrite);
                        }
                        // Adjust the head
                        _parent._head = (_parent._head + firstWrite + secondWrite);
                        if (_parent._head >= _parent._buffer.Length)
                            _parent._head -= _parent._buffer.Length;
                        _parent._empty = false;
                    }
                    remaining -= (firstWrite + secondWrite);
                    // Notify readers that we wrote some data
                    Monitor.PulseAll(_parent._bufferLock);
                }
            }
        }

        // Closing this endpoint closes the whole pipe; the reader can still drain
        // whatever is already buffered before it sees end of stream.
        protected override void Dispose(bool disposing) {
            _parent.Dispose(disposing);
            base.Dispose(disposing);
        }
    }
}
}
#endif
Points of Interest
The code is fairly straightforward, but there are some tricky synchronization issues that you should be wary of. Most importantly, DebugStream
will not close
until you explicitly close it. If you call Write
without a corresponding Read
on another thread, Write will block once the internal buffer is full (and Read will likewise block while the buffer is empty) until the other side makes progress or
Close
or Dispose
is called on the stream. This is the same behavior that a serial port or network stream exhibits.
History
Published article.