Click here to Skip to main content
15,888,610 members
Articles / Programming Languages / C#

A replacement for MemoryStream

Rate me:
Please Sign up or sign in to vote.
4.93/5 (76 votes)
19 Mar 2012CPOL7 min read 190.9K   4.8K   143   41
Explains the cause of the OutOfMemoryExceptions which commonly occur when using MemoryStream, and introduces a replacement which uses a dynamic list of memory segments as a backing store, as opposed to a single array, making it more resilient with large datasets.

Introduction

This article explains the cause of the ambiguous OutOfMemoryException that is common when using MemoryStream with large datasets, and introduces a class, MemoryTributary, which is intended as an alternative to .NET's MemoryStream that is capable of handling large amounts of data.

Background 

When attempting to use MemoryStream with relatively large datasets (in the order of tens of MB), it is common to encounter the OutOfMemoryException. This is not due to, as the name would imply, having reached limitations of the system memory, but in fact those of the process' virtual address space.

When a process requests memory from Windows, the memory manager is not allocating address space from RAM, but 'pages' - chunks (typically 4KB) of storage - which can exist in the RAM, or on disk, or anywhere the memory manager decides to store them. The pages are mapped into the address space of the process, so, for example, when the process attempts to access the memory starting at [0xAF758000], it is in reality accessing the byte at the beginning of [Page 496], wherever Page 496 happens to be. The process therefore can allocate as much memory as it likes so long as the disk space holds out, and can map as much of it as will fit into its virtual address space at any one time - provided, that is, these allocations are made in a large number of small chunks.

This is because the process address space is fragmented: large sections are taken up by the Operating System, others for the executable image, libraries, and all the other previous allocations. Once the memory manager has allocated a set of pages equivalent to the requested size, the process must map them into its address space - but if the address space does not contain a contiguous section of the requested size, the pages cannot be mapped, and the allocation fails with the OutOfMemoryException.

Image 1

The process is not running out of space, or even addresses: it is running out of sequential addresses. To see this (if you are on 64 bit), target the following program at x86 and run it, then target it at x64 and see how much farther it gets.

C#
static void Main(string[] args)
{
    List<byte[]> allocations = new List<byte[]>();
    for (int i = 0; true; i++)
    {
        try
        {
            allocations.Add(new byte[i * i * 10]);
        }
        catch (OutOfMemoryException e)
        {
            Console.Write(string.Format("Performed {0} allocations",i));
        }
    }
}

The current implementation of MemoryStream uses a single byte array as a backing store. When a write is attempted to a position in the stream, larger than the size of this array, it is doubled in size. Depending on the behaviour of the program, the backing store of MemoryStream can soon require more contiguous memory than is available in the virtual address space.

Using the code

The solution is to not require contiguous memory to store the data contained in the stream. MemoryTributary uses a dynamic list of 4KB blocks as the backing store, which are allocated on demand as the stream is used.

MemoryTributary derives from Stream and so is used like any other Stream, such as MemoryStream.

MemoryTributary however is intended as an alternative to MemoryStream not a drop-in replacement, due to a number of caveats:

  1. MemoryTributary does not implement all of MemoryStream's constructors (because currently there are no artificial capacity limits). It is capable of initialising from a byte[].
  2. MemoryTriburary subclasses Stream, not MemoryStream, and so cannot be used in place where a member accepts MemoryStream explicitly.
  3. MemoryTributary does not implement GetBuffer() as there is no single backing buffer to return; the functional equivalent is ToArray() but use this with caution.

When using MemoryTributary, be aware of the following: 

  1. Blocks are allocated on demand when accessed (e.g., by a read or write call). Before a read takes place, the Position is checked against the Length to ensure the read operation is performed within the bounds of the stream; Length is just a sanity check however and does not correspond to the current amount of allocated memory, but rather how much has been written to the stream. Setting Length does not allocate memory, but it does allow reads to proceed on undefined data.
  2. C#
    //A new MemoryTributary object: length is 0, position is 0, no memory has been allocated
    MemoryTributary d = new MemoryTributary();
    
    //returns -1 because Length is 0, no memory allocated
    int a = d.ReadByte();
    
    //Length now reports 10000 bytes, but no memory is allocted
    d.SetLength(10000);
    
    //three blocks of memory are now allocated,
    //but b is undefined because they have not been initialised
    int b = d.ReadByte();
  3. Memory is allocated in sequential blocks, that is, if the first block to be accessed is block 3, blocks 1 and 2 are automatically allocated.
  4. MemoryTributary includes the method ToArray() but this is unsafe as it unavoidably suffers from the problem that this class' existence is trying to solve: the need to allocate a large amount of contiguous memory.
  5. Instead, use MemoryTributary's ReadFrom() and WriteTo() methods to have MemoryTributary interact with other streams when operating on large amounts of data. 

