Introduction
Have you ever opened a file stream with FileShare.None so that external programs could not alter its contents, only to be frustrated when other parts of your own program needed to open the file in read mode? Have you ever wanted to read data from a binary file that contains a fixed-size header and treat the data section as if it were a standalone stream? Have you ever been given a COM stream and needed to use it in multiple places but couldn't? I have! In the past, I wrote kludgy code to work around these problems, but I finally got fed up with it and decided to create a reusable class to fix them.
StreamMuxer is a class that allows you to create multiple substreams with varying access rights from a single inner stream. The class handles locking, offsets, and rights so that classes that consume your streams can just work!
Using the code
The zip contains a number of files because I have a fairly large base library from which it is not trivial to extract individual components. I tried to delete most of the unnecessary references and code, but there are still five files overall. Sorry about that :(
BaseDisposable
This is a simple abstract class that makes it easier to use the IDisposable interface. Right now, you may be scratching your head asking what on earth is complex about the IDisposable interface. This class serves a few purposes for me:
- Allows me to put all the dispose code into a single place, OnDispose, rather than in both Dispose and my destructor.
- Ensures that OnDispose is called exactly once!
- Exposes a property where I can tell if I've been disposed.
- Makes it so I don't forget to call GC.SuppressFinalize when Dispose is called.
This class is fairly straightforward, and I tend to use it as a base when possible.
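The four points above can be sketched roughly as follows. This is only a guess at the shape of such a base class, not the BaseDisposable from the zip: the names BaseDisposableSketch and CountingResource are made up for illustration, and the use of Interlocked to enforce "exactly once" is my assumption.

```csharp
using System;
using System.Threading;

// Hypothetical sketch; the real BaseDisposable ships in the zip and may differ.
public abstract class BaseDisposableSketch : IDisposable
{
    private int disposeState; // 0 = live, 1 = disposed

    // True once Dispose (or the finalizer) has started the cleanup.
    public bool IsDisposed
    {
        get { return Interlocked.CompareExchange(ref disposeState, 0, 0) != 0; }
    }

    ~BaseDisposableSketch()
    {
        DisposeOnce(false);
    }

    public void Dispose()
    {
        DisposeOnce(true);
        // An explicit Dispose makes finalization unnecessary.
        GC.SuppressFinalize(this);
    }

    private void DisposeOnce(bool disposing)
    {
        // Only the first caller flips 0 -> 1 and runs the cleanup.
        if (Interlocked.Exchange(ref disposeState, 1) == 0)
        {
            OnDispose(disposing);
        }
    }

    // Single place for cleanup code; 'disposing' is false when called from the finalizer.
    protected virtual void OnDispose(bool disposing)
    {
    }
}

// Example subclass (also hypothetical): counts how many times cleanup runs.
public sealed class CountingResource : BaseDisposableSketch
{
    public int CleanupCalls;

    protected override void OnDispose(bool disposing)
    {
        base.OnDispose(disposing);
        CleanupCalls++;
    }
}
```

Calling Dispose twice on a CountingResource leaves CleanupCalls at 1, which is the "exactly once" guarantee in action.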
Validate
This is a static class where I dump my validation (especially argument) code. It contains functions like:
public static void StreamArg(Stream stream, string argName, bool mustBeReadable, bool mustBeWriteable, bool mustBeSeekable)
public static void NonNullArg(object arg, string argName)
public static void Relation(IComparable a, string argNameA, IComparable b, string argNameB, Equalities eq)
public static void ArrayArg(IList arg, int offset, int size, int minSize, string argName, bool nullable)
If you are a bean counter, you'll realize that calling:
public void Foo(object o)
{
    Validate.NonNullArg(o, "o");
}
is less efficient than:
public void Foo(object o)
{
    if (o == null) throw new ArgumentNullException("o");
}
Frankly, I don't care! The reason I use this class is so I can have consistent error checking. This to me is more important than saving a few cycles. Besides, the checks behind calls like Validate.Relation are long enough that, written out by hand, you could easily get lazy and not do the validation work.
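To make the idea concrete, here is a minimal sketch of what two of these helpers might look like. The signatures match the ones listed above, but the bodies, the exception types chosen, and the class name ValidateSketch are my assumptions; the versions in the zip may behave differently.

```csharp
using System;
using System.IO;

// Hypothetical sketch of two Validate helpers; the versions in the zip may differ.
public static class ValidateSketch
{
    public static void NonNullArg(object arg, string argName)
    {
        if (arg == null) throw new ArgumentNullException(argName);
    }

    public static void StreamArg(Stream stream, string argName,
        bool mustBeReadable, bool mustBeWriteable, bool mustBeSeekable)
    {
        NonNullArg(stream, argName);
        // Each capability is checked only when the caller requires it.
        if (mustBeReadable && !stream.CanRead)
            throw new ArgumentException("Stream must be readable.", argName);
        if (mustBeWriteable && !stream.CanWrite)
            throw new ArgumentException("Stream must be writeable.", argName);
        if (mustBeSeekable && !stream.CanSeek)
            throw new ArgumentException("Stream must be seekable.", argName);
    }
}
```

With helpers like these, every method that accepts a stream reports a bad argument the same way, which is the consistency argument made above.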
StreamMuxer
Below is some of the code for the actual StreamMuxer class (the full source is included in the zip, so there is no need to repeat it all here). The one really important thing to note is that all of the real stream operations must take place within a locked context! This is critical because the operations are not atomic: the sub-streams must constantly reposition the pointer in the inner stream before performing functions like Read.
using System;
using System.Diagnostics;
using System.IO;
namespace RevolutionaryStuff.JBT.Streams
{
    public class StreamMuxer : BaseDisposable
    {
        protected Stream Inner
        {
            get
            {
                if (this.IsDisposed)
                    throw new ObjectDisposedException("StreamMuxer");
                return this.Inner_p;
            }
        }
        private readonly Stream Inner_p;
        private readonly bool LeaveOpen;
        #region Constructors
        public StreamMuxer(Stream inner)
            : this(inner, false)
        { }
        public StreamMuxer(Stream inner, bool leaveOpen)
        {
            Validate.StreamArg(inner, "inner", false, false, true);
            this.Inner_p = inner;
            this.LeaveOpen = leaveOpen;
        }
        protected override void OnDispose(bool disposing)
        {
            base.OnDispose(disposing);
            this.Inner_p.Flush();
            if (!this.LeaveOpen)
            {
                this.Inner_p.Close();
                this.Inner_p.Dispose();
            }
        }
        #endregion
        #region Quickies...
        ...
        public void Write(byte[] buffer, int offset,
            int count, long offsetInFile)
        {
            Validate.ArrayArg(buffer, offset, count);
            if (!Inner.CanWrite) throw new NotSupportedException();
            lock (Inner)
            {
                Inner.Position = offsetInFile;
                Inner.Write(buffer, offset, count);
            }
        }
        ...
        #endregion
        #region Sub-Stream Creation
        public Stream Create()
        {
            return Create(true, true);
        }
        ...
        public Stream Create(bool canRead, bool canWrite, long offset, long size)
        {
            return new MyStream(this, canRead, canWrite, offset, size);
        }
        #endregion
        private class MyStream : Stream
        {
            private readonly StreamMuxer Muxer;
            private readonly Stream Inner;
            private readonly long Offset;
            private readonly long Size;
            private bool IsClosed;
            #region Constructors
            public MyStream(StreamMuxer muxer, bool canRead,
                bool canWrite, long offset, long size)
            {
                Validate.NonNullArg(muxer, "muxer");
                this.Muxer = muxer;
                this.Inner = muxer.Inner;
                Validate.Between(offset, "offset", 0, Inner.Length);
                if (size != -1)
                {
                    Validate.Between(size, "size", 0,
                        Inner.Length - offset + 1);
                }
                this.Offset = offset;
                this.Size = size;
                this.CanRead_p = canRead;
                this.CanWrite_p = canWrite;
            }
            #endregion
            #region Stream Overrides
            ...
            public override long Seek(long offset, SeekOrigin origin)
            {
                return StreamStuff.SeekViaPos(this, offset, origin);
            }
            ...
            public override long Length
            {
                get
                {
                    if (this.IsClosed) throw new NotNowException();
                    if (this.Size == -1)
                    {
                        return Inner.Length - this.Offset;
                    }
                    return this.Size;
                }
            }
            public override long Position
            {
                get
                {
                    if (this.IsClosed) throw new NotNowException();
                    return Position_p;
                }
                set
                {
                    if (this.IsClosed) throw new NotNowException();
                    try
                    {
                        Validate.Between(value, "value", 0, this.Length);
                    }
                    catch (Exception ex)
                    {
                        throw new NotSupportedException("New" +
                            " Position is past the acceptable bounds", ex);
                    }
                    Position_p = value;
                }
            }
            private long Position_p;
            ...
            public override int Read(byte[] buffer, int offset, int count)
            {
                if (!this.CanRead) throw new NotNowException();
                Validate.ArrayArg(buffer, offset, count);
                count = (int)Math.Min(count, Length - Position);
                lock (Inner)
                {
                    Inner.Position = this.Offset + Position;
                    int read = Inner.Read(buffer, offset, count);
                    this.Position += read;
                    return read;
                }
            }
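            public override void Write(byte[] buffer, int offset, int count)
            {
                if (!this.CanWrite) throw new ReadOnlyException();
                // Translate the sub-stream position into an inner-stream position,
                // mirroring what Read does with this.Offset + Position.
                this.Muxer.Write(buffer, offset, count, this.Offset + this.Position);
                this.Position += count;
            }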
            #endregion
        }
    }
}
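The reposition-then-read step under a lock is easier to see in isolation. The following is a stripped-down, self-contained sketch of a read-only window over a shared seekable stream. WindowStream is a hypothetical name, not a class from the zip, and it leaves out the access rights, disposal, and write support that StreamMuxer provides.

```csharp
using System;
using System.IO;

// Hypothetical, read-only window over a shared seekable stream.
public sealed class WindowStream : Stream
{
    private readonly Stream inner;
    private readonly long start;
    private readonly long length;
    private long position; // position relative to the window, not to the inner stream

    public WindowStream(Stream inner, long start, long length)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (!inner.CanSeek) throw new ArgumentException("Stream must be seekable.", "inner");
        if (start < 0 || length < 0 || start + length > inner.Length)
            throw new ArgumentOutOfRangeException("start");
        this.inner = inner;
        this.start = start;
        this.length = length;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return true; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { return length; } }

    public override long Position
    {
        get { return position; }
        set
        {
            if (value < 0 || value > length) throw new ArgumentOutOfRangeException("value");
            position = value;
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Never read past the end of the window.
        count = (int)Math.Min(count, length - position);
        if (count <= 0) return 0;
        // Seek and read must happen as one unit: another window sharing the same
        // inner stream could otherwise move the pointer between the two calls.
        lock (inner)
        {
            inner.Position = start + position;
            int read = inner.Read(buffer, offset, count);
            position += read;
            return read;
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long basePos = origin == SeekOrigin.Begin ? 0
            : origin == SeekOrigin.Current ? position : length;
        Position = basePos + offset;
        return position;
    }

    public override void Flush() { }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
}
```

Two windows created over the same MemoryStream can be read in any interleaved order, and each one continues from its own position because every Read re-seeks the inner stream first. The lock only protects callers that go through a window; code that touches the inner stream directly bypasses it.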
The real gotcha in the StreamMuxer is properly handling exceptions. From using and decompiling the BCL, you may have come across ObjectDisposedExceptions, InvalidOperationExceptions, ... My code does not throw the same exceptions in the same situations as the BCL streams do. Perhaps I'll fix this in the future.
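For what it's worth, one way to move closer to the BCL's behavior would be to route the checks through a few guard helpers. The sketch below is only an illustration: StreamGuardSketch and its method names are invented here, and the mapping reflects what streams such as MemoryStream do (ObjectDisposedException after Close, NotSupportedException for an unsupported read or write, ArgumentOutOfRangeException for a negative position), not anything the zip contains.

```csharp
using System;

// Hypothetical guard helpers that mirror the exceptions BCL streams such as MemoryStream throw.
public static class StreamGuardSketch
{
    // A closed stream reports ObjectDisposedException.
    public static void NotClosed(bool isClosed, string streamName)
    {
        if (isClosed) throw new ObjectDisposedException(streamName);
    }

    // An unsupported read or write reports NotSupportedException.
    public static void Supports(bool capability, string operation)
    {
        if (!capability)
            throw new NotSupportedException("Stream does not support " + operation + ".");
    }

    // BCL streams reject negative positions this way; rejecting positions past
    // the end is the fixed-size sub-stream's own rule, not a BCL one.
    public static void PositionInRange(long value, long length)
    {
        if (value < 0 || value > length)
            throw new ArgumentOutOfRangeException("value");
    }
}
```

In the sub-stream, a call such as StreamGuardSketch.NotClosed(this.IsClosed, "MyStream") would then stand in for the NotNowException checks, and Supports(this.CanWrite, "writing") for the ReadOnlyException check.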
Example Usage
Below (and of course in the zip) is a dumb example of how to use the StreamMuxer. In this example, I create (using the standard techniques of a BinaryWriter and XmlSerializer) a single stream which interleaves binary and XML content. Using the StreamMuxer, I then take this original stream and open sub-streams that are passed back to the XmlSerializer and BinaryReader to read the data back in. This shows off the ability to create a substream that is positioned within, and sized smaller than, the original, which is needed in certain applications.
using System;
using System.Diagnostics;
using System.IO;
using System.Xml.Serialization;
using RevolutionaryStuff.JBT.Streams;
namespace StreamMuxerExample
{
    public class TestObj
    {
        public static readonly XmlSerializer Serializer =
            new XmlSerializer(typeof(TestObj));
        private static readonly Random R =
            new Random(Environment.TickCount);
        public int A;
        public string B;
        public byte[] RandomBinaryData;
        public TestObj(){}
        public TestObj(int a, string b)
        {
            this.A = a;
            this.B = b;
            this.RandomBinaryData = new byte[R.Next(256)];
            R.NextBytes(this.RandomBinaryData);
        }
        public override string ToString()
        {
            return string.Format("TestObj: A={0} B={1}" +
                " RandomBinaryDataLen={2}", this.A,
                this.B, this.RandomBinaryData.Length);
        }
    }
    class Program
    {
        static void Main(string[] args)
        {
            MemoryStream st = new MemoryStream();
            Console.WriteLine("Creating Stream==========");
            Debug.WriteLine("Creating Stream==========");
            BinaryWriter w = new BinaryWriter(st);
            for (int z = 0; z < 31; ++z)
            {
                st.Position = 0;
                w.Write(z+1);
                long objSizeOffset = st.Length;
                st.SetLength(st.Length + 8);
                st.Seek(0, SeekOrigin.End);
                TestObj o = new TestObj(z,
                    string.Format("Test Object #{0}", z));
                Console.WriteLine(o);
                Debug.WriteLine(o);
                TestObj.Serializer.Serialize(st, o);
                long size = st.Length - objSizeOffset - 8;
                st.Position = objSizeOffset;
                w.Write(size);
                st.Flush();
            }
            byte[] buf = st.ToArray();
            Console.WriteLine("Reading Stream==========");
            Debug.WriteLine("Reading Stream==========");
            st.Position = 0;
            using (StreamMuxer muxer = new StreamMuxer(st))
            {
                using (Stream binaryPartsStream =
                    muxer.Create(true, false))
                {
                    BinaryReader r = new BinaryReader(binaryPartsStream);
                    int objCnt = r.ReadInt32();
                    long basePos = 4;
                    for (int z = 0; z < objCnt; ++z)
                    {
                        long size = r.ReadInt64();
                        basePos += 8;
                        binaryPartsStream.Seek(size, SeekOrigin.Current);
                        if (z % 2 == 0)
                        {
                            long s = size + ((z % 8 == 2) ? 40 : 0);
                            using (Stream xmlPartStream =
                                muxer.Create(true, false, basePos, s))
                            {
                                try
                                {
                                    Debug.WriteLine(string.Format("XmlPartStream:" +
                                        " BEFORE read of obj {0}. xmlSize={1}" +
                                        " size={2} pos={3}", z, size,
                                        xmlPartStream.Length,
                                        xmlPartStream.Position));
                                    TestObj testObj = (TestObj)
                                        TestObj.Serializer.Deserialize(xmlPartStream);
                                    Console.WriteLine(testObj);
                                    Debug.WriteLine(testObj);
                                    Debug.WriteLine(string.Format("XmlPartStream:" +
                                        " AFTER read of obj {0}. xmlSize={1}" +
                                        " size={2} pos={3}", z, size,
                                        xmlPartStream.Length,
                                        xmlPartStream.Position));
                                }
                                catch (Exception ex)
                                {
                                    Debug.WriteLine("Object creation failed since" +
                                        " stream size was expanded past bounds " +
                                        "and serializer could not recognize " +
                                        "binary data after the xml data!");
                                }
                            }
                        }
                        basePos += size;
                    }
                }
            }
        }
    }
}
This is my third code submission. If you like what I've done, vote! If you don't, post messages and ask questions. I have a bunch of code to contribute, but since it takes time, I will only do so if people are getting value from it.
Happy coding :)
History
- 2/9/2006 - First submission.