
StreamMuxer

Jason.Thomas, 14 Feb 2006
A class that allows you to create multiple substreams with varying access rights from a single stream.

Introduction

Have you ever opened a file stream with FileShare.None so that external programs could not alter its contents, only to be frustrated when other parts of the same program needed to open the file in read mode? Have you ever wanted to read data from a binary file that contains a fixed-size header and treat the data section as if it were a standalone stream? Have you ever been given a COM stream and needed to use it in multiple places but couldn't? I have! In the past, I wrote kludgy code to work around these problems, but I finally got fed up with it and decided to create a reusable class to fix them.

StreamMuxer is a class that allows you to create multiple substreams with varying access rights from a single inner stream. The class handles locking, offsets, and rights so that classes that consume your streams can just work!

Using the code

The zip contains a number of files because I have a fairly large base library from which it is not trivial to extract individual components. I deleted most of the unnecessary references and code, but there are still five files overall. Sorry about that :(

BaseDisposable

This is a simple abstract class that makes it easier to use the IDisposable interface. Right now, you may be scratching your head asking what on earth is complex about the IDisposable interface. This class serves a few purposes for me:

  • Allows me to put all the dispose code into a single place, OnDispose, rather than in both Dispose and my destructor.
  • Ensures that OnDispose is called exactly once!
  • Exposes a property where I can tell if I've been disposed.
  • Makes it so I don't forget to call GC.SuppressFinalize when Dispose is called.

This class is fairly straightforward, and I tend to use it as a base when possible.
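
To make the four points above concrete, here is a minimal sketch of what such a base class could look like. It is not the BaseDisposable from the zip: only the names BaseDisposable, OnDispose and IsDisposed come from the article, while the Interlocked-based once-only guard and the DisposeCounter demo class are assumptions made for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical reconstruction; the real BaseDisposable ships in the zip.
public abstract class BaseDisposable : IDisposable
{
    private int disposedFlag; // 0 = live, 1 = disposed

    // True once Dispose (or the finalizer) has started running.
    public bool IsDisposed
    {
        get { return Volatile.Read(ref disposedFlag) != 0; }
    }

    ~BaseDisposable()
    {
        DisposeOnce(false);
    }

    public void Dispose()
    {
        DisposeOnce(true);
        // The finalizer has nothing left to do after an explicit Dispose.
        GC.SuppressFinalize(this);
    }

    private void DisposeOnce(bool disposing)
    {
        // Only the first caller flips the flag from 0 to 1 and runs OnDispose.
        if (Interlocked.Exchange(ref disposedFlag, 1) != 0) return;
        OnDispose(disposing);
    }

    // Single place for cleanup code; derived classes override this.
    protected virtual void OnDispose(bool disposing) { }
}

// Demo class (an assumption, not part of the article's library).
public sealed class DisposeCounter : BaseDisposable
{
    public int Calls;
    protected override void OnDispose(bool disposing) { Calls++; }
}

public static class Program
{
    public static void Main()
    {
        DisposeCounter c = new DisposeCounter();
        c.Dispose();
        c.Dispose(); // second call is a no-op
        Console.WriteLine("{0} {1}", c.Calls, c.IsDisposed); // prints "1 True"
    }
}
```

Setting the flag before calling OnDispose matches how StreamMuxer is written further down: its OnDispose touches the private Inner_p field directly, because the guarded Inner property would already throw ObjectDisposedException at that point.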

Validate

This is a static class where I dump my validation (especially argument) code. It contains functions like:

  • public static void StreamArg(Stream stream, string argName, bool mustBeReadable, bool mustBeWriteable, bool mustBeSeekable)
  • public static void NonNullArg(object arg, string argName)
  • public static void Relation(IComparable a, string argNameA, IComparable b, string argNameB, Equalities eq)
  • public static void ArrayArg(IList arg, int offset, int size, int minSize, string argName, bool nullable)

If you are a bean counter, you'll realize that calling:

public void Foo(object o) 
{
    Validate.NonNullArg(o, "o"); 
}

is less efficient than:

public void Foo(object o)
{
    if (o == null) throw new ArgumentNullException("o");
}

Frankly, I don't care! The reason I use this class is so I can have consistent error checking. To me, this is more important than saving a few cycles. Besides, checks like the one behind Validate.Relation are long enough that, if you had to write them inline every time, you could easily get lazy and skip the validation work.
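
As a rough illustration of the idea, the sketch below shows how two such helpers might be written. The method names NonNullArg and Between appear in the article's code, but the bodies here are guesses, and in particular the inclusive range check in Between is an assumption, since the article does not say whether the upper bound is inclusive.

```csharp
using System;

// Hypothetical stand-in for the article's Validate class; the bodies
// are assumptions, only the method names come from the article.
public static class Validate
{
    public static void NonNullArg(object arg, string argName)
    {
        if (arg == null) throw new ArgumentNullException(argName);
    }

    // Assumes an inclusive range [min, max].
    public static void Between(long val, string argName, long min, long max)
    {
        if (val < min || val > max)
        {
            throw new ArgumentOutOfRangeException(argName, val,
                string.Format("{0} must be between {1} and {2}",
                              argName, min, max));
        }
    }
}

public static class Program
{
    public static void Main()
    {
        Validate.Between(5, "offset", 0, 10); // passes silently
        try
        {
            Validate.NonNullArg(null, "o");
        }
        catch (ArgumentNullException ex)
        {
            // Every caller gets the same exception type and parameter name.
            Console.WriteLine(ex.ParamName); // prints "o"
        }
    }
}
```

The payoff is that every method in the library reports bad arguments the same way, at the cost of one extra call per check.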

StreamMuxer

Below is some of the code for the actual StreamMuxer class (the full source is included in the zip, so there is no need to repeat all of it here). The one really important thing to note is that all real stream operations must take place within a locked context! This is critical because the operations are not atomic: each sub-stream must reposition the pointer in the inner stream before performing functions like Read.

using System;
using System.Diagnostics;
using System.IO;

namespace RevolutionaryStuff.JBT.Streams
{
    /// <summary>
    /// Allows a stream to be sub-divided into
    /// independent logical streams with varying access rights
    /// </summary>
    public class StreamMuxer : BaseDisposable
    {
        /// <summary>
        /// The underlying stream
        /// </summary>
        /// <remarks>
        /// While tempting to make this public, doing so is dangerous
        /// as the outside world could easily access the members without 
        /// using the appropriate locks, which would kill us in multithreaded
        /// situations
        /// </remarks>
        protected Stream Inner
        {
            get
            {
                if (this.IsDisposed)
                    throw new ObjectDisposedException("StreamMuxer");
                return this.Inner_p;
            }
        }
        private readonly Stream Inner_p;

        /// <summary>
        /// When true, we should leave
        /// the inner stream open when the muxer
        /// is either closed or disposed.
        /// </summary>
        private readonly bool LeaveOpen;

        #region Constructors

        public StreamMuxer(Stream inner)
            : this(inner, false)
        { }

        public StreamMuxer(Stream inner, bool leaveOpen)
        {
            Validate.StreamArg(inner, "inner", false, false, true);
            this.Inner_p = inner;
            this.LeaveOpen = leaveOpen;
        }

        protected override void OnDispose(bool disposing)
        {
            base.OnDispose(disposing);
            this.Inner_p.Flush();
            if (!this.LeaveOpen)
            {
                this.Inner_p.Close();
                this.Inner_p.Dispose();
            }
        }

        #endregion

        #region Quickies...

        ...
        
        /// <summary>
        /// Write to a section of the underlying stream
        /// </summary>
        /// <param name="buffer">An array of bytes.
        /// This method copies count bytes from buffer
        /// to the current stream</param>
        /// <param name="offset">The zero-based byte
        /// offset in buffer at which to begin copying bytes
        /// to the current stream</param>
        /// <param name="count">The number of bytes
        /// to be written to the current stream.</param>
        /// <param name="offsetInFile">The place in the current
        /// stream to where we should begin to write data</param>
        public void Write(byte[] buffer, int offset, 
                          int count, long offsetInFile)
        {
            Validate.ArrayArg(buffer, offset, count);
            if (!Inner.CanWrite) throw new NotSupportedException();
            lock (Inner)
            {
                Inner.Position = offsetInFile;
                Inner.Write(buffer, offset, count);
            }
        }
        
        ...

        #endregion

        #region Sub-Stream Creation

        /// <summary>
        /// Create a new stream the full size of the underlying stream
        /// With read/write access
        /// </summary>
        /// <returns>A stream</returns>
        public Stream Create()
        {
            return Create(true, true);
        }
        
        ...

        /// <summary>
        /// Create a new stream that is a subset of the underlying
        /// stream with the specified access rights
        /// </summary>
        /// <param name="canRead">When true,
        /// the new stream has read access</param>
        /// <param name="canWrite">When true,
        /// the new stream has write access</param>
        /// <param name="offset">The offset into the original
        /// stream to use as the base for this new stream</param>
        /// <param name="size">The size of this new stream,
        /// -1 if it should be adjusted according
        /// to the size of the underlying stream</param>
        /// <returns>A Stream</returns>
        public Stream Create(bool canRead, bool canWrite, long offset, long size)
        {
            return new MyStream(this, canRead, canWrite, offset, size);
        }
        
        #endregion

        /// <summary>
        /// Our sub-stream
        /// </summary>
        private class MyStream : Stream
        {
            /// <summary>
            /// The parent muxer
            /// </summary>
            private readonly StreamMuxer Muxer;
            /// <summary>
            /// The muxer's stream
            /// </summary>
            private readonly Stream Inner;
            /// <summary>
            /// The offset into the parent "Inner"
            /// stream which serves as our base
            /// </summary>
            private readonly long Offset;
            /// <summary>
            /// The size of this stream. When -1, this 
            /// is determined by the underlying stream
            /// </summary>
            private readonly long Size;
            /// <summary>
            /// Have we been closed
            /// </summary>
            private bool IsClosed;

            #region Constructors

            /// <summary>
            /// Construct a new stream
            /// </summary>
            /// <param name="muxer">The muxer</param>
            /// <param name="canRead">When true,
            /// the caller can read from this new stream</param>
            /// <param name="canWrite">When true,
            /// the caller can write to this new stream</param>
            /// <param name="offset">The offset into
            /// the parent stream to use as a base</param>
            /// <param name="size">The size of this new
            /// stream, -1 if determined by the parent</param>
            public MyStream(StreamMuxer muxer, bool canRead, 
                            bool canWrite, long offset, long size)
            {
                Validate.NonNullArg(muxer, "muxer");

                this.Muxer = muxer;
                this.Inner = muxer.Inner;
                Validate.Between(offset, "offset", 0, Inner.Length);
                if (size != -1)
                {
                    Validate.Between(size, "size", 0, 
                                     Inner.Length - offset + 1);
                }
                this.Offset = offset;
                this.Size = size;
                this.CanRead_p = canRead;
                this.CanWrite_p = canWrite;
            }

            #endregion

            #region Stream Overrides

            ...

            public override long Seek(long offset, SeekOrigin origin)
            {
                return StreamStuff.SeekViaPos(this, offset, origin);
            }
            
            ...

            public override long Length
            {
                get
                {
                    if (this.IsClosed) throw new NotNowException();
                    if (this.Size == -1)
                    {
                        return Inner.Length - this.Offset;
                    }
                    return this.Size;
                }
            }

            public override long Position
            {
                get 
                {
                    if (this.IsClosed) throw new NotNowException();
                    return Position_p; 
                }
                set
                {
                    if (this.IsClosed) throw new NotNowException();
                    try
                    {
                        Validate.Between(value, "value", 0, this.Length);
                    }
                    catch (Exception ex)
                    {
                        //we rethrow so we can support 
                        //the accepted Stream exception conventions
                        throw new NotSupportedException("New" + 
                              " Position is past the acceptable bounds", ex);
                    }
                    Position_p = value;
                }
            }
            private long Position_p;
            
            ...

            public override int Read(byte[] buffer, int offset, int count)
            {
                if (!this.CanRead) throw new NotNowException();
                Validate.ArrayArg(buffer, offset, count);
                count = (int)Math.Min(count, Length - Position);
                lock (Inner)
                {
                    Inner.Position = this.Offset + Position;
                    int read = Inner.Read(buffer, offset, count);
                    this.Position += read;
                    return read;
                }
            }

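            public override void Write(byte[] buffer, int offset, int count)
            {
                if (!this.CanWrite) throw new ReadOnlyException();
                //translate our relative position into the inner
                //stream's coordinates, exactly as Read does
                this.Muxer.Write(buffer, offset, count, 
                                 this.Offset + this.Position);
                this.Position += count;
            }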

            #endregion
        }
    }
}
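
The lock-then-reposition pattern in Read can also be seen in isolation. The following is a simplified, read-only stand-in for the private MyStream class, not the author's code: the ReadOnlyWindow name, its constructor and its exception choices are all assumptions made to keep the sketch self-contained. It shows two windows over the same inner stream keeping independent positions.

```csharp
using System;
using System.IO;

// Simplified, read-only stand-in for the article's private MyStream class.
public sealed class ReadOnlyWindow : Stream
{
    private readonly Stream inner;  // shared stream, also the lock object
    private readonly long start;    // where the window begins in inner
    private readonly long length;   // window size in bytes
    private long position;          // position relative to the window

    public ReadOnlyWindow(Stream inner, long start, long length)
    {
        if (inner == null) throw new ArgumentNullException("inner");
        if (!inner.CanSeek) throw new ArgumentException("Must be seekable", "inner");
        if (start < 0 || length < 0 || start + length > inner.Length)
            throw new ArgumentOutOfRangeException("start");
        this.inner = inner;
        this.start = start;
        this.length = length;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return true; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { return length; } }

    public override long Position
    {
        get { return position; }
        set
        {
            if (value < 0 || value > length)
                throw new ArgumentOutOfRangeException("value");
            position = value;
        }
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // Never read past the end of the window.
        count = (int)Math.Min(count, length - position);
        lock (inner)
        {
            // Reposition and read as one unit, so that another window
            // cannot move the shared pointer in between.
            inner.Position = start + position;
            int read = inner.Read(buffer, offset, count);
            position += read;
            return read;
        }
    }

    public override long Seek(long offset, SeekOrigin origin)
    {
        long target = origin == SeekOrigin.Begin ? offset
                    : origin == SeekOrigin.Current ? position + offset
                    : length + offset;
        Position = target;
        return position;
    }

    public override void Flush() { }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count)
    {
        throw new NotSupportedException();
    }
}

public static class Program
{
    public static void Main()
    {
        // A 4-byte "header" followed by 6 bytes of "data".
        byte[] file = { 0xAA, 0xBB, 0xCC, 0xDD, 1, 2, 3, 4, 5, 6 };
        MemoryStream shared = new MemoryStream(file);
        ReadOnlyWindow a = new ReadOnlyWindow(shared, 4, 6);
        ReadOnlyWindow b = new ReadOnlyWindow(shared, 4, 6);

        byte[] bufA = new byte[2];
        byte[] bufB = new byte[3];
        a.Read(bufA, 0, 2); // a is now at window position 2
        b.Read(bufB, 0, 3); // b starts from its own position 0
        a.Read(bufA, 0, 2); // a resumes at 2, unaffected by b

        Console.WriteLine("{0} {1}", bufA[0], bufA[1]);              // prints "3 4"
        Console.WriteLine("{0} {1} {2}", bufB[0], bufB[1], bufB[2]); // prints "1 2 3"
    }
}
```

Without the lock, a second thread could change the inner stream's position between the assignment to inner.Position and the call to inner.Read, and the first window would silently read the wrong bytes.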

The real gotcha in the StreamMuxer is properly handling exceptions. From using and decompiling the BCL, you may have come across ObjectDisposedException, InvalidOperationException, and so on. My code does not throw the same exceptions in the same situations as the BCL streams do. Perhaps I'll fix this in the future.

Example Usage

Below (and of course in the zip) is a dumb example of how to use the StreamMuxer. In this example, I create (using the standard techniques of a BinaryWriter and XmlSerializer) a single stream which interleaves binary and XML content. Using the StreamMuxer, I then take this original stream and open sub-streams that are passed back to the XmlSerializer and BinaryReader to read the data back in. This shows off the ability to create a substream that starts at an offset and is smaller than the original, which certain applications need.

using System;
using System.Diagnostics;
using System.IO;
using System.Xml.Serialization;
using RevolutionaryStuff.JBT.Streams;

namespace StreamMuxerExample
{
    public class TestObj
    {
        public static readonly XmlSerializer Serializer = 
                      new XmlSerializer(typeof(TestObj));
        private static readonly Random R = 
                       new Random(Environment.TickCount);

        public int A;
        public string B;
        public byte[] RandomBinaryData;

        public TestObj(){}
        public TestObj(int a, string b)
        {
            this.A = a;
            this.B = b;
            this.RandomBinaryData = new byte[R.Next(256)];
            R.NextBytes(this.RandomBinaryData);            
        }

        public override string ToString()
        {
            return string.Format("TestObj: A={0} B={1}" + 
                   " RandomBinaryDataLen={2}", this.A, 
                   this.B, this.RandomBinaryData.Length);
        }
    }

    /// <summary>
    /// This is an example program to show
    /// some of the benefits of the Stream Muxer
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            //The format is #Objects, 
            //SizeObj0, Obj0... SizeObjN, ObjN
            //This is a combo of binary and xml data
            MemoryStream st = new MemoryStream();

            //Create a stream the old fashioned way
            Console.WriteLine("Creating Stream==========");
            Debug.WriteLine("Creating Stream==========");
            BinaryWriter w = new BinaryWriter(st);
            for (int z = 0; z < 31; ++z)
            {
                st.Position = 0;
                w.Write(z+1);
                long objSizeOffset = st.Length;
                st.SetLength(st.Length + 8);
                st.Seek(0, SeekOrigin.End);
                TestObj o = new TestObj(z, 
                            string.Format("Test Object #{0}", z));
                Console.WriteLine(o);
                Debug.WriteLine(o);
                TestObj.Serializer.Serialize(st, o);
                long size = st.Length - objSizeOffset - 8;
                st.Position = objSizeOffset;
                w.Write(size);
                st.Flush();
            }

            //So you can examine the stream 
            //contents in the memory window
            byte[] buf = st.ToArray();

            //now let's de-serialize every other object with the muxer
            Console.WriteLine("Reading Stream==========");
            Debug.WriteLine("Reading Stream==========");
            st.Position = 0;
            using (StreamMuxer muxer = new StreamMuxer(st))
            {
                using (Stream binaryPartsStream = 
                              muxer.Create(true, false))
                {
                    BinaryReader r = new BinaryReader(binaryPartsStream);
                    int objCnt = r.ReadInt32();
                    long basePos = 4;
                    for (int z = 0; z < objCnt; ++z)
                    {
                        long size = r.ReadInt64();
                        basePos += 8;
                        //jump the binary stream past the xml data
                        //note that i am seeking past 
                        //the data, not setting the position.
                        //this should show the independence of the 2 streams!
                        binaryPartsStream.Seek(size, SeekOrigin.Current);
                        //if every other...
                        if (z % 2 == 0)
                        {
                            //create a new readonly stream 
                            //positioned at the current spot
                            //But every now and then, artificially 
                            //expand the new stream to force an exception
                            long s = size + ((z % 8 == 2) ? 40 : 0);
                            using (Stream xmlPartStream = 
                                   muxer.Create(true, false, basePos, s))
                            {
                                try
                                {
                                    Debug.WriteLine(string.Format("XmlPartStream:" + 
                                          " BEFORE read of obj {0}. xmlSize={1}" + 
                                          " size={2} pos={3}", z, size, 
                                          xmlPartStream.Length, 
                                          xmlPartStream.Position));
                                    TestObj testObj = (TestObj)
                                     TestObj.Serializer.Deserialize(xmlPartStream);
                                    Console.WriteLine(testObj);
                                    Debug.WriteLine(testObj);
                                    Debug.WriteLine(string.Format("XmlPartStream:" + 
                                          " AFTER read of obj {0}. xmlSize={1}" + 
                                          " size={2} pos={3}", z, size, 
                                          xmlPartStream.Length, 
                                          xmlPartStream.Position));
                                }
                                catch (Exception ex)
                                {
                                    Debug.WriteLine("Object creation failed since" + 
                                          " stream size was expanded past bounds " + 
                                          "and serializer could not recognize " + 
                                          "binary data after the xml data!");
                                }
                            }
                        }
                        basePos += size;
                    }
                }
            }   
        }
    }
}

This is my third code submission. If you like what I've done, vote! If you don't, post messages and ask questions. I have a bunch of code to contribute, but since it takes time, will only do so if people are getting value from it.

Happy coding :)

History

  • 2/9/2006 - First submission.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.


About the Author

Jason.Thomas

United States

Last Updated 14 Feb 2006
Article Copyright 2006 by Jason.Thomas