Performance Metrics

Performance both in terms of capacity and speed, of MemoryStream and MemoryTributary, is difficult to predict as it is dependant on a number of factors, one of the most significant being the fragmentation and memory usage of the current process - a process which allocates a lot of memory will use up large contiguous sections faster than one that does not - it is possible though to get an idea of the relative performance characteristics of the two by taking measurements in controlled conditions. 

The tables below compare the capacity and access times of MemoryTributary and MemoryStream.  In all cases the process instance tested only the target stream (i.e. a new process was created so a test on MemoryStream did not impact one on MemoryTributary, and no allocations were made other than for the purpose of reading or writing). 

Capacity 

To perform this test, a loop wrote the contents of a 1MB array to the target stream over and over until the stream threw an OutOfMemoryException, which was caught and the total number of writes before the exception was returned.

Stream Average Stream Length Before Exception (MB)
MemoryStream 488
MemoryTributary 1272

(This test process targeted x86.) 

Speed (Access Times)

For these measurements, a set amount of data was written to, then read from the stream. The data was written in random lengths between 1KB and 1MB, to and from a 1MB byte array. A Stopwatch instance was used to determine the amount of time it took to write, then read, the specified amount of data. These are applicable  only to sequential accesses as no seeks were made, other than for the start of the read process.

Each process executed its test six times, on the same object, so the variations between the results for a given stream for a given test, indicate the time taken allocating memory vs. that taken accessing it. 

Stream Test Execution Times (ms)
Amount written and read (MB) MemoryStream  MemoryTributary (4KB Block) MemoryTributary (64KB Block) MemoryTributary (1MB Block)
10  10 13 11 7
3 5 3 3
3 6 3 3
3 5 3 3
4 5 3 3
3 6 3 3
100  100 148 123 52
34 54 42 35
34 48 35 34
35 47 36 35
34 48 36 35
35 51 35 35
500  516 390 290 237
167 222 184 170
168 186 154 167
167 187 151 168
167 186 151 168
167 185 153 168
1,000  1185 1585 1299 485
347 547 431 344
343 463 350 345
338 462 350 345
3377 461 349 345
339 465 351 343 

The results indicate that MemoryTributary can store approximately double the data of MemoryStream in ideal conditions. The access times depend on the block setting of MemoryTributary; the initial allocations are marginally faster than MemoryStream but access times are equivalent. The smaller the block the more allocations must be made, but the less susceptible to memory fragmentation the instance becomes.

The attached source has a default block size of 64KB, which is set by the blockSize member. 

Points of Interest 

See Eric Lippert's post entitled '“Out Of Memory” Does Not Refer to Physical Memory' for a full explanation of the cause of the OutOfMemoryException

The methods of MemoryTributary access the appropriate blocks via private properties - the idea being that the way the blocks are stored could be easily swapped out if the simple List<> becomes untenable, without having to alter every member of the class. MemoryTributary has been tested with a few hundred MB, and fails with the OutOfMemoryException when the amount reaches approximately 1GB.

The MSDN page for MemoryStream is here.

Yes, tributary is the most creative synonym for stream I could come up with.

History

  • 15/03/2012 - First version. 
  • 19/03/2012
    • Updated Read() method to use a long counter internally as opposed to an int; to support streams of 10s of GB.
    • Updated article with performance measurements.
    • When a capacity is passed to MemoryTributary's constructor, MemoryTributary will now allocate the equivalent number of blocks.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
United Kingdom United Kingdom
This member has not yet provided a Biography. Assume it's interesting and varied, and probably something to do with programming.

Comments and Discussions

 
QuestionDon't use 1MB Block Pin
Brent Wang4-Oct-19 19:42
Brent Wang4-Oct-19 19:42 
QuestionError: Unable to read beyond the end of the stream. Pin
TerriTop23-May-18 10:29
TerriTop23-May-18 10:29 
AnswerRe: Error: Unable to read beyond the end of the stream. Pin
TerriTop24-May-18 5:31
TerriTop24-May-18 5:31 
QuestionSpeed (Access Times) Pin
Member 1223739716-Apr-18 0:34
Member 1223739716-Apr-18 0:34 
QuestionStartup time issue Pin
Maury Markowitz7-Jun-16 6:38
Maury Markowitz7-Jun-16 6:38 
Questionanother solution Pin
manchanx3-Feb-15 11:10
professionalmanchanx3-Feb-15 11:10 
AnswerRe: another solution Pin
InvisibleMedia26-May-17 23:19
professionalInvisibleMedia26-May-17 23:19 
QuestionHow about Gzip? Pin
Wasia30-Mar-14 6:57
Wasia30-Mar-14 6:57 
GeneralMy vote of 5 Pin
Clodoaldo29-Jan-14 0:53
Clodoaldo29-Jan-14 0:53 
QuestionOutOfMemoryException for ToArray Pin
Member 8122949-Dec-13 2:54
Member 8122949-Dec-13 2:54 
QuestionTemp file Pin
Br12-Oct-13 10:14
Br12-Oct-13 10:14 
GeneralMy vote of 5 Pin
Renju Vinod13-May-13 0:27
professionalRenju Vinod13-May-13 0:27 
GeneralMy vote of 5 Pin
Oleksandr Kulchytskyi20-Apr-13 0:53
professionalOleksandr Kulchytskyi20-Apr-13 0:53 
QuestionInteresting but... Pin
jfriedman11-Apr-13 10:59
jfriedman11-Apr-13 10:59 
The time spent allocating new memory is mostly what causes your paging problem so it's kind of like stepping on your self..

