Click here to Skip to main content
Click here to Skip to main content
Add your own
alternative version

Memory Stream Multiplexer–write and read from many threads simultaneously

By , 22 Jul 2012
MemoryStreamMultiplexer.zip
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;

namespace MultithreadedStream
{
    /// <summary>
    /// Multithreaded buffer where one thread can write and many threads can read simultaneously. 
    /// </summary>
    public class MemoryStreamMultiplexer : IDisposable
    {
        private ManualResetEvent[] _dataReadyEvents = new ManualResetEvent[255];
        private ManualResetEvent[] _finishedEvents = new ManualResetEvent[255];
        private int _readerCount = 0;

        private bool _finished;
        private int _Length;
        private List<byte[]> _Buffer = new List<byte[]>();

        public int Length { get { return _Length; } }

        public MemoryStreamMultiplexer()
        {
            
        }

        public void Write(byte[] data, int pos, int length)
        {
            byte[] newBuf = new byte[length];
            Buffer.BlockCopy(data, pos, newBuf, 0, length);
            lock (_Buffer)
            {
                _Buffer.Add(newBuf);
                _Length += length;
            }
            Set();
        }

        private void Set()
        {
            for (int i = 0; i < _readerCount; i++)
                _dataReadyEvents[i].Set();
        }

        public void Finish()
        {
            for (int i = 0; i < _readerCount; i++)
                _finishedEvents[i].Set();
            _finished = true;
        }

        public MemoryStreamReader GetReader()
        {
            ManualResetEvent dataReady = new ManualResetEvent(_finished);
            ManualResetEvent finished = new ManualResetEvent(_finished);
            lock (_dataReadyEvents)
            {
                _dataReadyEvents[_readerCount] = dataReady;
                _finishedEvents[_readerCount] = finished;
                _readerCount++;
            }            
            return new MemoryStreamReader(_Buffer, dataReady, finished);
        }

        private bool disposed = false;
        public void Dispose()
        {
            if (!disposed)
            {
                Finish();

                for (int i = 0; i < _readerCount; i++)
                {
                    _dataReadyEvents[i].Dispose();
                    _finishedEvents[i].Dispose();
                }
                _readerCount = 0;
                
                disposed = true;
            }
        }
    }

    public class MemoryStreamReader : Stream, IDisposable
    {
        private int _position;
        private int _bufferIndex;
        private int _bufferPos;
        private List<byte[]> _bufferList;

        private ManualResetEvent[] _waitHandles;
        private ManualResetEvent _dataReady;
        private ManualResetEvent _finished;

        public MemoryStreamReader(List<byte[]> bufferList, ManualResetEvent dataReady, ManualResetEvent finished)
        {
            _waitHandles = new ManualResetEvent[] { dataReady, finished };
            _bufferList = bufferList;
            _dataReady = dataReady;
            _finished = finished;
            _bufferPos = 0;
            _bufferIndex = 0;
            _position = 0;
        }

        public override bool CanRead
        {
            get { return true; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        public override void Flush()
        {
            throw new NotImplementedException();    
        }

        public override long Length
        {
            get { throw new NotImplementedException(); }
        }

        public override long Position
        {
            get
            {
                return _position;
            }
            set
            {
                throw new NotImplementedException();
            }
        }

        public override int Read(byte[] buffer, int offset, int count)
        {           
            if (_bufferIndex < _bufferList.Count)
            {
                return ReadInternal(buffer, offset, count);
            }
            else
            {
                _dataReady.Reset();
                // Wait for either data ready event of the finished event.
                int index = WaitHandle.WaitAny(_waitHandles, TimeSpan.FromSeconds(30), false);
                // either of the event fired. see if there's more data to read.
                if (_bufferIndex < _bufferList.Count)
                    return ReadInternal(buffer, offset, count);
                else
                    return 0;   // No more bytes will be available. Finished.
            }
        }

        private int ReadInternal(byte[] buffer, int offset, int count)
        {
            byte[] currentBuffer = _bufferList[_bufferIndex];

            if (_bufferPos + count <= currentBuffer.Length)
            {
                // the current buffer holds the same or more bytes than what is asked for
                // So, give what was asked.
                Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset, count);

                _bufferPos += count;
                _position += count;
                return count;
            }
            else
            {
                // current buffer does not have the necessary bytes. deliver whatever is available.
                if (_bufferPos < currentBuffer.Length)
                {
                    int remainingBytes = currentBuffer.Length - _bufferPos;
                    Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset, remainingBytes);

                    _position += remainingBytes;
                    _bufferIndex++;
                    _bufferPos = 0;

                    // Try to read from the next buffer in the list and deliver
                    // the undelivered bytes. The Read call might block and wait for 
                    // remaining bytes to appear. 
                    return remainingBytes + 
                        this.Read(buffer, offset + remainingBytes, count - remainingBytes);
                }
                else
                {
                    // Already all bytes from currnet buffer has been delivered. Try next buffer.
                    _bufferIndex++;
                    _bufferPos = 0;

                    // There may not be next buffer and thus we will have to wait.
                    return this.Read(buffer, offset, count);                    
                }
            }
        }

        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotImplementedException();
        }

        public override void SetLength(long value)
        {
            throw new NotImplementedException();
        }

        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotImplementedException();
        }

        public new void Dispose()
        {
            _dataReady = null;
            _finished = null;
            _bufferList = null;
            _waitHandles = null;
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of use and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)

About the Author


| Advertise | Privacy | Mobile
Web02 | 2.8.140421.2 | Last Updated 22 Jul 2012
Article Copyright 2012 by Omar Al Zabir
Everything else Copyright © CodeProject, 1999-2014
Terms of Use
Layout: fixed | fluid