Click here to Skip to main content
15,892,768 members
Articles / Programming Languages / C#

Memory Stream Multiplexer–write and read from many threads simultaneously

Rate me:
Please Sign up or sign in to vote.
4.76/5 (14 votes)
22 Jul 2012CPOL3 min read 97.9K   1K   52  
MemoryStreamMultiplexer is a MemoryStream-like buffer manager where one thread can write and many threads can read from it simultaneously. It supports blocking reads, so that reader threads can call .Read() and wait for some data to be written. Handy for loading data in one thread that is consumed concurrently by other threads.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.IO;

namespace MultithreadedStream
{
    /// <summary>
    /// Multithreaded buffer where one thread can write and many threads can read simultaneously.
    /// </summary>
    /// <remarks>
    /// Every call to <see cref="Write"/> appends a private copy of the supplied bytes as one
    /// chunk. Each reader obtained from <see cref="GetReader"/> walks the chunk list on its own,
    /// so every reader sees all data from the beginning. Readers are woken through a per-reader
    /// "data ready" event on every write and a per-reader "finished" event once
    /// <see cref="Finish"/> has been called.
    /// <see cref="Dispose"/> releases the events handed to readers, so call it only after the
    /// readers have finished consuming the stream.
    /// </remarks>
    public class MemoryStreamMultiplexer : IDisposable
    {
        // Guards the reader event lists, the finished flag and the disposed flag so that
        // registering a reader can never interleave with Finish/Dispose/Set.
        private readonly object _readersLock = new object();

        // Grown on demand: there is no fixed upper bound on the number of readers.
        private readonly List<ManualResetEvent> _dataReadyEvents = new List<ManualResetEvent>();
        private readonly List<ManualResetEvent> _finishedEvents = new List<ManualResetEvent>();

        private bool _finished;
        private int _Length;

        // Shared with every reader. All writes happen under lock (_Buffer).
        private readonly List<byte[]> _Buffer = new List<byte[]>();

        private bool disposed = false;

        /// <summary>
        /// Total number of bytes written so far.
        /// </summary>
        public int Length
        {
            get
            {
                lock (_Buffer)
                {
                    return _Length;
                }
            }
        }

        public MemoryStreamMultiplexer()
        {
        }

        /// <summary>
        /// Appends a copy of <paramref name="length"/> bytes of <paramref name="data"/>, starting
        /// at <paramref name="pos"/>, and wakes all registered readers.
        /// </summary>
        /// <param name="data">Source array; it is copied, so the caller may reuse it afterwards.</param>
        /// <param name="pos">Zero-based offset in <paramref name="data"/> of the first byte to copy.</param>
        /// <param name="length">Number of bytes to copy.</param>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="pos"/> or <paramref name="length"/> is negative.</exception>
        /// <exception cref="ArgumentException">The requested range does not fit inside <paramref name="data"/>.</exception>
        public void Write(byte[] data, int pos, int length)
        {
            if (data == null)
                throw new ArgumentNullException("data");
            if (pos < 0)
                throw new ArgumentOutOfRangeException("pos");
            if (length < 0)
                throw new ArgumentOutOfRangeException("length");
            if (data.Length - pos < length)
                throw new ArgumentException("pos and length do not describe a valid range within data.");

            byte[] newBuf = new byte[length];
            Buffer.BlockCopy(data, pos, newBuf, 0, length);
            lock (_Buffer)
            {
                _Buffer.Add(newBuf);
                _Length += length;
            }

            // Signal only after the chunk is visible in the list, so a woken reader finds it.
            Set();
        }

        /// <summary>
        /// Signals the "data ready" event of every registered reader.
        /// </summary>
        private void Set()
        {
            lock (_readersLock)
            {
                foreach (ManualResetEvent dataReady in _dataReadyEvents)
                    dataReady.Set();
            }
        }

        /// <summary>
        /// Marks the stream as complete and signals the "finished" event of every registered
        /// reader. Readers created afterwards start with both of their events already signalled.
        /// </summary>
        public void Finish()
        {
            lock (_readersLock)
            {
                // Set the flag first and under the same lock GetReader uses: a reader is either
                // already in the list (and gets signalled below) or is created after the flag
                // is set (and starts signalled). No reader can fall in between.
                _finished = true;
                foreach (ManualResetEvent finished in _finishedEvents)
                    finished.Set();
            }
        }