I think the better answer to something like this from an engineering standpoint is just to have a facade around the stream and when you have data to write to you just put the pointer in the place you need it.

Given bytes {01235} you will then insert (4, 4) giving you



Then when irritating the stream as a whole (through the facade) you will see

{0123
(4)
5}

Each position is maintained by itself and thus there is no allocations just pointer reading which is why a linked list is so fast!

See these classes and my article here for more info

@

Complete Managed Media Aggregation - Part I: Designing a Class vs Designing a Framework[^]

Complete Managed Media Aggregation Part 2: Making things easier[^]

And these classes

C#
#region Stream Classes

    #region EnumerableByteStream

    public class EnumerableByteStream : IEnumerable<byte>, IEnumerator<byte>, IList<byte>
    {
        protected System.IO.Stream m_Stream;

        internal protected readonly long m_VirtualIndex, m_VirtualCount;

        internal protected int m_Current;

        internal protected bool m_Disposed;

        public bool Read { get { return m_Current != -1; } }

        public int CurrentInt { get { return m_Current; } }

        public byte Current { get { return (byte)m_Current; } }

        public bool CanSeek { get { return m_Stream.CanSeek; } }

        public EnumerableByteStream(byte Byte) : this(Byte.Yield()) { }

        public EnumerableByteStream(IEnumerable<byte> bytes, int? index = null, int? count = null)
            : this(new System.IO.MemoryStream(bytes.ToArray(), index ?? 0, count ?? bytes.Count()), index, count) { }

        public EnumerableByteStream(System.IO.Stream stream, int? index = null, int? count = null)
        {
            if (stream == null) throw new ArgumentNullException();
            else if (!stream.CanRead && count > 0) throw new InvalidOperationException("The given stream must be able to read to offset from the beginning");

            m_Stream = stream;

            if (index.HasValue)
            {
                if (index > m_Stream.Length) throw new InvalidOperationException("Index must be less than stream.Length");
                else m_VirtualIndex = (long)index;

                if (m_VirtualIndex != 0)
                {
                    if ((index != stream.Position && !stream.CanSeek)) throw new InvalidOperationException("Can only avdance a stream which can seek and a 0 based index was given which was not equal to the Position of the stream");
                    else m_VirtualIndex = stream.Length - stream.Seek((long)index, System.IO.SeekOrigin.Begin);
                }
            }
            else m_VirtualIndex = stream.Position;

            if (count.HasValue)
            {
                if (count + m_VirtualIndex < 0 && count + m_VirtualIndex > m_Stream.Length) throw new InvalidOperationException("Index must be greater than 0 and less than stream.Length");
                else m_VirtualCount = (long)count;
            }
            else m_VirtualCount = stream.Length;
        }

        public IEnumerable<byte> ToArray(int offset, int count, byte[] buffer)
        {
            if (offset < 0) throw new ArgumentOutOfRangeException("offset must refer to a location within the buffer.");
            else if (count + offset > Length) throw new ArgumentOutOfRangeException("count must refer to a location within the buffer with respect to offset.");

            if (count == 0) return Enumerable.Empty<byte>();
            buffer = buffer ?? new byte[count];
            int len = count;
            while ((len -= m_Stream.Read(buffer, offset, count)) > 0
                &&
                m_VirtualCount != -1 ? m_VirtualIndex < m_VirtualCount : true)
            {
                //
            }
            return buffer;
        }

        public IEnumerator<byte> GetEnumerator()
        {
            if (m_Stream.CanSeek) Reset();
            while (!m_Disposed && MoveNext()) yield return (byte)m_Current;
        }

        public bool MoveNext() { return CoreGetEnumerator() != -1; }

        int CoreGetEnumerator(long direction = 1)
        {
            if (!m_Disposed && m_Stream.CanRead && m_Stream.Position < m_Stream.Length)
            {
                //Stop the enumeration if we are past the virtual index by indicating -1
                if (m_Stream.Position >= Count) m_Current = -1;
                else m_Current = m_Stream.ReadByte();
            }
            else m_Current = -1;
            return m_Current;
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return GetEnumerator(); }

        internal protected long CoreIndexOf(IEnumerable<byte> items, int start = -1, int count = -1)
        {
            if (m_Stream == null || m_Disposed || items == null || items == Enumerable.Empty<byte>() || count == 0) return -1;
            if (count == -1) count = items.Count();

            if (!Read && !MoveNext()) return -1;

            if (start != -1 && start + count > Count) return -1;
            else if (start != -1 && start != Position) if (m_Stream.CanSeek) m_Stream.Seek(start, System.IO.SeekOrigin.Begin);
                else return -1;//throw new InvalidOperationException("The underlying stream must be able to seek if the start index is specified and not equal to -1");

            using (IEnumerator<byte> itemPointer = items.Skip(start).Take(count).GetEnumerator())
            {
                if (!itemPointer.MoveNext()) return -1;
                //We start at 
                long position = Position;
                if (start == -1 && m_Stream.CanSeek && m_Stream.Position != 0 && itemPointer.Current != Current)
                {
                    m_Stream.Seek((long)m_VirtualIndex, System.IO.SeekOrigin.Begin);
                    Reset();
                }
                else start = (int)m_Stream.Position;
                //While there is an itemPointer
                while (itemPointer != null)
                {
                    int j = count;
                    while (itemPointer.Current == Current && (--j > 0))
                    {
                        if (!itemPointer.MoveNext()) break;
                    }
                    //The match is complete
                    if (j == 0)
                    {
                        //If CanSeek and moved the position and we will go back to where we were
                        //if (m_Stream.CanSeek && position != Position) m_Stream.Seek(position, System.IO.SeekOrigin.Begin); //Curent and Begin need to be aware...
                        return m_Stream.Position - 1; //-1 Because a byte was read to obtain Current
                    }
                    if (!MoveNext()) break;
                }
                if (start == -1 && m_Stream.CanSeek && position != Position) m_Stream.Seek(position, System.IO.SeekOrigin.Begin);
                return -1;
            }
        }

        internal protected int CoreIndexOf(byte item, int start, int count) { return (int)CoreIndexOf(item.Yield(), start, count); }

        public virtual int IndexOf(byte item)
        {
            return CoreIndexOf(item, -1, 1);
        }

        public virtual void Insert(int index, byte item)
        {

            //System.IO.MemoryStream newMemory = new System.IO.MemoryStream(Count + 1);
            //System.IO.Stream oldMemory;
            //using (EnumerableByteStream preSegment = new EnumerableByteStream(m_Stream, 0, index - 1))
            //{
            //    using (EnumerableByteStream postSegment = new EnumerableByteStream(m_Stream, index - 1, Count - index + 1))
            //    {
            //        foreach (byte b in preSegment) newMemory.WriteByte(b);
            //        newMemory.WriteByte(item);
            //        foreach (byte b in postSegment) newMemory.WriteByte(b);
            //    }
            //}
            //oldMemory = m_Stream;
            //m_Stream = newMemory;
            //oldMemory.Dispose();

            //Linked stream around origional bytes up to index
            //additional byte
            //Rest of old stream
            //long preInsert = m_Stream.Position;
            m_Stream = LinkedStream.LinkAll(new EnumerableByteStream(m_Stream, 0, index - 1), new EnumerableByteStream(item), new EnumerableByteStream(m_Stream, index - 1, Count - index + 1));
            //m_Stream.Position = preInsert;
        }

        public virtual void RemoveAt(int index)
        {
            //Linked stream around index
            m_Stream = LinkedStream.LinkAll(new EnumerableByteStream(m_Stream, 0, index), new EnumerableByteStream(m_Stream, ++index, Count - index));
        }

        /// <summary>
        /// Sets or Retrieves a byte from the underlying stream
        /// </summary>
        /// <param name="index"></param>
        /// <returns></returns>
        public virtual byte this[int index]
        {
            get
            {
                if (index < 1) index = 0;
                if (index != m_Stream.Position)
                {
                    if (m_Stream.CanSeek) m_Stream.Seek(index, System.IO.SeekOrigin.Begin);
                    else throw new InvalidOperationException("You can only move the index if the underlying stream CanSeek");
                }
                return (byte)m_Stream.ReadByte();
            }
            set
            {
                if (m_Stream.CanWrite && m_Stream.CanSeek) m_Stream.Seek(index, System.IO.SeekOrigin.Begin);
                else throw new InvalidOperationException("You can logically set a byte in the stream the index if the underlying stream supports CanWrite and CanSeek");
                m_Stream.Write(value.Yield().ToArray(), 0, 1);
            }
        }

        public virtual void Add(byte item)
        {
            if (m_Stream.CanWrite) m_Stream.Write(item.Yield().ToArray(), 0, 1);
            else throw new InvalidOperationException("You can logically set a byte in the stream the index if the underlying stream supports CanWrite");
        }

        /// <summary>
        /// Erases all bytes in perspective of the EnumerableByteStream
        /// </summary>
        /// <remarks>
        /// Creates a new <see cref="System.IO.MemoryStream"/> in the place of the m_Stream.
        /// </remarks>
        public virtual void Clear()
        {
            m_Stream = new System.IO.MemoryStream();
            return;
        }

        public virtual bool Contains(byte item)
        {
            //See CachingEnumerableByteStream on why not < -1
            return CoreIndexOf(item, 0, 1) != -1;
        }

        /// <summary>
        /// Advanced the underlying System.IO.Stream by reading into the given array
        /// </summary>
        /// <param name="array">The array to read into</param>
        /// <param name="arrayIndex">The index into the given array to stary copying at</param>
        public virtual void CopyTo(byte[] array, int arrayIndex)
        {
            CoreCopyTo(array, arrayIndex);
        }

        public virtual void CoreCopyTo(byte[] array, int arrayIndex, int length = -1)
        {
            if (length <= 0) return;
            if (length == -1) length = array.Length - arrayIndex;
            else if (length > m_Stream.Length) throw new ArgumentOutOfRangeException("Can't copy more bytes then are availble from the stream");
            m_Stream.Read(array, arrayIndex, length);
        }

        public long Position { get { return m_VirtualIndex; } }

        public virtual long Length
        {
            get { return m_VirtualCount != -1 ? m_VirtualCount : m_Stream.Length; }
        }

        public int Count { get { return (int)Length; } }

        public virtual bool IsReadOnly
        {
            get { return m_Stream.CanWrite; }
        }

        public virtual bool Remove(byte item)
        {
            //Create N new EnumerableByteStreams with the items index noted in each iteration.
            //For each occurance of item in the underlying stream place an index
            //For each index create a new EnumerableByteStream with the index = i and the count = 1
            //m_Stream = new LinkedStream(parts)
            //return true if any of this happened 
            return false;
        }

        public static implicit operator System.IO.Stream(EnumerableByteStream eByteStream) { return eByteStream.m_Stream; }

        public void Dispose()
        {
            m_Disposed = true;
            m_Stream.Dispose();
        }

        object System.Collections.IEnumerator.Current
        {
            get { return GetEnumerator().Current; }
        }

        void System.Collections.IEnumerator.Reset()
        {
            m_Current = -1;
            if (m_Stream.CanSeek) m_Stream.Seek(m_VirtualIndex, System.IO.SeekOrigin.Begin);
        }

        public void Reset()
        {
            (this as System.Collections.IEnumerator).Reset();
        }

        public long Seek(long offset, System.IO.SeekOrigin origin) { if (m_Stream.CanSeek) return m_Stream.Seek(offset, origin); return m_Stream.Position; }
    }

    #endregion

    #region CachingEnumerableByteStream

    //Todo Test and Complete, use LinkedStream if required
    //Aims to be a type of constrained stream as well give the ability to cache previous read bytes which may no longer be able to be read from the stream
    public class CachingEnumerableByteStream : EnumerableByteStream
    {
        //Same as above but with a Cache for previously read bytes
        List<byte> m_ReadCache = new List<byte>(), m_WriteCache = new List<byte>();

        public CachingEnumerableByteStream(System.IO.Stream stream)
            : base(stream)
        {
        }

        internal void EnsureCache(int index)
        {
            try
            {
                //Read Ahead
                if (index > m_ReadCache.Count)
                {
                    byte[] buffer = new byte[index - m_Stream.Position];
                    CopyTo(buffer, 0);
                    m_ReadCache.AddRange(buffer);
                }
            }
            catch { throw; }
        }

        public override byte this[int index]
        {
            get
            {
                EnsureCache(index);
                return m_ReadCache[index];
            }
            set
            {
                //Read Ahead
                EnsureCache(index);
                base[index] = m_ReadCache[index] = value;
            }
        }
    }

    #endregion

    #region LinkedStream

    /// <summary>
    /// Represtents multiple streams as a single stream.
    /// </summary>
    /// <remarks>
    /// Turning into a interesting little looping buffer when you just call AsPerpetual, would be a cool idea for a RingBuffer
    /// </remarks>
    public class LinkedStream :
        System.IO.Stream, //LinkedStream is a System.IO.Stream
        IEnumerable<EnumerableByteStream>, //Which happens to be IEnumerable of more Stream's
        IEnumerator<EnumerableByteStream>, //It can maintain the state of those Stream's which it is IEnumerable
        IEnumerable<byte>, //It can be thought of as a single contagious allocation of memory to callers
        IEnumerator<byte>//It can maintain a state of those bytes in which it enumerates
    {
        #region Fields

        internal protected IEnumerable<EnumerableByteStream> m_Streams;

        internal protected ulong m_AbsolutePosition;

        internal protected IEnumerator<EnumerableByteStream> m_Enumerator;

        internal EnumerableByteStream m_CurrentStream;

        internal int m_StreamIndex = 0;

        internal protected bool m_Disposed;

        #endregion

        #region Propeties

        public System.IO.Stream CurrentStream
        {
            get
            {
                if (m_Disposed) return null;
                if (m_CurrentStream == null) m_CurrentStream = m_Enumerator.Current;
                return m_CurrentStream;
            }
        }

        public override bool CanRead
        {
            get { return !m_Disposed && CurrentStream.CanRead; }
        }

        public override bool CanSeek
        {
            get { return !m_Disposed && CurrentStream.CanSeek; }
        }

        public override bool CanWrite
        {
            get { return !m_Disposed && CurrentStream.CanWrite; }
        }

        public override void Flush()
        {
            if (!m_Disposed) CurrentStream.Flush();
        }

        public override long Length
        {
            get { return (long)TotalLength; }
        }

        public ulong TotalLength
        {
            get
            {
                if (m_Disposed) return 0;
                ulong totalLength = 0;
                foreach (EnumerableByteStream stream in this as IEnumerable<EnumerableByteStream>) totalLength += (ulong)(stream.Count);
                return totalLength;
            }
        }

        public override long Position
        {
            get
            {
                return (long)TotalPosition;
            }
            set
            {
                if (!m_Disposed) SeekAbsolute(value, System.IO.SeekOrigin.Current);
            }
        }

        public ulong TotalPosition
        {
            ///The total position is the m_AbsolutePosition and the currentPosition which is free roaming
            get { return m_Disposed ? 0 : m_AbsolutePosition + (ulong)m_CurrentStream.Position; }
            protected internal set { if (m_Disposed) return; m_AbsolutePosition = value; }
        }

        #endregion

        #region Constructors

        public LinkedStream(IEnumerable<EnumerableByteStream> streams)
        {
            if (streams == null) streams = Enumerable.Empty<EnumerableByteStream>();
            m_Streams = streams;
            m_Enumerator = GetEnumerator();
            m_Enumerator.MoveNext();
        }

        public LinkedStream(EnumerableByteStream stream) : this(stream.Yield()) { }


        public static LinkedStream LinkAll(params EnumerableByteStream[] streams) { return new LinkedStream(streams); }

        #endregion

        #region Methods

        /// <summary>
        /// Creates a new MemoryStream with the contents of all sub streams
        /// </summary>
        /// <returns>The resulting MemoryStream</returns>
        public System.IO.MemoryStream Flatten()
        {
            System.IO.MemoryStream memory = new System.IO.MemoryStream((int)TotalLength);
            foreach (System.IO.Stream stream in m_Streams) stream.CopyTo(memory);
            return memory;
        }

        public LinkedStream Link(params EnumerableByteStream[] streams) { return new LinkedStream(m_Streams.Concat(streams)); }

        public LinkedStream Link(EnumerableByteStream stream) { return Link(stream.Yield().ToArray()); }

        public LinkedStream Unlink(int streamIndex, bool reverse = false)
        {
            if (streamIndex > m_Streams.Count()) throw new ArgumentOutOfRangeException("index cannot be greater than the amount of contained streams");
            IEnumerable<EnumerableByteStream> streams = m_Streams;
            if (reverse) streams.Reverse();
            return new LinkedStream(m_Streams.Skip(streamIndex));
        }

        public LinkedStream SubStream(ulong absoluteIndex, ulong count)
        {

            LinkedStream result = new LinkedStream(new EnumerableByteStream(new System.IO.MemoryStream()));

            if (count == 0) return result;

            while (absoluteIndex > 0)
            {
                //absoluteIndex -= Read(// StreamOverload
            }

            return result;
        }

        internal void SelectStream(int logicalIndex)
        {
            if (m_Disposed) return;
            //If the stream is already selected return
            if (m_StreamIndex == logicalIndex) return;
            else if (logicalIndex < m_StreamIndex) m_Enumerator.Reset(); //If the logicalIndex is bofore the stream index then reset
            while (logicalIndex > m_StreamIndex) MoveNext();//While the logicalIndex is > the stream index MoveNext (casues m_StreamIndex to be increased).
        }

        internal void SelectStream(ulong absolutePosition)
        {
            if (m_Disposed) return;
            if (TotalPosition == absolutePosition) return;
            else if (absolutePosition < TotalPosition) m_Enumerator.Reset(); //If seeking to a position before the TotalPosition reset
            //While the total postion is not in range
            while (TotalPosition < absolutePosition)
            {
                //Move to the next stream (causing TotalPosition to advance by the current streams length), if we cant then return
                if (!MoveNext()) return;
                //subtract the length of the stream we skipped from the absolutePosition
                if (CurrentStream.Length > 0) absolutePosition -= (ulong)CurrentStream.Length;
            }
        }

        #endregion

        #region Wrappers

        public override int ReadByte()
        {
            return CurrentStream.ReadByte();
        }

        public override int Read(byte[] buffer, int offset, int count)
        {
            return CoreRead(buffer, offset, count);
        }

        internal protected int CoreRead(byte[] buffer, long offset, long count, System.IO.SeekOrigin readOrigin = System.IO.SeekOrigin.Current)
        {
            switch (readOrigin)
            {
                case System.IO.SeekOrigin.Begin:
                    {
                        Seek(offset, System.IO.SeekOrigin.Begin);
                        goto case System.IO.SeekOrigin.Current;
                    }
                case System.IO.SeekOrigin.End:
                    {
                        //Seek to the end
                        MoveToEnd();

                        //Use the current case
                        goto case System.IO.SeekOrigin.Current;
                    }
                case System.IO.SeekOrigin.Current:
                    {
                        int read = 0;
                        while (read < count)
                        {
                            if (CurrentStream.Position < offset && CurrentStream.CanSeek) CurrentStream.Seek(offset, System.IO.SeekOrigin.Begin);
                            read += CurrentStream.Read(buffer, (int)offset, (int)count - (int)read);
                            if (CurrentStream.Position >= CurrentStream.Length && read < count) if (!MoveNext()) break;
                            //if (read != -1) m_AbsolutePosition += (uint)read;
                        }
                        return read;
                    }
                default:
                    {
                        Utility.BreakIfAttached();
                        break;
                    }
            }
            return 0;
        }

        bool MoveNext()
        {
            if (m_Disposed || m_Streams == null || m_Enumerator == null) return false;
            //If the current stream can seek it's not a big deal to reset it now also
            //if (m_CurrentStream.CanSeek) m_CurrentStream.Seek(0, System.IO.SeekOrigin.Begin);
            m_AbsolutePosition += (ulong)(m_CurrentStream.Count - m_CurrentStream.Position); //Cumulate the position of the non seeking stream - what was already read
            //Advance the stream index
            ++m_StreamIndex;
            //Return the result of moving the enumerator to the next available streeam
            return m_Enumerator.MoveNext();
        }

        public override long Seek(long offset, System.IO.SeekOrigin origin)
        {
            switch (origin)
            {
                case System.IO.SeekOrigin.End:
                case System.IO.SeekOrigin.Begin:
                    return (long)SeekAbsolute(offset, System.IO.SeekOrigin.Begin);
                case System.IO.SeekOrigin.Current:
                    return m_CurrentStream.Seek(offset, origin);
                default: return m_CurrentStream.Position;
            }
        }

        public void MoveToEnd()
        {
            //While thre is a stream to advance
            while (MoveNext())
            {
                //Move the current stream to it's last position
                CurrentStream.Position = CurrentStream.Length;
            }
        }

        public ulong SeekAbsolute(long offset, System.IO.SeekOrigin origin)
        {
            switch (origin)
            {
                case System.IO.SeekOrigin.Current:
                    {

                        //Determine how many bytes to read in terms of long
                        long offsetPosition = (long)TotalPosition - offset;
                        if (offsetPosition < 0) CurrentStream.Seek(offsetPosition, System.IO.SeekOrigin.Current);
                        else CurrentStream.Position = offsetPosition;
                        return TotalPosition;
                    }
                case System.IO.SeekOrigin.Begin:
                    {
                        SelectStream((ulong)offset);
                        goto case System.IO.SeekOrigin.Current;
                    }
                case System.IO.SeekOrigin.End:
                    {
                        MoveToEnd();
                        goto case System.IO.SeekOrigin.Current;
                    }
                default:
                    {
                        Utility.BreakIfAttached();
                        break;
                    }
            }
            return m_AbsolutePosition;
        }



        IEnumerator<EnumerableByteStream> GetEnumerator() { return m_Disposed ? null : m_Streams.GetEnumerator(); }

        public override void SetLength(long value)
        {
            //If the position is < value set the position to the value first which will update CurrentStream if required.
            if (value < Length) Position = (long)m_AbsolutePosition + value;
            //Set the length of the current stream 
            CurrentStream.SetLength(value);
            m_AbsolutePosition = (ulong)value;
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            if (count < CoreWrite(buffer, (ulong)offset, (ulong)count))
            {
#if DEBUG
                Utility.BreakIfAttached();
#endif
            }
        }

        ///Todo => Test and complete
        internal protected int CoreWrite(byte[] buffer, ulong offset, ulong count, System.IO.SeekOrigin readOrigin = System.IO.SeekOrigin.Current)
        {
            //Select the stream to write based on the offset
            SelectStream((ulong)offset);
            switch (readOrigin)
            {
                case System.IO.SeekOrigin.Current:
                    {
                        return (this as IEnumerator<System.IO.Stream>).Current.Read(buffer, (int)offset, (int)count);
                    }
                case System.IO.SeekOrigin.Begin:
                    {
                        return (this as IEnumerator<System.IO.Stream>).Current.Read(buffer, (int)offset, (int)count);
                    }
                case System.IO.SeekOrigin.End:
                    {
                        return (this as IEnumerator<System.IO.Stream>).Current.Read(buffer, (int)offset, (int)count);
                    }
                default:
                    {
#if DEBUG
                        Utility.BreakIfAttached();
#endif
                        break;
                    }
            }
            return 0;
        }

        IEnumerator<EnumerableByteStream> IEnumerable<EnumerableByteStream>.GetEnumerator()
        {
            return m_Streams.GetEnumerator();
        }

        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            return m_Streams.GetEnumerator();
        }

        EnumerableByteStream IEnumerator<EnumerableByteStream>.Current
        {
            get { return GetEnumerator().Current; }
        }

        void IDisposable.Dispose()
        {
            m_Disposed = true;
            if (m_Streams != null) m_Streams = null;
            m_StreamIndex = -1;
            base.Dispose();
        }

        void System.Collections.IEnumerator.Reset()
        {
            foreach (EnumerableByteStream stream in m_Streams) if (stream.CanSeek) stream.Seek(0, System.IO.SeekOrigin.Begin);
            GetEnumerator().Reset();
        }

        object System.Collections.IEnumerator.Current
        {
            get { return CurrentStream; }
        }

        bool System.Collections.IEnumerator.MoveNext()
        {
            return MoveNext();
        }

        IEnumerator<byte> IEnumerable<byte>.GetEnumerator() { return new EnumerableByteStream(this as IEnumerable<byte>).GetEnumerator(); }

        byte IEnumerator<byte>.Current { get { return (this as IEnumerable<byte>).GetEnumerator().Current; } }

        #endregion
    }

    #endregion

    #endregion

