A Simple Stream Implementation for Debugging

mlzg4, 9 Nov 2012
This stream implementation is fast, thread-safe, easy to use, and very useful for debugging large distributed or concurrent projects.

Introduction

The DebugStream class is a simple implementation of .NET's System.IO.Stream API. It lets projects that communicate over Streams run locally in a single process, without pumping data through a pipe or over TCP/IP.

Background

I was working on a project that used serial communication between two PCs. I wanted to be able to test and debug the project without having serial cables hooked up to my development machine all of the time. Using physical cables was particularly cumbersome because I use a laptop for development.

Using the code

The publicly accessible class is called DebugStream. However, DebugStream is not itself a Stream. To allow asynchronous reading and writing through the BeginRead/EndRead and BeginWrite/EndWrite methods that System.IO.Stream already provides, the read side and the write side have to be separate Stream objects. The read stream is exposed through the DebugStream.ReadStream property, and the write stream through the DebugStream.WriteStream property.

To use the Stream, simply instantiate the DebugStream and access its properties. 

var ds = new DebugStream();  
var writeStream = ds.WriteStream; 
var readStream = ds.ReadStream; 

After doing so, you can copy a file through the DebugStream, with the reader and the writer running concurrently, as follows:

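string srcPath = "sourceFilePath";
string destPath = "destinationFilePath";   // must differ from srcPath

System.Threading.Tasks.Parallel.Invoke(
	() =>
	{
	   // Reader: drain the DebugStream into the destination file.
	   using (var dest = new FileStream(destPath, FileMode.Create))
		  readStream.CopyTo(dest);
	},
	() => 
	{
	   // Writer: push the source file into the DebugStream.
	   using (var src = new FileStream(srcPath, FileMode.Open))
		  src.CopyTo(writeStream);
	   // Closing the write side lets the reader see end-of-stream once the buffer is drained.
	   writeStream.Close();
	}
 );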
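The split into a read stream and a write stream can also be exercised with the Begin/End pattern. The default BeginRead and BeginWrite in System.IO.Stream run one asynchronous operation at a time per Stream instance (at least in the reference implementations), which is presumably why a pending read on a combined stream would hold up the write that feeds it. The following is a minimal sketch, not part of the original code: it assumes the DebugStream listing from this article is compiled into the same project with DEBUG_STREAM defined, and AsyncDemo and RoundTrip are hypothetical names. The alias is there because the class and its namespace share the name DebugStream.

```csharp
using System;
using System.IO;
using System.Text;
using Pipe = DebugStream.DebugStream;   // assumption: the article's listing is in this project

static class AsyncDemo
{
    // Sends a non-empty string through the pipe with BeginWrite and
    // receives it with BeginRead on the other half.
    public static string RoundTrip(string text)
    {
        var ds = new Pipe();
        Stream reader = ds.ReadStream;
        Stream writer = ds.WriteStream;

        byte[] payload = Encoding.UTF8.GetBytes(text);
        byte[] received = new byte[payload.Length];

        // Start the read first; it stays pending until data arrives.
        IAsyncResult pendingRead = reader.BeginRead(received, 0, received.Length, null, null);

        // The write goes through a different Stream object, so it is not
        // queued behind the pending read.
        IAsyncResult pendingWrite = writer.BeginWrite(payload, 0, payload.Length, null, null);
        writer.EndWrite(pendingWrite);

        // A single read may return fewer bytes than requested, so keep reading.
        int total = reader.EndRead(pendingRead);
        while (total < received.Length)
        {
            int n = reader.Read(received, total, received.Length - total);
            if (n == 0) break;          // end of stream
            total += n;
        }

        writer.Close();
        return Encoding.UTF8.GetString(received, 0, total);
    }
}
```

Calling AsyncDemo.RoundTrip("hello") should hand back the same string. The payload here is far smaller than the default 4096-byte buffer, so the write completes without waiting for the reader.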

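I could not get the code to upload as a .zip file, so here is the full source code of my DebugStream. Note that the listing is wrapped in #if DEBUG_STREAM, so define that symbol (or remove the directive) to compile it: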

#if DEBUG_STREAM
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.IO;
using System.Threading;

namespace DebugStream
{
   public class DebugStream
   {
      private const int DefaultBufferSize = 4096;

      private int _disposing;
      private readonly object _bufferLock = new object();

      private readonly byte[] _buffer;
      private int _head = 0;
      private int _tail = 0;
      private bool _empty = true;

      private Stream _readStream = null;
      private Stream _writeStream = null;

      public DebugStream() : this(DefaultBufferSize) {}

      public DebugStream(int bufferSize) {
         if (bufferSize <= 0)
            throw new ArgumentException("bufferSize must be positive and nonzero.", 
                      "bufferSize");

         _buffer = new byte[bufferSize];      
      }

      public Stream ReadStream {
         get {
            if (_readStream == null) {
               _readStream = new InternalReadStream(this);
            }
            
            return _readStream;
         }
      }

      public Stream WriteStream {
         get {
            if (_writeStream == null) {
               _writeStream = new InternalWriteStream(this);
            }
            
            return _writeStream;
         }
      }

      private void _checkDisposed() {
         if (_disposing != 0)
            throw new ObjectDisposedException(GetType().FullName);
      }

      protected void Dispose(bool disposing) {
         lock (_bufferLock) {
            Thread.VolatileWrite(ref _disposing, 1);

            // Wake any blocked readers and writers so they observe the disposed state
            Monitor.PulseAll(_bufferLock);
         }
      }

      private class InternalReadStream : Stream
      {
         private readonly DebugStream _parent;

         internal InternalReadStream(DebugStream parent) {
            _parent = parent;
         }

         public override bool CanRead {
            get { return true; }
         }

         public override bool CanSeek {
            get { return false; }
         }

         public override bool CanWrite {
            get { return false; }
         }

         public override void Flush() {
            // Nothing to flush on the read side; a no-op is safer than throwing.
         }

         public override long Length {
            get {
               throw new NotSupportedException("Cannot get the length of this Stream.");
            }
         }

         public override long Position {
            get {
               throw new NotSupportedException("Cannot get the position of this Stream.");
            }
            set {
               throw new NotSupportedException("Cannot set the position of this Stream.");

            }
         }

         /// <summary>
         /// Blocking read
         /// </summary>
         /// <param name="buffer"></param>
         /// <param name="offset"></param>
         /// <param name="count"></param>
         /// <returns>Number of bytes read.</returns>
         public override int Read(byte[] buffer, int offset, int count) {
            if (buffer == null)
               throw new ArgumentNullException("buffer", "buffer is null");
            if (offset < 0)
               throw new ArgumentOutOfRangeException("offset", "offset is negative.");
            if (count < 0)
               throw new ArgumentOutOfRangeException("count", "count is negative.");
            if (buffer.Length - offset < count)
               throw new ArgumentException("The sum of offset and count is larger than the buffer length.");

            var userBuffer = buffer;

            lock (_parent._bufferLock) {
               // Equality here could mean _buffer is empty or full
               while (_parent._head == _parent._tail && _parent._empty && (_parent._disposing == 0))
                  Monitor.Wait(_parent._bufferLock);

               if (_parent._disposing != 0 && _parent._head == _parent._tail && _parent._empty)
                  return 0;

               // There is data in the buffer.
               // Return what has been written
               int firstRead;
               int secondRead;

               // Read as much as we can
               int buffDiff = _parent._head - _parent._tail;
               if (buffDiff > 0) {
                  // No wrap around
                  firstRead = Math.Min(buffDiff, count);
                  secondRead = 0;
               } else {
                  // Buffer wrapped around
                  firstRead = Math.Min(_parent._buffer.Length - _parent._tail, count);
                  secondRead = Math.Min(count - firstRead, _parent._head);
               }

               // Do the first copy
               if (firstRead > 0) {
                  Buffer.BlockCopy(_parent._buffer, _parent._tail, userBuffer, offset, firstRead);

                  // Do the second copy if necessary
                  if (secondRead > 0) {
                     // The second chunk lands right after the first one in the caller's buffer
                     Buffer.BlockCopy(_parent._buffer, 0, userBuffer, offset + firstRead, secondRead);
                  }

                  // Adjust the tail
                  _parent._tail = (_parent._tail + firstRead + secondRead);
                  if (_parent._tail >= _parent._buffer.Length)
                     _parent._tail -= _parent._buffer.Length;

                  if (_parent._tail == _parent._head)
                     _parent._empty = true;

                  // Notify writers that we read some data
                  Monitor.PulseAll(_parent._bufferLock);
               }

               return firstRead + secondRead;
            }
         }

         public override long Seek(long offset, SeekOrigin origin) {
            throw new NotSupportedException("Cannot seek on this Stream.");
         }

         public override void SetLength(long value) {
            throw new NotSupportedException("Cannot set the length of this Stream.");
         }

         public override void Write(byte[] buffer, int offset, int count) {
            throw new NotSupportedException("Cannot write to this Stream.");
         }

         protected override void Dispose(bool disposing) {
            _parent.Dispose(disposing);
            base.Dispose(disposing);
         }
      }

      private class InternalWriteStream : Stream
      {
         private readonly DebugStream _parent;

         internal InternalWriteStream(DebugStream parent) {
            _parent = parent;
         }

         public override bool CanRead {
            get { return false; }
         }

         public override bool CanSeek {
            get { return false; }
         }

         public override bool CanWrite {
            get { return true; }
         }

         public override long Length {
            get {
               throw new NotSupportedException("Cannot get the length of this Stream.");
            }
         }

         public override long Position {
            get {
               throw new NotSupportedException("Cannot get the position of this Stream.");
            }
            set {
               throw new NotSupportedException("Cannot set the position of this Stream.");

            }
         }

         public override void Flush() {
            
         }

         public override int Read(byte[] buffer, int offset, int count) {
            throw new NotSupportedException("Cannot read from this Stream.");
         }

         public override long Seek(long offset, SeekOrigin origin) {
            throw new NotSupportedException("Cannot seek on this Stream.");
         }

         public override void SetLength(long value) {
            throw new NotSupportedException("Cannot set the length of this Stream.");
         }

         public override void Write(byte[] buffer, int offset, int count) {
            // We will check for disposal after acquiring the lock
            if (buffer == null)
               throw new ArgumentNullException("buffer", "buffer is null");
            if (offset < 0)
               throw new ArgumentOutOfRangeException("offset", "offset is negative.");
            if (count < 0)
               throw new ArgumentOutOfRangeException("count", "count is negative.");
            if (buffer.Length - offset < count)
               throw new ArgumentException("The sum of offset and count is larger than the buffer length.");

            var userBuffer = buffer;
            int remaining = count;

            lock (_parent._bufferLock) {
               while (remaining > 0) {
                  while (_parent._tail == _parent._head && !_parent._empty && (_parent._disposing == 0)) {
                     Monitor.Wait(_parent._bufferLock);
                  }

                  _parent._checkDisposed();

                  int firstWrite;
                  int secondWrite;

                  // Write as much as we can
                  if (_parent._empty) {
                     _parent._tail = _parent._head = 0;
                     firstWrite = Math.Min(remaining, _parent._buffer.Length);
                     secondWrite = 0;
                  } else if (_parent._tail > _parent._head) { 
                     // Buffer is wrapped around
                     firstWrite = Math.Min(remaining, _parent._tail - _parent._head);
                     secondWrite = 0;
                  } else {
                     // Buffer is not wrapped around
                     firstWrite = Math.Min(remaining, _parent._buffer.Length - _parent._head);
                     secondWrite = Math.Max(0, Math.Min(remaining - firstWrite, _parent._tail));
                  }

                  // Do the first copy
                  if (firstWrite > 0) {
                     // Source index must honor the caller's offset
                     Buffer.BlockCopy(userBuffer, offset + count - remaining, _parent._buffer, _parent._head, firstWrite);

                     // Do the second copy if necessary
                     if (secondWrite > 0) {
                        Buffer.BlockCopy(userBuffer, offset + count - remaining + firstWrite, _parent._buffer, 0, secondWrite);
                     }

                     // Adjust the head
                     _parent._head = (_parent._head + firstWrite + secondWrite);
                     if (_parent._head >= _parent._buffer.Length)
                        _parent._head -= _parent._buffer.Length;

                     _parent._empty = false;
                  }

                  remaining -= (firstWrite + secondWrite);

                  // Notify readers that we wrote some data
                  Monitor.PulseAll(_parent._bufferLock);
               }
            }
         }

         protected override void Dispose(bool disposing) {
            _parent.Dispose(disposing);
            base.Dispose(disposing);
         }
      }
   }
}
#endif
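
The head/tail bookkeeping in the listing above is easier to follow without the locking around it. The following is a reduced, non-blocking sketch of the same idea, not the article's class: RingSketch and its members are hypothetical names, and unlike the listing it does not reset both indices to zero when the buffer runs empty. The point it illustrates is that head == tail is ambiguous (empty or full), so a separate flag records which one it is, and a copy that crosses the end of the array is split into two block copies.

```csharp
using System;

// Hypothetical, single-threaded reduction of the listing's circular buffer.
sealed class RingSketch
{
    private readonly byte[] _buffer;
    private int _head;            // next index to write
    private int _tail;            // next index to read
    private bool _empty = true;   // tells "empty" from "full" when head == tail

    public RingSketch(int size) { _buffer = new byte[size]; }

    public int Count
    {
        get
        {
            if (_empty) return 0;
            int diff = _head - _tail;
            return diff > 0 ? diff : diff + _buffer.Length;  // diff == 0 means full
        }
    }

    // Copies in as many bytes as fit and returns that number.
    public int Write(byte[] src, int offset, int count)
    {
        int n = Math.Min(count, _buffer.Length - Count);
        if (n == 0) return 0;
        int first = Math.Min(n, _buffer.Length - _head);               // up to the end of the array
        Buffer.BlockCopy(src, offset, _buffer, _head, first);
        Buffer.BlockCopy(src, offset + first, _buffer, 0, n - first);  // wrapped remainder, may be 0
        _head = (_head + n) % _buffer.Length;
        _empty = false;
        return n;
    }

    // Copies out as many bytes as are available and returns that number.
    public int Read(byte[] dst, int offset, int count)
    {
        int n = Math.Min(count, Count);
        if (n == 0) return 0;
        int first = Math.Min(n, _buffer.Length - _tail);
        Buffer.BlockCopy(_buffer, _tail, dst, offset, first);
        Buffer.BlockCopy(_buffer, 0, dst, offset + first, n - first);
        _tail = (_tail + n) % _buffer.Length;
        if (_tail == _head) _empty = true;   // the reader caught up with the writer
        return n;
    }
}

static class RingSketchDemo
{
    static void Main()
    {
        var ring = new RingSketch(4);
        ring.Write(new byte[] { 1, 2, 3 }, 0, 3);
        ring.Read(new byte[2], 0, 2);                 // consumes 1 and 2
        ring.Write(new byte[] { 4, 5, 6 }, 0, 3);     // wraps around; buffer is now full
        var result = new byte[4];
        ring.Read(result, 0, 4);
        Console.WriteLine(string.Join(" ", result));  // prints "3 4 5 6"
    }
}
```

In the article's listing the same arithmetic runs inside lock (_bufferLock), with Monitor.Wait taking the place of the early "return 0" paths.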

Points of Interest

The code is fairly straightforward, but there are some synchronization issues that you should be wary of. Most importantly, a DebugStream stays open until you explicitly close it. Write blocks once the internal buffer (4096 bytes by default) is full, and Read blocks while the buffer is empty, so if one side never makes the matching call, the other side stays blocked until Close or Dispose is called on one of the two streams. A serial port or network stream behaves the same way.
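
The blocking behavior described above can be observed with a deliberately tiny buffer. This is a sketch under assumptions rather than a test shipped with the article: it assumes the DebugStream listing is compiled into the same project with DEBUG_STREAM defined, BlockingDemo and WriterBlocksUntilRead are hypothetical names, and the 200 ms wait is an arbitrary grace period.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;
using Pipe = DebugStream.DebugStream;   // assumption: the article's listing is in this project

static class BlockingDemo
{
    // Returns true if the writer was still blocked before anyone called Read
    // and all bytes arrived once the reader started draining the buffer.
    public static bool WriterBlocksUntilRead()
    {
        var ds = new Pipe(4);                       // 4-byte internal buffer
        Stream reader = ds.ReadStream;
        Stream writer = ds.WriteStream;
        byte[] payload = { 1, 2, 3, 4, 5, 6, 7, 8 };

        // Eight bytes do not fit in four, so Write has to wait for a reader.
        Task writeTask = Task.Run(() => writer.Write(payload, 0, payload.Length));
        bool finishedEarly = writeTask.Wait(TimeSpan.FromMilliseconds(200));

        // Draining the buffer lets the blocked Write continue.
        byte[] received = new byte[payload.Length];
        int total = 0;
        while (total < received.Length)
        {
            int n = reader.Read(received, total, received.Length - total);
            if (n == 0) break;                      // end of stream
            total += n;
        }

        writeTask.Wait();                           // completes now that the data was consumed
        writer.Close();
        return !finishedEarly && total == payload.Length;
    }
}
```

If the reading loop were left out, writeTask would stay blocked until one of the streams is closed, at which point the pending Write throws ObjectDisposedException.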

History

Published article.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author

mlzg4

United States

Article Copyright 2012 by mlzg4