        /// <summary>
        /// Creates a new reader positioned at the start of the buffered data.
        /// </summary>
        /// <returns>A <see cref="MemoryStreamReader"/> that shares this instance's chunk list.</returns>
        public MemoryStreamReader GetReader()
        {
            ManualResetEvent dataReady;
            ManualResetEvent finished;
            lock (_readersLock)
            {
                dataReady = new ManualResetEvent(_finished);
                finished = new ManualResetEvent(_finished);
                _dataReadyEvents.Add(dataReady);
                _finishedEvents.Add(finished);
            }
            return new MemoryStreamReader(_Buffer, dataReady, finished);
        }

        /// <summary>
        /// Finishes the stream and releases the events of all registered readers.
        /// Calling it more than once has no further effect.
        /// </summary>
        public void Dispose()
        {
            lock (_readersLock)
            {
                if (disposed)
                    return;
                disposed = true;

                // Wake any reader that is still waiting before its handles go away.
                // Monitor locks are re-entrant, so calling Finish() here is safe.
                Finish();

                foreach (ManualResetEvent dataReady in _dataReadyEvents)
                    dataReady.Dispose();
                foreach (ManualResetEvent finished in _finishedEvents)
                    finished.Dispose();

                // Forget the disposed handles so a later Write() does not touch them.
                _dataReadyEvents.Clear();
                _finishedEvents.Clear();
            }
        }
    }

    /// <summary>
    /// Forward-only, read-only stream over a list of byte chunks that another thread may still
    /// be appending to.
    /// </summary>
    /// <remarks>
    /// <see cref="Read"/> blocks while no unread chunk is available, until either the
    /// "data ready" or the "finished" event is signalled. If neither happens within 30 seconds
    /// the call gives up and returns the bytes gathered so far, which may be 0.
    /// The chunk list is accessed under <c>lock</c> on the list instance itself; the writer
    /// must take the same lock when appending.
    /// A single reader instance is not meant to be used from several threads at once.
    /// </remarks>
    public class MemoryStreamReader : Stream
    {
        // Upper bound for a single wait on the data-ready/finished events.
        private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(30);

        private int _position;
        private int _bufferIndex;
        private int _bufferPos;
        private List<byte[]> _bufferList;

        private ManualResetEvent[] _waitHandles;
        private ManualResetEvent _dataReady;
        private ManualResetEvent _finished;

        private bool _disposed;

        /// <summary>
        /// Creates a reader positioned at the first byte of the first chunk.
        /// </summary>
        /// <param name="bufferList">Shared chunk list; appended to by the writer.</param>
        /// <param name="dataReady">Event the writer sets after appending a chunk.</param>
        /// <param name="finished">Event the writer sets when no more chunks will be appended.</param>
        /// <exception cref="ArgumentNullException">Any argument is null.</exception>
        public MemoryStreamReader(List<byte[]> bufferList, ManualResetEvent dataReady, ManualResetEvent finished)
        {
            if (bufferList == null)
                throw new ArgumentNullException("bufferList");
            if (dataReady == null)
                throw new ArgumentNullException("dataReady");
            if (finished == null)
                throw new ArgumentNullException("finished");

            _waitHandles = new ManualResetEvent[] { dataReady, finished };
            _bufferList = bufferList;
            _dataReady = dataReady;
            _finished = finished;
            _bufferPos = 0;
            _bufferIndex = 0;
            _position = 0;
        }

        public override bool CanRead
        {
            get { return !_disposed; }
        }

        public override bool CanSeek
        {
            get { return false; }
        }

        public override bool CanWrite
        {
            get { return false; }
        }

        /// <summary>
        /// Does nothing: a read-only stream has no pending output to flush.
        /// </summary>
        public override void Flush()
        {
        }

        /// <summary>Not supported; the total length is unknown while the writer is active.</summary>
        public override long Length
        {
            get { throw new NotSupportedException(); }
        }

        /// <summary>
        /// Number of bytes this reader has delivered so far. Setting it is not supported.
        /// </summary>
        public override long Position
        {
            get
            {
                return _position;
            }
            set
            {
                throw new NotSupportedException();
            }
        }