GeneralMy vote of 5 Pin
aklahr26-Feb-13 23:10
aklahr26-Feb-13 23:10 
QuestionBlockSize as constructor argument Pin
Idsa7-Feb-13 0:35
Idsa7-Feb-13 0:35 
GeneralMy vote of 5 Pin
Şafak Gür10-Jan-13 4:41
Şafak Gür10-Jan-13 4:41 
QuestionVery useful! Pin
tekHedd13-Nov-12 12:47
tekHedd13-Nov-12 12:47 
QuestionMy vote of 5 Pin
Marius Bancila17-Apr-12 21:29
professionalMarius Bancila17-Apr-12 21:29 
QuestionMy Vote of 6 too Pin
Shahin Khorshidnia14-Apr-12 3:26
professionalShahin Khorshidnia14-Apr-12 3:26 
Questionnice Pin
BillW335-Apr-12 10:51
professionalBillW335-Apr-12 10:51 
QuestionMy vote of 6 Pin
abdurahman ibn hattab2-Apr-12 12:17
abdurahman ibn hattab2-Apr-12 12:17 
AnswerRe: My vote of 6 Pin
Shahin Khorshidnia14-Apr-12 3:21
professionalShahin Khorshidnia14-Apr-12 3:21 
QuestionCompiling Problem Pin
dp.sistemas27-Mar-12 10:26
dp.sistemas27-Mar-12 10:26 
AnswerRe: Compiling Problem Pin
bitterskittles27-Mar-12 22:59
bitterskittles27-Mar-12 22:59 

General General    News News    Suggestion Suggestion    Question Question    Bug Bug    Answer Answer    Joke Joke    Praise Praise    Rant Rant    Admin Admin   

Use Ctrl+Left/Right to switch messages, Ctrl+Up/Down to switch threads, Ctrl+Shift+Left/Right to switch pages.