        /// <summary>
        /// Copies up to <paramref name="count"/> bytes into <paramref name="buffer"/>, blocking
        /// while no unread data is available.
        /// </summary>
        /// <param name="buffer">Destination array.</param>
        /// <param name="offset">Zero-based index in <paramref name="buffer"/> at which to start storing bytes.</param>
        /// <param name="count">Maximum number of bytes to read.</param>
        /// <returns>
        /// The number of bytes copied. Fewer than <paramref name="count"/> (possibly 0) means the
        /// stream was finished, or no data arrived within the wait timeout.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
        /// <exception cref="ArgumentException">The requested range does not fit inside <paramref name="buffer"/>.</exception>
        /// <exception cref="ObjectDisposedException">The reader has been disposed.</exception>
        public override int Read(byte[] buffer, int offset, int count)
        {
            if (_disposed)
                throw new ObjectDisposedException(GetType().Name);
            if (buffer == null)
                throw new ArgumentNullException("buffer");
            if (offset < 0)
                throw new ArgumentOutOfRangeException("offset");
            if (count < 0)
                throw new ArgumentOutOfRangeException("count");
            if (buffer.Length - offset < count)
                throw new ArgumentException("offset and count do not describe a valid range within buffer.");

            // Iterative on purpose: one stack frame no matter how many chunks are crossed.
            int delivered = 0;
            while (delivered < count)
            {
                byte[] currentBuffer = GetCurrentBuffer();
                if (currentBuffer == null)
                {
                    // Nothing unread yet. Wait for the writer; stop if nothing shows up.
                    if (!WaitForBuffer())
                        break;
                    continue;
                }

                int available = currentBuffer.Length - _bufferPos;
                if (available <= 0)
                {
                    // All bytes of the current chunk have been delivered. Try the next chunk.
                    _bufferIndex++;
                    _bufferPos = 0;
                    continue;
                }

                int chunk = Math.Min(available, count - delivered);
                Buffer.BlockCopy(currentBuffer, _bufferPos, buffer, offset + delivered, chunk);

                _bufferPos += chunk;
                _position += chunk;
                delivered += chunk;
            }

            return delivered;
        }

        /// <summary>
        /// Returns the chunk at the current index, or null when the writer has not produced it yet.
        /// </summary>
        private byte[] GetCurrentBuffer()
        {
            // List<T> is not safe for a read concurrent with Add, so take the writer's lock.
            lock (_bufferList)
            {
                return _bufferIndex < _bufferList.Count ? _bufferList[_bufferIndex] : null;
            }
        }

        /// <summary>
        /// Blocks until the data-ready or finished event fires, or the timeout elapses.
        /// </summary>
        /// <returns>True when a chunk is available at the current index afterwards.</returns>
        private bool WaitForBuffer()
        {
            try
            {
                // Reset BEFORE re-checking the list. A chunk appended after the check then
                // always sets the event after this reset, so its wake-up cannot be lost.
                _dataReady.Reset();
                if (GetCurrentBuffer() != null)
                    return true;

                // Wait for either the data-ready event or the finished event.
                WaitHandle.WaitAny(_waitHandles, WaitTimeout, false);
            }
            catch (ObjectDisposedException)
            {
                // The owner of the events released them, so no further signal will arrive.
                // Fall through and report whatever is already in the list.
            }

            return GetCurrentBuffer() != null;
        }

        /// <summary>Not supported; the stream is forward-only.</summary>
        public override long Seek(long offset, SeekOrigin origin)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported; the stream is read-only.</summary>
        public override void SetLength(long value)
        {
            throw new NotSupportedException();
        }

        /// <summary>Not supported; the stream is read-only.</summary>
        public override void Write(byte[] buffer, int offset, int count)
        {
            throw new NotSupportedException();
        }

        /// <summary>
        /// Drops this reader's references to the shared chunk list and events.
        /// The events themselves are not disposed here; this class did not create them.
        /// </summary>
        protected override void Dispose(bool disposing)
        {
            if (!_disposed)
            {
                _disposed = true;
                _dataReady = null;
                _finished = null;
                _bufferList = null;
                _waitHandles = null;
            }

            base.Dispose(disposing);
        }
    }
}

By viewing downloads associated with this article you agree to the Terms of Service and the article's licence.

If a file you wish to view isn't highlighted, and is a text file (not binary), please let us know and we'll add colourisation support for it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)


Written By
Architect BT, UK (ex British Telecom)
United Kingdom United Kingdom

Comments and Discussions